Compare commits

...

2 Commits

Author SHA1 Message Date
SwiftyOS
01531571cd proxycurl integration 2025-03-21 13:40:21 +01:00
SwiftyOS
6a9a5b7161 Airtable integration 2025-03-21 13:23:06 +01:00
18 changed files with 1991 additions and 1 deletions

View File

@@ -174,6 +174,12 @@ EXA_API_KEY=
E2B_API_KEY=
# Mem0
# Airtable
# Proxycurl
PROXYCURL_API_KEY=
AIRTABLE_API_KEY=
MEM0_API_KEY=
# Nvidia

View File

@@ -0,0 +1,440 @@
"""
API module for Airtable API integration.
This module provides a client for interacting with the Airtable API,
including methods for working with tables, fields, records, and webhooks.
"""
from json import JSONDecodeError
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from backend.data.model import APIKeyCredentials
from backend.util.request import Requests
class AirtableAPIException(Exception):
def __init__(self, message: str, status_code: int):
super().__init__(message)
self.status_code = status_code
# Response Models
class TableField(BaseModel):
id: str
name: str
type: str
options: Optional[Dict[str, Any]] = None
class Table(BaseModel):
id: str
name: str
description: Optional[str] = None
fields: List[TableField]
class Record(BaseModel):
id: str
fields: Dict[str, Any]
createdTime: Optional[str] = None
class RecordAttachment(BaseModel):
id: str
url: str
filename: str
size: Optional[int] = None
type: Optional[str] = None
class Webhook(BaseModel):
id: str
url: str
event: str
notification_url: Optional[str] = None
active: bool
class ListTablesResponse(BaseModel):
tables: List[Table]
class ListRecordsResponse(BaseModel):
records: List[Record]
offset: Optional[str] = None
class ListAttachmentsResponse(BaseModel):
attachments: List[RecordAttachment]
offset: Optional[str] = None
class ListWebhooksResponse(BaseModel):
webhooks: List[Webhook]
offset: Optional[str] = None
class AirtableClient:
"""Client for the Airtable API"""
API_BASE_URL = "https://api.airtable.com/v0"
def __init__(
self,
credentials: Optional[APIKeyCredentials] = None,
custom_requests: Optional[Requests] = None,
):
if custom_requests:
self._requests = custom_requests
else:
headers: dict[str, str] = {
"Content-Type": "application/json",
}
if credentials:
headers["Authorization"] = (
f"Bearer {credentials.api_key.get_secret_value()}"
)
self._requests = Requests(
extra_headers=headers,
raise_for_status=False,
)
@staticmethod
def _handle_response(response) -> Any:
"""
Handles API response and checks for errors.
Args:
response: The response object from the request.
Returns:
The parsed JSON response data.
Raises:
AirtableAPIException: If the API request fails.
"""
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get("error", {}).get("message", "")
except JSONDecodeError:
error_message = response.text
raise AirtableAPIException(
f"Airtable API request failed ({response.status_code}): {error_message}",
response.status_code,
)
return response.json()
# Table Methods
def list_tables(self, base_id: str) -> ListTablesResponse:
"""
List all tables in a base.
Args:
base_id: The ID of the base to list tables from.
Returns:
ListTablesResponse: Object containing the list of tables.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(f"{self.API_BASE_URL}/bases/{base_id}/tables")
data = self._handle_response(response)
return ListTablesResponse(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to list tables: {str(e)}", 500)
def get_table(self, base_id: str, table_id: str) -> Table:
"""
Get a specific table schema.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to retrieve.
Returns:
Table: The table object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}"
)
data = self._handle_response(response)
return Table(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to get table: {str(e)}", 500)
def create_table(
self, base_id: str, name: str, description: str, fields: List[Dict[str, Any]]
) -> Table:
"""
Create a new table in a base.
Args:
base_id: The ID of the base to create the table in.
name: The name of the new table.
description: The description of the new table.
fields: The fields to create in the new table.
Returns:
Table: The created table object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
payload = {
"name": name,
"description": description,
"fields": fields,
}
response = self._requests.post(
f"{self.API_BASE_URL}/meta/bases/{base_id}/tables", json=payload
)
data = self._handle_response(response)
return Table(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to create table: {str(e)}", 500)
# Field Methods
def list_fields(self, base_id: str, table_id: str) -> List[TableField]:
"""
List all fields in a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to list fields from.
Returns:
List[TableField]: List of field objects.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/fields"
)
data = self._handle_response(response)
return [TableField(**field) for field in data.get("fields", [])]
except Exception as e:
raise AirtableAPIException(f"Failed to list fields: {str(e)}", 500)
def get_field(self, base_id: str, table_id: str, field_id: str) -> TableField:
"""
Get a specific field.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table containing the field.
field_id: The ID of the field to retrieve.
Returns:
TableField: The field object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/fields/{field_id}"
)
data = self._handle_response(response)
return TableField(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to get field: {str(e)}", 500)
def create_field(
self,
base_id: str,
table_id: str,
name: str,
field_type: str,
options: Optional[Dict[str, Any]] = None,
) -> TableField:
"""
Create a new field in a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to create the field in.
name: The name of the new field.
field_type: The type of the new field.
options: Optional field type options.
Returns:
TableField: The created field object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
payload = {
"name": name,
"type": field_type,
}
if options:
payload["options"] = options
response = self._requests.post(
f"{self.API_BASE_URL}/meta/bases/{base_id}/tables/{table_id}/fields",
json=payload,
)
data = self._handle_response(response)
return TableField(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to create field: {str(e)}", 500)
# Record Methods
def list_records(
self,
base_id: str,
table_id: str,
filter_formula: Optional[str] = None,
offset: Optional[str] = None,
) -> ListRecordsResponse:
"""
List records in a table, with optional filtering.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to list records from.
filter_formula: Optional formula to filter records.
offset: Optional pagination offset.
Returns:
ListRecordsResponse: Object containing the list of records.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
params = {}
if filter_formula:
params["filterByFormula"] = filter_formula
if offset:
params["offset"] = offset
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records",
params=params,
)
data = self._handle_response(response)
return ListRecordsResponse(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to list records: {str(e)}", 500)
def get_record(self, base_id: str, table_id: str, record_id: str) -> Record:
"""
Get a specific record.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table containing the record.
record_id: The ID of the record to retrieve.
Returns:
Record: The record object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records/{record_id}"
)
data = self._handle_response(response)
return Record(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to get record: {str(e)}", 500)
def create_record(
self, base_id: str, table_id: str, fields: Dict[str, Any]
) -> Record:
"""
Create a new record in a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to create the record in.
fields: The field values for the new record.
Returns:
Record: The created record object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
payload = {"fields": fields}
response = self._requests.post(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records",
json=payload,
)
data = self._handle_response(response)
return Record(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to create record: {str(e)}", 500)
def update_record(
self, base_id: str, table_id: str, record_id: str, fields: Dict[str, Any]
) -> Record:
"""
Update a record in a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table containing the record.
record_id: The ID of the record to update.
fields: The field values to update.
Returns:
Record: The updated record object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
payload = {"fields": fields}
response = self._requests.patch(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records/{record_id}",
json=payload,
)
data = self._handle_response(response)
return Record(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to update record: {str(e)}", 500)
def delete_record(self, base_id: str, table_id: str, record_id: str) -> bool:
"""
Delete a record from a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table containing the record.
record_id: The ID of the record to delete.
Returns:
bool: True if the deletion was successful.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.delete(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records/{record_id}"
)
self._handle_response(response)
return True
except Exception as e:
raise AirtableAPIException(f"Failed to delete record: {str(e)}", 500)

View File

@@ -0,0 +1,37 @@
"""
Authentication module for Airtable API integration.
This module provides credential types and test credentials for the Airtable API integration.
It defines the structure for API key credentials used to authenticate with the Airtable API
and provides mock credentials for testing purposes.
"""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, CredentialsMetaInput
from backend.integrations.providers import ProviderName
# Define the type of credentials input expected for Airtable API
AirtableCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.AIRTABLE], Literal["api_key"]
]
# Mock credentials for testing Airtable API integration
TEST_CREDENTIALS = APIKeyCredentials(
id="7a91c8f0-399f-4235-a79c-59c0e37454d5",
provider="airtable",
api_key=SecretStr("mock-airtable-api-key"),
title="Mock Airtable API key",
expires_at=None,
)
# Dictionary representation of test credentials for input fields
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}

View File

@@ -0,0 +1,384 @@
"""
Airtable API integration blocks.
This module provides blocks for interacting with the Airtable API,
including operations for tables, fields, and records.
"""
import logging
from typing import Dict, List, Optional
from pydantic import BaseModel
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import APIKeyCredentials, CredentialsField, SchemaField
from ._api import AirtableAPIException, AirtableClient
from ._auth import TEST_CREDENTIALS, TEST_CREDENTIALS_INPUT, AirtableCredentialsInput
logger = logging.getLogger(__name__)
# Common response models
class AirtableTable(BaseModel):
id: str
name: str
description: Optional[str] = None
class AirtableField(BaseModel):
id: str
name: str
type: str
class AirtableRecord(BaseModel):
id: str
fields: Dict[str, any]
created_time: Optional[str] = None
class AirtableTablesBlock(Block):
"""Block for listing, getting, and creating tables in Airtable."""
class Input(BlockSchema):
base_id: str = SchemaField(
description="The ID of the Airtable base",
placeholder="appXXXXXXXXXXXXXX",
)
operation: str = SchemaField(
description="The operation to perform on tables",
placeholder="list",
choices=["list", "get", "create"],
)
table_id: Optional[str] = SchemaField(
description="The ID of the table (required for 'get' operation)",
placeholder="tblXXXXXXXXXXXXXX",
advanced=True,
)
table_name: Optional[str] = SchemaField(
description="The name of the new table (required for 'create' operation)",
placeholder="My New Table",
advanced=True,
)
table_description: Optional[str] = SchemaField(
description="The description of the new table (for 'create' operation)",
placeholder="Description of my table",
advanced=True,
)
fields: Optional[List[Dict[str, str]]] = SchemaField(
description="The fields to create in the new table (for 'create' operation)",
placeholder='[{"name": "Name", "type": "text"}]',
advanced=True,
)
credentials: AirtableCredentialsInput = CredentialsField(
description="The credentials for the Airtable API"
)
class Output(BlockSchema):
tables: Optional[List[AirtableTable]] = SchemaField(
description="List of tables in the base"
)
table: Optional[AirtableTable] = SchemaField(
description="The retrieved or created table"
)
error: Optional[str] = SchemaField(description="Error message if any")
def __init__(self):
super().__init__(
id="da53b48c-6e97-4c1c-afb9-4ecf10c81856",
description="List, get, or create tables in an Airtable base",
categories={BlockCategory.DATA},
input_schema=AirtableTablesBlock.Input,
output_schema=AirtableTablesBlock.Output,
test_input={
"base_id": "appXXXXXXXXXXXXXX",
"operation": "list",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
("tables", [AirtableTable(id="tbl123", name="Example Table")])
],
test_mock={
"list_tables": lambda *args, **kwargs: {
"tables": [{"id": "tbl123", "name": "Example Table"}]
}
},
test_credentials=TEST_CREDENTIALS,
)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Perform operations on Airtable tables.
Args:
input_data: The input parameters for the block.
credentials: The Airtable API credentials.
Yields:
BlockOutput: The result of the table operation.
"""
try:
client = AirtableClient(credentials=credentials)
if input_data.operation == "list":
# List all tables in the base
response = client.list_tables(input_data.base_id)
tables = [
AirtableTable(
id=table.id, name=table.name, description=table.description
)
for table in response.tables
]
yield "tables", tables
elif input_data.operation == "get":
# Get a specific table
if not input_data.table_id:
yield "error", "Table ID is required for 'get' operation"
return
table = client.get_table(input_data.base_id, input_data.table_id)
yield "table", AirtableTable(
id=table.id, name=table.name, description=table.description
)
elif input_data.operation == "create":
# Create a new table
if not input_data.table_name:
yield "error", "Table name is required for 'create' operation"
return
if not input_data.fields or len(input_data.fields) == 0:
yield "error", "At least one field is required for 'create' operation"
return
table = client.create_table(
input_data.base_id,
input_data.table_name,
input_data.table_description or "",
input_data.fields,
)
yield "table", AirtableTable(
id=table.id, name=table.name, description=table.description
)
else:
yield "error", f"Unknown operation: {input_data.operation}"
except AirtableAPIException as e:
yield "error", f"Airtable API error: {str(e)}"
except Exception as e:
logger.exception("Error in AirtableTablesBlock")
yield "error", f"Error: {str(e)}"
class AirtableFieldsBlock(Block):
"""Block for listing, getting, and creating fields in Airtable tables."""
class Input(BlockSchema):
base_id: str = SchemaField(
description="The ID of the Airtable base",
placeholder="appXXXXXXXXXXXXXX",
)
table_id: str = SchemaField(
description="The ID of the table",
placeholder="tblXXXXXXXXXXXXXX",
)
operation: str = SchemaField(
description="The operation to perform on fields",
placeholder="list",
choices=["list", "get", "create"],
)
field_id: Optional[str] = SchemaField(
description="The ID of the field (required for 'get' operation)",
placeholder="fldXXXXXXXXXXXXXX",
advanced=True,
)
field_name: Optional[str] = SchemaField(
description="The name of the new field (required for 'create' operation)",
placeholder="My New Field",
advanced=True,
)
field_type: Optional[str] = SchemaField(
description="The type of the new field (required for 'create' operation)",
placeholder="text",
advanced=True,
choices=[
"text",
"number",
"checkbox",
"singleSelect",
"multipleSelects",
"date",
"dateTime",
"attachment",
"link",
"multipleRecordLinks",
"formula",
"rollup",
"count",
"lookup",
"currency",
"percent",
"duration",
"rating",
"richText",
"barcode",
"button",
],
)
credentials: AirtableCredentialsInput = CredentialsField(
description="The credentials for the Airtable API"
)
class Output(BlockSchema):
fields: Optional[List[AirtableField]] = SchemaField(
description="List of fields in the table"
)
field: Optional[AirtableField] = SchemaField(
description="The retrieved or created field"
)
error: Optional[str] = SchemaField(description="Error message if any")
def __init__(self):
super().__init__(
id="c27a6a11-8c09-4f8c-afeb-82c7a0c81857",
description="List, get, or create fields in an Airtable table",
categories={BlockCategory.DATA},
input_schema=AirtableFieldsBlock.Input,
output_schema=AirtableFieldsBlock.Output,
test_input={
"base_id": "appXXXXXXXXXXXXXX",
"table_id": "tblXXXXXXXXXXXXXX",
"operation": "list",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
("fields", [AirtableField(id="fld123", name="Name", type="text")])
],
test_mock={
"list_fields": lambda *args, **kwargs: [
{"id": "fld123", "name": "Name", "type": "text"}
]
},
test_credentials=TEST_CREDENTIALS,
)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Perform operations on Airtable fields.
Args:
input_data: The input parameters for the block.
credentials: The Airtable API credentials.
Yields:
BlockOutput: The result of the field operation.
"""
try:
client = AirtableClient(credentials=credentials)
if input_data.operation == "list":
# List all fields in the table
fields_list = client.list_fields(
input_data.base_id, input_data.table_id
)
fields = [
AirtableField(id=field.id, name=field.name, type=field.type)
for field in fields_list
]
yield "fields", fields
elif input_data.operation == "get":
# Get a specific field
if not input_data.field_id:
yield "error", "Field ID is required for 'get' operation"
return
field = client.get_field(
input_data.base_id, input_data.table_id, input_data.field_id
)
yield "field", AirtableField(
id=field.id, name=field.name, type=field.type
)
elif input_data.operation == "create":
# Create a new field
if not input_data.field_name:
yield "error", "Field name is required for 'create' operation"
return
if not input_data.field_type:
yield "error", "Field type is required for 'create' operation"
return
field = client.create_field(
input_data.base_id,
input_data.table_id,
input_data.field_name,
input_data.field_type,
)
yield "field", AirtableField(
id=field.id, name=field.name, type=field.type
)
else:
yield "error", f"Unknown operation: {input_data.operation}"
except AirtableAPIException as e:
yield "error", f"Airtable API error: {str(e)}"
except Exception as e:
logger.exception("Error in AirtableFieldsBlock")
yield "error", f"Error: {str(e)}"
class AirtableRecordsBlock(Block):
"""Block for creating, reading, updating, and deleting records in Airtable."""
class Input(BlockSchema):
base_id: str = SchemaField(
description="The ID of the Airtable base",
placeholder="appXXXXXXXXXXXXXX",
)
table_id: str = SchemaField(
description="The ID of the table",
placeholder="tblXXXXXXXXXXXXXX",
)
operation: str = SchemaField(
description="The operation to perform on records",
placeholder="list",
choices=["list", "get", "create", "update", "delete"],
)
record_id: Optional[str] = SchemaField(
description="The ID of the record (required for 'get', 'update', and 'delete' operations)",
placeholder="recXXXXXXXXXXXXXX",
advanced=True,
)
filter_formula: Optional[str] = SchemaField(
description="Filter formula for listing records (optional for 'list' operation)",
placeholder="{Field}='Value'",
advanced=True,
)
fields: Optional[Dict[str, any]] = SchemaField(
description="The field values (required for 'create' and 'update' operations)",
placeholder='{"Name": "John Doe", "Email": "john@example.com"}',
advanced=True,
)
credentials: AirtableCredentialsInput = CredentialsField(
description="The credentials for the Airtable API"
)
class Output(BlockSchema):
records: Optional[List[AirtableRecord]] = SchemaField(
description="List of records in the table"
)
record: Optional[AirtableRecord] = SchemaField(
description="The retrieved, created, or updated record"
)
success: Optional[bool] = SchemaField(
description="Success status for delete operation"
)
error: Optional[str] = SchemaField(description="Error message if any")

View File

@@ -0,0 +1,87 @@
"""
Module for Airtable webhook triggers.
This module provides trigger blocks that respond to Airtable webhook events.
"""
import logging
from typing import Dict
from strenum import StrEnum
from backend.data.block import (
Block,
BlockCategory,
BlockManualWebhookConfig,
BlockOutput,
BlockSchema,
)
from backend.data.model import SchemaField
logger = logging.getLogger(__name__)
class AirtableWebhookEventType(StrEnum):
"""Types of webhook events supported by Airtable."""
RECORDS_CREATED = "records:created"
RECORDS_UPDATED = "records:updated"
RECORDS_DELETED = "records:deleted"
class AirtableWebhookTriggerBlock(Block):
"""
A trigger block that responds to Airtable webhook events.
This block is activated when a webhook event is received from Airtable.
"""
class Input(BlockSchema):
# The payload field is hidden because it's automatically populated by the webhook system
payload: Dict = SchemaField(hidden=True)
class Output(BlockSchema):
event_data: Dict = SchemaField(
description="The contents of the Airtable webhook event."
)
base_id: str = SchemaField(description="The ID of the Airtable base.")
table_id: str = SchemaField(description="The ID of the Airtable table.")
event_type: str = SchemaField(description="The type of event that occurred.")
def __init__(self):
super().__init__(
id="8c3b52d1-f7e9-4c5d-a6f1-60e937d94d2a",
description="This block will output the contents of an Airtable webhook event.",
categories={BlockCategory.DATA},
input_schema=AirtableWebhookTriggerBlock.Input,
output_schema=AirtableWebhookTriggerBlock.Output,
webhook_config=BlockManualWebhookConfig(
provider="airtable",
webhook_type=AirtableWebhookEventType.RECORDS_UPDATED,
),
test_input=[
{
"payload": {
"baseId": "app123",
"tableId": "tbl456",
"event": "records:updated",
"data": {},
}
}
],
test_output=[
(
"event_data",
{
"baseId": "app123",
"tableId": "tbl456",
"event": "records:updated",
"data": {},
},
)
],
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
"""Process the Airtable webhook event and yield its contents."""
logger.info("Airtable webhook trigger received payload: %s", input_data.payload)
yield "event_data", input_data.payload

View File

@@ -0,0 +1,327 @@
"""
API module for Proxycurl integration.
This module provides a client for interacting with the Proxycurl API,
which allows fetching LinkedIn profile data and related information.
"""
import logging
from json import JSONDecodeError
from typing import Any, Dict, List, Optional, TypeVar, Union
from pydantic import BaseModel, Field
from backend.data.model import APIKeyCredentials
from backend.util.request import Requests
logger = logging.getLogger(__name__)
T = TypeVar("T")
class ProxycurlAPIException(Exception):
"""Exception raised for Proxycurl API errors."""
def __init__(self, message: str, status_code: int):
super().__init__(message)
self.status_code = status_code
class SocialMediaProfiles(BaseModel):
"""Social media profiles model."""
twitter: Optional[str] = None
facebook: Optional[str] = None
github: Optional[str] = None
class Experience(BaseModel):
"""Experience model for LinkedIn profiles."""
company: Optional[str] = None
title: Optional[str] = None
description: Optional[str] = None
location: Optional[str] = None
starts_at: Optional[Dict[str, int]] = None
ends_at: Optional[Dict[str, int]] = None
company_linkedin_profile_url: Optional[str] = None
class Education(BaseModel):
"""Education model for LinkedIn profiles."""
school: Optional[str] = None
degree_name: Optional[str] = None
field_of_study: Optional[str] = None
starts_at: Optional[Dict[str, int]] = None
ends_at: Optional[Dict[str, int]] = None
school_linkedin_profile_url: Optional[str] = None
class PersonProfileResponse(BaseModel):
"""Response model for LinkedIn person profile."""
public_identifier: Optional[str] = None
profile_pic_url: Optional[str] = None
full_name: Optional[str] = None
first_name: Optional[str] = None
last_name: Optional[str] = None
occupation: Optional[str] = None
headline: Optional[str] = None
summary: Optional[str] = None
country: Optional[str] = None
country_full_name: Optional[str] = None
city: Optional[str] = None
state: Optional[str] = None
experiences: Optional[List[Experience]] = None
education: Optional[List[Education]] = None
languages: Optional[List[str]] = None
skills: Optional[List[str]] = None
inferred_salary: Optional[Dict[str, Any]] = None
personal_email: Optional[str] = None
personal_contact_number: Optional[str] = None
social_media_profiles: Optional[SocialMediaProfiles] = None
extra: Optional[Dict[str, Any]] = None
class SimilarProfile(BaseModel):
"""Similar profile model for LinkedIn person lookup."""
similarity: float
linkedin_profile_url: str
class PersonLookupResponse(BaseModel):
"""Response model for LinkedIn person lookup."""
linkedin_profile_url: Optional[str] = None
similar_profiles: Optional[List[SimilarProfile]] = None
class RoleLookupResponse(BaseModel):
"""Response model for LinkedIn role lookup."""
linkedin_profile_url: Optional[str] = None
profile_data: Optional[PersonProfileResponse] = None
class ProfilePictureResponse(BaseModel):
"""Response model for LinkedIn profile picture."""
profile_picture_url: str = Field(..., description="URL of the profile picture")
class ProxycurlClient:
"""Client for interacting with the Proxycurl API."""
API_BASE_URL = "https://nubela.co/proxycurl/api"
def __init__(
self,
credentials: Optional[APIKeyCredentials] = None,
custom_requests: Optional[Requests] = None,
):
"""
Initialize the Proxycurl client.
Args:
credentials: The credentials to use for authentication.
custom_requests: Custom Requests instance for testing.
"""
if custom_requests:
self._requests = custom_requests
else:
headers: Dict[str, str] = {
"Content-Type": "application/json",
}
if credentials:
headers["Authorization"] = f"Bearer {credentials.api_key.get_secret_value()}"
self._requests = Requests(
extra_headers=headers,
raise_for_status=False,
)
def _handle_response(self, response) -> Any:
"""
Handle API response and check for errors.
Args:
response: The response object from the request.
Returns:
The response data.
Raises:
ProxycurlAPIException: If the API request fails.
"""
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get("message", "")
except JSONDecodeError:
error_message = response.text
raise ProxycurlAPIException(
f"Proxycurl API request failed ({response.status_code}): {error_message}",
response.status_code,
)
return response.json()
def fetch_profile(
self,
linkedin_url: str,
fallback_to_cache: str = "on-error",
use_cache: str = "if-present",
include_skills: bool = False,
include_inferred_salary: bool = False,
include_personal_email: bool = False,
include_personal_contact_number: bool = False,
include_social_media: bool = False,
include_extra: bool = False,
) -> PersonProfileResponse:
"""
Fetch a LinkedIn profile with optional parameters.
Args:
linkedin_url: The LinkedIn profile URL to fetch.
fallback_to_cache: Cache usage if live fetch fails ('on-error' or 'never').
use_cache: Cache utilization ('if-present' or 'never').
include_skills: Whether to include skills data.
include_inferred_salary: Whether to include inferred salary data.
include_personal_email: Whether to include personal email.
include_personal_contact_number: Whether to include personal contact number.
include_social_media: Whether to include social media profiles.
include_extra: Whether to include additional data.
Returns:
The LinkedIn profile data.
Raises:
ProxycurlAPIException: If the API request fails.
"""
params = {
"url": linkedin_url,
"fallback_to_cache": fallback_to_cache,
"use_cache": use_cache,
}
if include_skills:
params["skills"] = "include"
if include_inferred_salary:
params["inferred_salary"] = "include"
if include_personal_email:
params["personal_email"] = "include"
if include_personal_contact_number:
params["personal_contact_number"] = "include"
if include_social_media:
params["twitter_profile_id"] = "include"
params["facebook_profile_id"] = "include"
params["github_profile_id"] = "include"
if include_extra:
params["extra"] = "include"
response = self._requests.get(f"{self.API_BASE_URL}/v2/linkedin", params=params)
return PersonProfileResponse(**self._handle_response(response))
def lookup_person(
self,
first_name: str,
last_name: str,
company_domain: Optional[str] = None,
location: Optional[str] = None,
title: Optional[str] = None,
include_similarity_checks: bool = False,
enrich_profile: bool = False,
) -> PersonLookupResponse:
"""
Look up a LinkedIn profile by person's information.
Args:
first_name: The person's first name.
last_name: The person's last name.
company_domain: The domain of the company they work for.
location: The person's location.
title: The person's job title.
include_similarity_checks: Whether to include similarity checks.
enrich_profile: Whether to enrich the profile.
Returns:
The LinkedIn profile lookup result.
Raises:
ProxycurlAPIException: If the API request fails.
"""
params = {
"first_name": first_name,
"last_name": last_name,
}
if company_domain:
params["company_domain"] = company_domain
if location:
params["location"] = location
if title:
params["title"] = title
if include_similarity_checks:
params["similarity_checks"] = "include"
if enrich_profile:
params["enrich_profile"] = "enrich"
response = self._requests.get(
f"{self.API_BASE_URL}/linkedin/profile/resolve", params=params
)
return PersonLookupResponse(**self._handle_response(response))
def lookup_role(
self, role: str, company_name: str, enrich_profile: bool = False
) -> RoleLookupResponse:
"""
Look up a LinkedIn profile by role in a company.
Args:
role: The role title (e.g., CEO, CTO).
company_name: The name of the company.
enrich_profile: Whether to enrich the profile.
Returns:
The LinkedIn profile lookup result.
Raises:
ProxycurlAPIException: If the API request fails.
"""
params = {
"role": role,
"company_name": company_name,
}
if enrich_profile:
params["enrich_profile"] = "enrich"
response = self._requests.get(
f"{self.API_BASE_URL}/find/company/role/", params=params
)
return RoleLookupResponse(**self._handle_response(response))
def get_profile_picture(self, linkedin_profile_url: str) -> ProfilePictureResponse:
"""
Get a LinkedIn profile picture URL.
Args:
linkedin_profile_url: The LinkedIn profile URL.
Returns:
The profile picture URL.
Raises:
ProxycurlAPIException: If the API request fails.
"""
params = {
"linkedin_person_profile_url": linkedin_profile_url,
}
response = self._requests.get(
f"{self.API_BASE_URL}/linkedin/person/profile-picture", params=params
)
return ProfilePictureResponse(**self._handle_response(response))

View File

@@ -0,0 +1,34 @@
"""
Authentication module for Proxycurl API integration.
This module provides credential types and test credentials for the Proxycurl API.
"""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, CredentialsMetaInput
from backend.integrations.providers import ProviderName
# Define the type of credentials input expected for Proxycurl API
ProxycurlCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.PROXYCURL], Literal["api_key"]
]
# Mock credentials for testing Proxycurl API integration
TEST_CREDENTIALS = APIKeyCredentials(
id="1234a567-89bc-4def-ab12-3456cdef7890",
provider="proxycurl",
api_key=SecretStr("mock-proxycurl-api-key"),
title="Mock Proxycurl API key",
expires_at=None,
)
# Dictionary representation of test credentials for input fields
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}

View File

@@ -0,0 +1,436 @@
"""
Block definitions for Proxycurl API integration.
This module implements blocks for interacting with the Proxycurl API,
which provides access to LinkedIn profile data and related information.
"""
import logging
import uuid
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import APIKeyCredentials, CredentialsField, SchemaField
from ._api import (
Experience,
Education,
PersonProfileResponse,
PersonLookupResponse,
ProfilePictureResponse,
ProxycurlClient,
RoleLookupResponse,
SimilarProfile,
SocialMediaProfiles,
)
from ._auth import TEST_CREDENTIALS, TEST_CREDENTIALS_INPUT, ProxycurlCredentialsInput
logger = logging.getLogger(__name__)
class ProxycurlProfileFetchBlock(Block):
"""Block to fetch LinkedIn profile data using Proxycurl API."""
class Input(BlockSchema):
"""Input schema for ProxycurlProfileFetchBlock."""
linkedin_url: str = SchemaField(
description="LinkedIn profile URL to fetch data from",
placeholder="https://www.linkedin.com/in/username/",
)
fallback_to_cache: str = SchemaField(
description="Cache usage if live fetch fails",
default="on-error",
enum=["on-error", "never"],
advanced=True,
)
use_cache: str = SchemaField(
description="Cache utilization strategy",
default="if-present",
enum=["if-present", "never"],
advanced=True,
)
include_skills: bool = SchemaField(
description="Include skills data",
default=False,
advanced=True,
)
include_inferred_salary: bool = SchemaField(
description="Include inferred salary data",
default=False,
advanced=True,
)
include_personal_email: bool = SchemaField(
description="Include personal email",
default=False,
advanced=True,
)
include_personal_contact_number: bool = SchemaField(
description="Include personal contact number",
default=False,
advanced=True,
)
include_social_media: bool = SchemaField(
description="Include social media profiles",
default=False,
advanced=True,
)
include_extra: bool = SchemaField(
description="Include additional data",
default=False,
advanced=True,
)
credentials: ProxycurlCredentialsInput = CredentialsField(
description="Proxycurl API credentials"
)
class Output(BlockSchema):
"""Output schema for ProxycurlProfileFetchBlock."""
profile: PersonProfileResponse = SchemaField(
description="LinkedIn profile data"
)
error: Optional[str] = SchemaField(
description="Error message if the request failed"
)
def __init__(self):
"""Initialize ProxycurlProfileFetchBlock."""
super().__init__(
id="f6e0ac73-4f1d-4acb-b4b7-b67066c5984e",
description="Fetch LinkedIn profile data using Proxycurl",
categories={BlockCategory.SOCIAL},
input_schema=ProxycurlProfileFetchBlock.Input,
output_schema=ProxycurlProfileFetchBlock.Output,
test_input={
"linkedin_url": "https://www.linkedin.com/in/williamhgates/",
"include_skills": True,
"include_social_media": True,
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
(
"profile",
PersonProfileResponse(
public_identifier="williamhgates",
full_name="Bill Gates",
occupation="Co-chair at Bill & Melinda Gates Foundation",
experiences=[
Experience(
company="Bill & Melinda Gates Foundation",
title="Co-chair",
starts_at={"year": 2000},
)
],
),
)
],
test_credentials=TEST_CREDENTIALS,
)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Run the block to fetch LinkedIn profile data.
Args:
input_data: Input parameters for the block
credentials: API key credentials for Proxycurl
**kwargs: Additional keyword arguments
Yields:
Tuples of (output_name, output_value)
"""
try:
client = ProxycurlClient(credentials=credentials)
profile = client.fetch_profile(
linkedin_url=input_data.linkedin_url,
fallback_to_cache=input_data.fallback_to_cache,
use_cache=input_data.use_cache,
include_skills=input_data.include_skills,
include_inferred_salary=input_data.include_inferred_salary,
include_personal_email=input_data.include_personal_email,
include_personal_contact_number=input_data.include_personal_contact_number,
include_social_media=input_data.include_social_media,
include_extra=input_data.include_extra,
)
yield "profile", profile
except Exception as e:
logger.error(f"Error fetching LinkedIn profile: {str(e)}")
yield "error", str(e)
class ProxycurlPersonLookupBlock(Block):
"""Block to look up LinkedIn profiles by person's information using Proxycurl API."""
class Input(BlockSchema):
"""Input schema for ProxycurlPersonLookupBlock."""
first_name: str = SchemaField(
description="Person's first name",
placeholder="John",
)
last_name: str = SchemaField(
description="Person's last name",
placeholder="Doe",
)
company_domain: Optional[str] = SchemaField(
description="Domain of the company they work for (optional)",
placeholder="example.com",
default=None,
)
location: Optional[str] = SchemaField(
description="Person's location (optional)",
placeholder="San Francisco",
default=None,
)
title: Optional[str] = SchemaField(
description="Person's job title (optional)",
placeholder="CEO",
default=None,
)
include_similarity_checks: bool = SchemaField(
description="Include similarity checks",
default=False,
advanced=True,
)
enrich_profile: bool = SchemaField(
description="Enrich the profile with additional data",
default=False,
advanced=True,
)
credentials: ProxycurlCredentialsInput = CredentialsField(
description="Proxycurl API credentials"
)
class Output(BlockSchema):
"""Output schema for ProxycurlPersonLookupBlock."""
lookup_result: PersonLookupResponse = SchemaField(
description="LinkedIn profile lookup result"
)
error: Optional[str] = SchemaField(
description="Error message if the request failed"
)
def __init__(self):
"""Initialize ProxycurlPersonLookupBlock."""
super().__init__(
id="d237a98a-5c4b-4a1c-b9e3-e6f9a6c81df7",
description="Look up LinkedIn profiles by person information using Proxycurl",
categories={BlockCategory.SOCIAL},
input_schema=ProxycurlPersonLookupBlock.Input,
output_schema=ProxycurlPersonLookupBlock.Output,
test_input={
"first_name": "Bill",
"last_name": "Gates",
"company_domain": "gatesfoundation.org",
"include_similarity_checks": True,
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
(
"lookup_result",
PersonLookupResponse(
linkedin_profile_url="https://www.linkedin.com/in/williamhgates/",
similar_profiles=[
SimilarProfile(
similarity=0.95,
linkedin_profile_url="https://www.linkedin.com/in/billgates/",
)
],
),
)
],
test_credentials=TEST_CREDENTIALS,
)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Run the block to look up LinkedIn profiles.
Args:
input_data: Input parameters for the block
credentials: API key credentials for Proxycurl
**kwargs: Additional keyword arguments
Yields:
Tuples of (output_name, output_value)
"""
try:
client = ProxycurlClient(credentials=credentials)
lookup_result = client.lookup_person(
first_name=input_data.first_name,
last_name=input_data.last_name,
company_domain=input_data.company_domain,
location=input_data.location,
title=input_data.title,
include_similarity_checks=input_data.include_similarity_checks,
enrich_profile=input_data.enrich_profile,
)
yield "lookup_result", lookup_result
except Exception as e:
logger.error(f"Error looking up LinkedIn profile: {str(e)}")
yield "error", str(e)
class ProxycurlRoleLookupBlock(Block):
"""Block to look up LinkedIn profiles by role in a company using Proxycurl API."""
class Input(BlockSchema):
"""Input schema for ProxycurlRoleLookupBlock."""
role: str = SchemaField(
description="Role title (e.g., CEO, CTO)",
placeholder="CEO",
)
company_name: str = SchemaField(
description="Name of the company",
placeholder="Microsoft",
)
enrich_profile: bool = SchemaField(
description="Enrich the profile with additional data",
default=False,
advanced=True,
)
credentials: ProxycurlCredentialsInput = CredentialsField(
description="Proxycurl API credentials"
)
class Output(BlockSchema):
"""Output schema for ProxycurlRoleLookupBlock."""
role_lookup_result: RoleLookupResponse = SchemaField(
description="LinkedIn role lookup result"
)
error: Optional[str] = SchemaField(
description="Error message if the request failed"
)
def __init__(self):
"""Initialize ProxycurlRoleLookupBlock."""
super().__init__(
id="3b9fc742-06d4-49c7-b5ce-7e302dd7c8a7",
description="Look up LinkedIn profiles by role in a company using Proxycurl",
categories={BlockCategory.SOCIAL},
input_schema=ProxycurlRoleLookupBlock.Input,
output_schema=ProxycurlRoleLookupBlock.Output,
test_input={
"role": "Co-chair",
"company_name": "Gates Foundation",
"enrich_profile": True,
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
(
"role_lookup_result",
RoleLookupResponse(
linkedin_profile_url="https://www.linkedin.com/in/williamhgates/",
),
)
],
test_credentials=TEST_CREDENTIALS,
)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Run the block to look up LinkedIn profiles by role.
Args:
input_data: Input parameters for the block
credentials: API key credentials for Proxycurl
**kwargs: Additional keyword arguments
Yields:
Tuples of (output_name, output_value)
"""
try:
client = ProxycurlClient(credentials=credentials)
role_lookup_result = client.lookup_role(
role=input_data.role,
company_name=input_data.company_name,
enrich_profile=input_data.enrich_profile,
)
yield "role_lookup_result", role_lookup_result
except Exception as e:
logger.error(f"Error looking up role in company: {str(e)}")
yield "error", str(e)
class ProxycurlProfilePictureBlock(Block):
"""Block to get LinkedIn profile pictures using Proxycurl API."""
class Input(BlockSchema):
"""Input schema for ProxycurlProfilePictureBlock."""
linkedin_profile_url: str = SchemaField(
description="LinkedIn profile URL",
placeholder="https://www.linkedin.com/in/username/",
)
credentials: ProxycurlCredentialsInput = CredentialsField(
description="Proxycurl API credentials"
)
class Output(BlockSchema):
"""Output schema for ProxycurlProfilePictureBlock."""
profile_picture: ProfilePictureResponse = SchemaField(
description="LinkedIn profile picture URL"
)
error: Optional[str] = SchemaField(
description="Error message if the request failed"
)
def __init__(self):
"""Initialize ProxycurlProfilePictureBlock."""
super().__init__(
id="68d5a942-9b3f-4e9a-b7c1-d96ea4321f0d",
description="Get LinkedIn profile pictures using Proxycurl",
categories={BlockCategory.SOCIAL},
input_schema=ProxycurlProfilePictureBlock.Input,
output_schema=ProxycurlProfilePictureBlock.Output,
test_input={
"linkedin_profile_url": "https://www.linkedin.com/in/williamhgates/",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
(
"profile_picture",
ProfilePictureResponse(
profile_picture_url="https://media.licdn.com/dms/image/C4D03AQFj-xjuXrLFSQ/profile-displayphoto-shrink_800_800/0/1576881858598?e=1686787200&v=beta&t=zrQC76QwsfQQIWthfOnrKRBMZ5D-qIAvzLXLmWgYvTk"
),
)
],
test_credentials=TEST_CREDENTIALS,
)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Run the block to get LinkedIn profile pictures.
Args:
input_data: Input parameters for the block
credentials: API key credentials for Proxycurl
**kwargs: Additional keyword arguments
Yields:
Tuples of (output_name, output_value)
"""
try:
client = ProxycurlClient(credentials=credentials)
profile_picture = client.get_profile_picture(
linkedin_profile_url=input_data.linkedin_profile_url,
)
yield "profile_picture", profile_picture
except Exception as e:
logger.error(f"Error getting profile picture: {str(e)}")
yield "error", str(e)

View File

@@ -2,6 +2,11 @@ from typing import Type
from backend.blocks.ai_music_generator import AIMusicGeneratorBlock
from backend.blocks.ai_shortform_video_block import AIShortformVideoCreatorBlock
from backend.blocks.airtable.airtable import (
AirtableFieldsBlock,
AirtableRecordsBlock,
AirtableTablesBlock,
)
from backend.blocks.ideogram import IdeogramModelBlock
from backend.blocks.jina.embeddings import JinaEmbeddingBlock
from backend.blocks.jina.search import ExtractWebsiteContentBlock, SearchTheWebBlock
@@ -13,6 +18,11 @@ from backend.blocks.llm import (
AITextGeneratorBlock,
AITextSummarizerBlock,
LlmModel,
)
from backend.blocks.proxycurl.proxycurl import (
ProxycurlProfileFetchBlock,
ProxycurlPersonLookupBlock,
ProxycurlRoleLookupBlock,
)
from backend.blocks.replicate_flux_advanced import ReplicateFluxAdvancedModelBlock
from backend.blocks.smart_decision_maker import SmartDecisionMakerBlock
@@ -21,6 +31,7 @@ from backend.blocks.text_to_speech_block import UnrealTextToSpeechBlock
from backend.data.block import Block
from backend.data.cost import BlockCost, BlockCostType
from backend.integrations.credentials_store import (
airtable_credentials,
anthropic_credentials,
did_credentials,
groq_credentials,
@@ -28,6 +39,7 @@ from backend.integrations.credentials_store import (
jina_credentials,
open_router_credentials,
openai_credentials,
proxycurl_credentials,
replicate_credentials,
revid_credentials,
unreal_credentials,
@@ -266,5 +278,77 @@ BLOCK_COSTS: dict[Type[Block], list[BlockCost]] = {
},
)
],
AirtableTablesBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": airtable_credentials.id,
"provider": airtable_credentials.provider,
"type": airtable_credentials.type,
}
},
)
],
AirtableFieldsBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": airtable_credentials.id,
"provider": airtable_credentials.provider,
"type": airtable_credentials.type,
}
},
)
],
AirtableRecordsBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": airtable_credentials.id,
"provider": airtable_credentials.provider,
"type": airtable_credentials.type,
}
},
),
],
ProxycurlProfileFetchBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": proxycurl_credentials.id,
"provider": proxycurl_credentials.provider,
"type": proxycurl_credentials.type,
}
},
)
],
ProxycurlPersonLookupBlock: [
BlockCost(
cost_amount=2,
cost_filter={
"credentials": {
"id": proxycurl_credentials.id,
"provider": proxycurl_credentials.provider,
"type": proxycurl_credentials.type,
}
},
)
],
ProxycurlRoleLookupBlock: [
BlockCost(
cost_amount=3,
cost_filter={
"credentials": {
"id": proxycurl_credentials.id,
"provider": proxycurl_credentials.provider,
"type": proxycurl_credentials.type,
}
},
)
],
SmartDecisionMakerBlock: LLM_COST,
}

View File

@@ -1,5 +1,5 @@
from backend.app import run_processes
from backend.executor import DatabaseManager, ExecutionManager
from backend.executor import ExecutionManager
def main():

View File

@@ -169,6 +169,23 @@ zerobounce_credentials = APIKeyCredentials(
expires_at=None,
)
airtable_credentials = APIKeyCredentials(
id="b3c7f68f-bb6a-4995-99ec-b45b40d33499",
provider="airtable",
api_key=SecretStr(settings.secrets.airtable_api_key),
title="Use Credits for Airtable",
expires_at=None,
)
proxycurl_credentials = APIKeyCredentials(
id="d9fce73a-6c1d-4e8b-ba2e-12a456789def",
provider="proxycurl",
api_key=SecretStr(settings.secrets.proxycurl_api_key),
title="Use Credits for Proxycurl",
expires_at=None,
)
DEFAULT_CREDENTIALS = [
ollama_credentials,
revid_credentials,
@@ -181,11 +198,13 @@ DEFAULT_CREDENTIALS = [
jina_credentials,
unreal_credentials,
open_router_credentials,
proxycurl_credentials,
fal_credentials,
exa_credentials,
e2b_credentials,
mem0_credentials,
nvidia_credentials,
airtable_credentials,
screenshotone_credentials,
apollo_credentials,
smartlead_credentials,
@@ -225,6 +244,8 @@ class IntegrationCredentialsStore:
all_credentials.append(ollama_credentials)
# These will only be added if the API key is set
if settings.secrets.airtable_api_key:
all_credentials.append(airtable_credentials)
if settings.secrets.revid_api_key:
all_credentials.append(revid_credentials)
if settings.secrets.ideogram_api_key:
@@ -245,6 +266,8 @@ class IntegrationCredentialsStore:
all_credentials.append(unreal_credentials)
if settings.secrets.open_router_api_key:
all_credentials.append(open_router_credentials)
if settings.secrets.proxycurl_api_key:
all_credentials.append(proxycurl_credentials)
if settings.secrets.fal_api_key:
all_credentials.append(fal_credentials)
if settings.secrets.exa_api_key:

View File

@@ -6,6 +6,7 @@ class ProviderName(str, Enum):
ANTHROPIC = "anthropic"
APOLLO = "apollo"
COMPASS = "compass"
AIRTABLE = "airtable"
DISCORD = "discord"
D_ID = "d_id"
E2B = "e2b"
@@ -16,6 +17,7 @@ class ProviderName(str, Enum):
GOOGLE_MAPS = "google_maps"
GROQ = "groq"
HUBSPOT = "hubspot"
PROXYCURL = "proxycurl"
IDEOGRAM = "ideogram"
JINA = "jina"
LINEAR = "linear"

View File

@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING
from .airtable import AirtableWebhookManager
from .compass import CompassWebhookManager
from .github import GithubWebhooksManager
from .slant3d import Slant3DWebhooksManager
@@ -15,6 +16,7 @@ WEBHOOK_MANAGERS_BY_NAME: dict["ProviderName", type["BaseWebhooksManager"]] = {
CompassWebhookManager,
GithubWebhooksManager,
Slant3DWebhooksManager,
AirtableWebhookManager,
]
}
# --8<-- [end:WEBHOOK_MANAGERS_BY_NAME]

View File

@@ -0,0 +1,120 @@
"""
Webhook manager for Airtable webhooks.
This module manages the registration and processing of webhooks from Airtable.
"""
import logging
from typing import Dict, Tuple
import requests
from fastapi import Request
from strenum import StrEnum
from backend.data import integrations
from backend.data.model import APIKeyCredentials, Credentials
from backend.integrations.providers import ProviderName
from ._manual_base import ManualWebhookManagerBase
logger = logging.getLogger(__name__)
class AirtableWebhookEventType(StrEnum):
"""Types of webhook events supported by Airtable."""
RECORDS_CREATED = "records:created"
RECORDS_UPDATED = "records:updated"
RECORDS_DELETED = "records:deleted"
class AirtableWebhookManager(ManualWebhookManagerBase):
"""Manager class for Airtable webhooks."""
# Provider name for this webhook manager
PROVIDER_NAME = ProviderName.AIRTABLE
# Define the webhook event types this manager can handle
WebhookEventType = AirtableWebhookEventType
# Airtable API URL for webhooks
BASE_URL = "https://api.airtable.com/v0"
@classmethod
async def validate_payload(
cls, webhook: integrations.Webhook, request: Request
) -> Tuple[Dict, str]:
"""
Validate the incoming webhook payload.
Args:
webhook: The webhook object from the database.
request: The incoming request containing the webhook payload.
Returns:
A tuple of (payload_dict, event_type)
"""
# Extract the JSON payload from the request
payload = await request.json()
# Determine the event type from the payload
event_type = payload.get("event", AirtableWebhookEventType.RECORDS_UPDATED)
return payload, event_type
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: str,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> Tuple[str, Dict]:
"""
Register a webhook with Airtable.
Args:
credentials: The API credentials.
webhook_type: The type of webhook to register.
resource: The base ID to register webhooks for.
events: List of event types to listen for.
ingress_url: URL where webhook notifications should be sent.
secret: Secret for webhook security.
Returns:
Tuple of (webhook_id, webhook_config)
"""
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key is required to register Airtable webhook")
headers = {
"Authorization": f"Bearer {credentials.api_key.get_secret_value()}",
"Content-Type": "application/json",
}
payload = {
"url": ingress_url,
"event": webhook_type, # Use the webhook_type as the event type
}
response = requests.post(
f"{self.BASE_URL}/bases/{resource}/webhooks",
headers=headers,
json=payload,
)
if not response.ok:
error = response.json().get("error", "Unknown error")
raise ValueError(f"Failed to register Airtable webhook: {error}")
webhook_data = response.json()
webhook_id = webhook_data.get("id", "")
webhook_config = {
"provider": self.PROVIDER_NAME,
"base_id": resource,
"event": webhook_type,
"url": ingress_url,
}
return webhook_id, webhook_config

View File

@@ -403,8 +403,10 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
apollo_api_key: str = Field(default="", description="Apollo API Key")
smartlead_api_key: str = Field(default="", description="SmartLead API Key")
zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key")
proxycurl_api_key: str = Field(default="", description="Proxycurl API Key")
# Add more secret fields as needed
airtable_api_key: str = Field(default="", description="Airtable API Key")
model_config = SettingsConfigDict(
env_file=".env",

View File

@@ -55,6 +55,7 @@ export const providerIcons: Record<
CredentialsProviderName,
React.FC<{ className?: string }>
> = {
airtable: fallbackIcon,
anthropic: fallbackIcon,
apollo: fallbackIcon,
e2b: fallbackIcon,
@@ -83,6 +84,7 @@ export const providerIcons: Record<
reddit: fallbackIcon,
fal: fallbackIcon,
revid: fallbackIcon,
proxycurl: fallbackIcon,
twitter: FaTwitter,
unreal_speech: fallbackIcon,
exa: fallbackIcon,

View File

@@ -17,6 +17,7 @@ const CREDENTIALS_PROVIDER_NAMES = Object.values(
// --8<-- [start:CredentialsProviderNames]
const providerDisplayNames: Record<CredentialsProviderName, string> = {
airtable: "Airtable",
anthropic: "Anthropic",
apollo: "Apollo",
discord: "Discord",
@@ -46,6 +47,7 @@ const providerDisplayNames: Record<CredentialsProviderName, string> = {
smartlead: "SmartLead",
smtp: "SMTP",
reddit: "Reddit",
proxycurl: "Proxycurl",
replicate: "Replicate",
revid: "Rev.ID",
twitter: "Twitter",

View File

@@ -113,6 +113,7 @@ export type Credentials =
// --8<-- [start:BlockIOCredentialsSubSchema]
export const PROVIDER_NAMES = {
AIRTABLE: "airtable",
ANTHROPIC: "anthropic",
APOLLO: "apollo",
D_ID: "d_id",
@@ -135,6 +136,7 @@ export const PROVIDER_NAMES = {
OLLAMA: "ollama",
OPENAI: "openai",
OPENWEATHERMAP: "openweathermap",
PROXYCURL: "proxycurl",
OPEN_ROUTER: "open_router",
PINECONE: "pinecone",
SCREENSHOTONE: "screenshotone",