mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
<!-- Clearly explain the need for these changes: --> We want to support some more advanced search specific actions. These are the base API layers and sample blocks for some of the services we need. ### Changes 🏗️ <!-- Concisely describe all of the changes made in this pull request: --> - support pydantic models as an output format - add apollo - add smartlead - add zerobounce ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: <!-- Put your test plan here: --> - [x] Built agents to test --------- Co-authored-by: Krzysztof Czerwinski <34861343+kcze@users.noreply.github.com> Co-authored-by: Bently <tomnoon9@gmail.com>
109 lines
3.9 KiB
Python
109 lines
3.9 KiB
Python
import logging
|
|
from typing import List
|
|
|
|
from backend.blocks.apollo._auth import ApolloCredentials
|
|
from backend.blocks.apollo.models import (
|
|
Contact,
|
|
Organization,
|
|
SearchOrganizationsRequest,
|
|
SearchOrganizationsResponse,
|
|
SearchPeopleRequest,
|
|
SearchPeopleResponse,
|
|
)
|
|
from backend.util.request import Requests
|
|
|
|
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class ApolloClient:
|
|
"""Client for the Apollo API"""
|
|
|
|
API_URL = "https://api.apollo.io/api/v1"
|
|
|
|
def __init__(self, credentials: ApolloCredentials):
|
|
self.credentials = credentials
|
|
self.requests = Requests()
|
|
|
|
def _get_headers(self) -> dict[str, str]:
|
|
return {"x-api-key": self.credentials.api_key.get_secret_value()}
|
|
|
|
def search_people(self, query: SearchPeopleRequest) -> List[Contact]:
|
|
"""Search for people in Apollo"""
|
|
response = self.requests.get(
|
|
f"{self.API_URL}/mixed_people/search",
|
|
headers=self._get_headers(),
|
|
params=query.model_dump(exclude={"credentials", "max_results"}),
|
|
)
|
|
parsed_response = SearchPeopleResponse(**response.json())
|
|
if parsed_response.pagination.total_entries == 0:
|
|
return []
|
|
|
|
people = parsed_response.people
|
|
|
|
# handle pagination
|
|
if (
|
|
query.max_results is not None
|
|
and query.max_results < parsed_response.pagination.total_entries
|
|
and len(people) < query.max_results
|
|
):
|
|
while (
|
|
len(people) < query.max_results
|
|
and query.page < parsed_response.pagination.total_pages
|
|
and len(parsed_response.people) > 0
|
|
):
|
|
query.page += 1
|
|
response = self.requests.get(
|
|
f"{self.API_URL}/mixed_people/search",
|
|
headers=self._get_headers(),
|
|
params=query.model_dump(exclude={"credentials", "max_results"}),
|
|
)
|
|
parsed_response = SearchPeopleResponse(**response.json())
|
|
people.extend(parsed_response.people[: query.max_results - len(people)])
|
|
|
|
logger.info(f"Found {len(people)} people")
|
|
return people[: query.max_results] if query.max_results else people
|
|
|
|
def search_organizations(
|
|
self, query: SearchOrganizationsRequest
|
|
) -> List[Organization]:
|
|
"""Search for organizations in Apollo"""
|
|
response = self.requests.get(
|
|
f"{self.API_URL}/mixed_companies/search",
|
|
headers=self._get_headers(),
|
|
params=query.model_dump(exclude={"credentials", "max_results"}),
|
|
)
|
|
parsed_response = SearchOrganizationsResponse(**response.json())
|
|
if parsed_response.pagination.total_entries == 0:
|
|
return []
|
|
|
|
organizations = parsed_response.organizations
|
|
|
|
# handle pagination
|
|
if (
|
|
query.max_results is not None
|
|
and query.max_results < parsed_response.pagination.total_entries
|
|
and len(organizations) < query.max_results
|
|
):
|
|
while (
|
|
len(organizations) < query.max_results
|
|
and query.page < parsed_response.pagination.total_pages
|
|
and len(parsed_response.organizations) > 0
|
|
):
|
|
query.page += 1
|
|
response = self.requests.get(
|
|
f"{self.API_URL}/mixed_companies/search",
|
|
headers=self._get_headers(),
|
|
params=query.model_dump(exclude={"credentials", "max_results"}),
|
|
)
|
|
parsed_response = SearchOrganizationsResponse(**response.json())
|
|
organizations.extend(
|
|
parsed_response.organizations[
|
|
: query.max_results - len(organizations)
|
|
]
|
|
)
|
|
|
|
logger.info(f"Found {len(organizations)} organizations")
|
|
return (
|
|
organizations[: query.max_results] if query.max_results else organizations
|
|
)
|