feat(block): Add enriching email feature for SearchPeopleBlock & introduce GetPersonDetailBlock (#10251)

<!-- Clearly explain the need for these changes: -->

### Changes 🏗️

* Add an enriching email feature toggle for SearchPeopleBlock
* Introduce GetPersonDetailBlock
* Adjust the cost of both blocks

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Execute SearchPeopleBlock & GetPersonDetailBlock
This commit is contained in:
Zamil Majdy
2025-06-26 13:11:43 -07:00
committed by GitHub
parent 2f12e8d731
commit e01dd94e36
5 changed files with 297 additions and 91 deletions

View File

@@ -4,6 +4,7 @@ from typing import List
from backend.blocks.apollo._auth import ApolloCredentials
from backend.blocks.apollo.models import (
Contact,
EnrichPersonRequest,
Organization,
SearchOrganizationsRequest,
SearchOrganizationsResponse,
@@ -110,3 +111,33 @@ class ApolloClient:
return (
organizations[: query.max_results] if query.max_results else organizations
)
async def enrich_person(self, query: EnrichPersonRequest) -> Contact:
"""Enrich a person's data including email reveal"""
# Extract reveal parameters for query string
reveal_emails = query.reveal_personal_emails
reveal_phone = query.reveal_phone_number
# Create request data without reveal parameters
request_data = query.model_dump(
exclude={"reveal_personal_emails", "reveal_phone_number"}
)
# Build query parameters
params = {}
if reveal_emails:
params["reveal_personal_emails"] = "true"
if reveal_phone:
params["reveal_phone_number"] = "true"
response = await self.requests.post(
f"{self.API_URL}/people/match",
headers=self._get_headers(),
json=request_data,
params=params,
)
data = response.json()
if "person" not in data:
raise ValueError(f"Person not found or enrichment failed: {data}")
return Contact(**data["person"])

View File

@@ -0,0 +1,154 @@
from backend.blocks.apollo._api import ApolloClient
from backend.blocks.apollo._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
ApolloCredentials,
ApolloCredentialsInput,
)
from backend.blocks.apollo.models import Contact, EnrichPersonRequest
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import CredentialsField, SchemaField
class GetPersonDetailBlock(Block):
"""Get detailed person data with Apollo API, including email reveal"""
class Input(BlockSchema):
person_id: str = SchemaField(
description="Apollo person ID to enrich (most accurate method)",
default="",
advanced=False,
)
first_name: str = SchemaField(
description="First name of the person to enrich",
default="",
advanced=False,
)
last_name: str = SchemaField(
description="Last name of the person to enrich",
default="",
advanced=False,
)
name: str = SchemaField(
description="Full name of the person to enrich (alternative to first_name + last_name)",
default="",
advanced=False,
)
email: str = SchemaField(
description="Known email address of the person (helps with matching)",
default="",
advanced=False,
)
domain: str = SchemaField(
description="Company domain of the person (e.g., 'google.com')",
default="",
advanced=False,
)
company: str = SchemaField(
description="Company name of the person",
default="",
advanced=False,
)
linkedin_url: str = SchemaField(
description="LinkedIn URL of the person",
default="",
advanced=False,
)
organization_id: str = SchemaField(
description="Apollo organization ID of the person's company",
default="",
advanced=True,
)
title: str = SchemaField(
description="Job title of the person to enrich",
default="",
advanced=True,
)
reveal_personal_emails: bool = SchemaField(
description="Whether to reveal personal email addresses (consumes Apollo credits)",
default=True,
advanced=False,
)
reveal_phone_number: bool = SchemaField(
description="Whether to reveal phone numbers (consumes Apollo credits)",
default=False,
advanced=True,
)
credentials: ApolloCredentialsInput = CredentialsField(
description="Apollo credentials",
)
class Output(BlockSchema):
contact: Contact = SchemaField(
description="Enriched contact information",
)
error: str = SchemaField(
description="Error message if enrichment failed",
default="",
)
def __init__(self):
super().__init__(
id="3b18d46c-3db6-42ae-a228-0ba441bdd176",
description="Get detailed person data with Apollo API, including email reveal",
categories={BlockCategory.SEARCH},
input_schema=GetPersonDetailBlock.Input,
output_schema=GetPersonDetailBlock.Output,
test_credentials=TEST_CREDENTIALS,
test_input={
"credentials": TEST_CREDENTIALS_INPUT,
"first_name": "John",
"last_name": "Doe",
"company": "Google",
"reveal_personal_emails": True,
},
test_output=[
(
"contact",
Contact(
id="1",
name="John Doe",
first_name="John",
last_name="Doe",
email="john.doe@gmail.com",
title="Software Engineer",
organization_name="Google",
linkedin_url="https://www.linkedin.com/in/johndoe",
),
),
],
test_mock={
"enrich_person": lambda query, credentials: Contact(
id="1",
name="John Doe",
first_name="John",
last_name="Doe",
email="john.doe@gmail.com",
title="Software Engineer",
organization_name="Google",
linkedin_url="https://www.linkedin.com/in/johndoe",
)
},
)
@staticmethod
async def enrich_person(
query: EnrichPersonRequest, credentials: ApolloCredentials
) -> Contact:
client = ApolloClient(credentials)
return await client.enrich_person(query)
async def run(
self,
input_data: Input,
*,
credentials: ApolloCredentials,
**kwargs,
) -> BlockOutput:
try:
query = EnrichPersonRequest(**input_data.model_dump())
contact = await self.enrich_person(query, credentials)
yield "contact", contact
except Exception as e:
yield "error", str(e)

View File

@@ -560,3 +560,56 @@ class SearchPeopleResponse(BaseModel):
model_ids: list[str] = []
num_fetch_result: Optional[str] = "N/A"
derived_params: Optional[str] = "N/A"
class EnrichPersonRequest(BaseModel):
"""Request for Apollo's person enrichment API"""
person_id: Optional[str] = SchemaField(
description="Apollo person ID to enrich (most accurate method)",
default=None,
)
first_name: Optional[str] = SchemaField(
description="First name of the person to enrich",
default=None,
)
last_name: Optional[str] = SchemaField(
description="Last name of the person to enrich",
default=None,
)
name: Optional[str] = SchemaField(
description="Full name of the person to enrich",
default=None,
)
email: Optional[str] = SchemaField(
description="Email address of the person to enrich",
default=None,
)
domain: Optional[str] = SchemaField(
description="Company domain of the person to enrich",
default=None,
)
company: Optional[str] = SchemaField(
description="Company name of the person to enrich",
default=None,
)
linkedin_url: Optional[str] = SchemaField(
description="LinkedIn URL of the person to enrich",
default=None,
)
organization_id: Optional[str] = SchemaField(
description="Apollo organization ID of the person's company",
default=None,
)
title: Optional[str] = SchemaField(
description="Job title of the person to enrich",
default=None,
)
reveal_personal_emails: bool = SchemaField(
description="Whether to reveal personal email addresses (consumes credits)",
default=False,
)
reveal_phone_number: bool = SchemaField(
description="Whether to reveal phone numbers (consumes credits)",
default=False,
)

View File

@@ -1,3 +1,5 @@
import asyncio
from backend.blocks.apollo._api import ApolloClient
from backend.blocks.apollo._auth import (
TEST_CREDENTIALS,
@@ -8,6 +10,7 @@ from backend.blocks.apollo._auth import (
from backend.blocks.apollo.models import (
Contact,
ContactEmailStatuses,
EnrichPersonRequest,
SearchPeopleRequest,
SenorityLevels,
)
@@ -90,10 +93,15 @@ class SearchPeopleBlock(Block):
advanced=False,
)
max_results: int = SchemaField(
description="""The maximum number of results to return. If you don't specify this parameter, the default is 100.""",
default=100,
description="""The maximum number of results to return. If you don't specify this parameter, the default is 25. Limited to 500 to prevent overspending.""",
default=25,
ge=1,
le=50000,
le=500,
advanced=True,
)
enrich_info: bool = SchemaField(
description="""Whether to enrich contacts with detailed information including real email addresses. This will double the search cost.""",
default=False,
advanced=True,
)
@@ -106,10 +114,6 @@ class SearchPeopleBlock(Block):
description="List of people found",
default_factory=list,
)
person: Contact = SchemaField(
title="Person",
description="Each found person, one at a time",
)
error: str = SchemaField(
description="Error message if the search failed",
default="",
@@ -125,87 +129,6 @@ class SearchPeopleBlock(Block):
test_credentials=TEST_CREDENTIALS,
test_input={"credentials": TEST_CREDENTIALS_INPUT},
test_output=[
(
"person",
Contact(
contact_roles=[],
id="1",
name="John Doe",
first_name="John",
last_name="Doe",
linkedin_url="https://www.linkedin.com/in/johndoe",
title="Software Engineer",
organization_name="Google",
organization_id="123456",
contact_stage_id="1",
owner_id="1",
creator_id="1",
person_id="1",
email_needs_tickling=True,
source="apollo",
original_source="apollo",
headline="Software Engineer",
photo_url="https://www.linkedin.com/in/johndoe",
present_raw_address="123 Main St, Anytown, USA",
linkededin_uid="123456",
extrapolated_email_confidence=0.8,
salesforce_id="123456",
salesforce_lead_id="123456",
salesforce_contact_id="123456",
saleforce_account_id="123456",
crm_owner_id="123456",
created_at="2021-01-01",
emailer_campaign_ids=[],
direct_dial_status="active",
direct_dial_enrichment_failed_at="2021-01-01",
email_status="active",
email_source="apollo",
account_id="123456",
last_activity_date="2021-01-01",
hubspot_vid="123456",
hubspot_company_id="123456",
crm_id="123456",
sanitized_phone="123456",
merged_crm_ids="123456",
updated_at="2021-01-01",
queued_for_crm_push=True,
suggested_from_rule_engine_config_id="123456",
email_unsubscribed=None,
label_ids=[],
has_pending_email_arcgate_request=True,
has_email_arcgate_request=True,
existence_level=None,
email=None,
email_from_customer=None,
typed_custom_fields=[],
custom_field_errors=None,
salesforce_record_id=None,
crm_record_url=None,
email_status_unavailable_reason=None,
email_true_status=None,
updated_email_true_status=True,
contact_rule_config_statuses=[],
source_display_name=None,
twitter_url=None,
contact_campaign_statuses=[],
state=None,
city=None,
country=None,
account=None,
contact_emails=[],
organization=None,
employment_history=[],
time_zone=None,
intent_strength=None,
show_intent=True,
phone_numbers=[],
account_phone_note=None,
free_domain=True,
is_likely_to_engage=True,
email_domain_catchall=True,
contact_job_change_event=None,
),
),
(
"people",
[
@@ -380,6 +303,13 @@ class SearchPeopleBlock(Block):
client = ApolloClient(credentials)
return await client.search_people(query)
@staticmethod
async def enrich_person(
query: EnrichPersonRequest, credentials: ApolloCredentials
) -> Contact:
client = ApolloClient(credentials)
return await client.enrich_person(query)
async def run(
self,
input_data: Input,
@@ -390,6 +320,19 @@ class SearchPeopleBlock(Block):
query = SearchPeopleRequest(**input_data.model_dump())
people = await self.search_people(query, credentials)
for person in people:
yield "person", person
# Enrich with detailed info if requested
if input_data.enrich_info:
async def enrich_or_fallback(person):
try:
enrich_query = EnrichPersonRequest(person_id=person.person_id)
return await self.enrich_person(enrich_query, credentials)
except Exception:
return person # If enrichment fails, use original person data
people = await asyncio.gather(
*(enrich_or_fallback(person) for person in people)
)
yield "people", people

View File

@@ -2,6 +2,7 @@ from typing import Type
from backend.blocks.ai_music_generator import AIMusicGeneratorBlock
from backend.blocks.ai_shortform_video_block import AIShortformVideoCreatorBlock
from backend.blocks.apollo.get_people_detail import GetPersonDetailBlock
from backend.blocks.apollo.organization import SearchOrganizationsBlock
from backend.blocks.apollo.people import SearchPeopleBlock
from backend.blocks.flux_kontext import AIImageEditorBlock, FluxKontextModelName
@@ -362,7 +363,31 @@ BLOCK_COSTS: dict[Type[Block], list[BlockCost]] = {
],
SearchPeopleBlock: [
BlockCost(
cost_amount=2,
cost_amount=10,
cost_filter={
"enrich_info": False,
"credentials": {
"id": apollo_credentials.id,
"provider": apollo_credentials.provider,
"type": apollo_credentials.type,
},
},
),
BlockCost(
cost_amount=20,
cost_filter={
"enrich_info": True,
"credentials": {
"id": apollo_credentials.id,
"provider": apollo_credentials.provider,
"type": apollo_credentials.type,
},
},
),
],
GetPersonDetailBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": apollo_credentials.id,