Compare commits

...

1 Commits

Author SHA1 Message Date
SwiftyOS
6a9a5b7161 Airtable integration 2025-03-21 13:23:06 +01:00
15 changed files with 1132 additions and 1 deletions

View File

@@ -174,6 +174,9 @@ EXA_API_KEY=
E2B_API_KEY=
# Mem0
# Airtable
AIRTABLE_API_KEY=
MEM0_API_KEY=
# Nvidia

View File

@@ -0,0 +1,440 @@
"""
API module for Airtable API integration.
This module provides a client for interacting with the Airtable API,
including methods for working with tables, fields, records, and webhooks.
"""
from json import JSONDecodeError
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from backend.data.model import APIKeyCredentials
from backend.util.request import Requests
class AirtableAPIException(Exception):
def __init__(self, message: str, status_code: int):
super().__init__(message)
self.status_code = status_code
# Response Models
class TableField(BaseModel):
id: str
name: str
type: str
options: Optional[Dict[str, Any]] = None
class Table(BaseModel):
id: str
name: str
description: Optional[str] = None
fields: List[TableField]
class Record(BaseModel):
id: str
fields: Dict[str, Any]
createdTime: Optional[str] = None
class RecordAttachment(BaseModel):
id: str
url: str
filename: str
size: Optional[int] = None
type: Optional[str] = None
class Webhook(BaseModel):
id: str
url: str
event: str
notification_url: Optional[str] = None
active: bool
class ListTablesResponse(BaseModel):
tables: List[Table]
class ListRecordsResponse(BaseModel):
records: List[Record]
offset: Optional[str] = None
class ListAttachmentsResponse(BaseModel):
attachments: List[RecordAttachment]
offset: Optional[str] = None
class ListWebhooksResponse(BaseModel):
webhooks: List[Webhook]
offset: Optional[str] = None
class AirtableClient:
"""Client for the Airtable API"""
API_BASE_URL = "https://api.airtable.com/v0"
def __init__(
self,
credentials: Optional[APIKeyCredentials] = None,
custom_requests: Optional[Requests] = None,
):
if custom_requests:
self._requests = custom_requests
else:
headers: dict[str, str] = {
"Content-Type": "application/json",
}
if credentials:
headers["Authorization"] = (
f"Bearer {credentials.api_key.get_secret_value()}"
)
self._requests = Requests(
extra_headers=headers,
raise_for_status=False,
)
@staticmethod
def _handle_response(response) -> Any:
"""
Handles API response and checks for errors.
Args:
response: The response object from the request.
Returns:
The parsed JSON response data.
Raises:
AirtableAPIException: If the API request fails.
"""
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get("error", {}).get("message", "")
except JSONDecodeError:
error_message = response.text
raise AirtableAPIException(
f"Airtable API request failed ({response.status_code}): {error_message}",
response.status_code,
)
return response.json()
# Table Methods
def list_tables(self, base_id: str) -> ListTablesResponse:
"""
List all tables in a base.
Args:
base_id: The ID of the base to list tables from.
Returns:
ListTablesResponse: Object containing the list of tables.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(f"{self.API_BASE_URL}/bases/{base_id}/tables")
data = self._handle_response(response)
return ListTablesResponse(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to list tables: {str(e)}", 500)
def get_table(self, base_id: str, table_id: str) -> Table:
"""
Get a specific table schema.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to retrieve.
Returns:
Table: The table object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}"
)
data = self._handle_response(response)
return Table(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to get table: {str(e)}", 500)
def create_table(
self, base_id: str, name: str, description: str, fields: List[Dict[str, Any]]
) -> Table:
"""
Create a new table in a base.
Args:
base_id: The ID of the base to create the table in.
name: The name of the new table.
description: The description of the new table.
fields: The fields to create in the new table.
Returns:
Table: The created table object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
payload = {
"name": name,
"description": description,
"fields": fields,
}
response = self._requests.post(
f"{self.API_BASE_URL}/meta/bases/{base_id}/tables", json=payload
)
data = self._handle_response(response)
return Table(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to create table: {str(e)}", 500)
# Field Methods
def list_fields(self, base_id: str, table_id: str) -> List[TableField]:
"""
List all fields in a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to list fields from.
Returns:
List[TableField]: List of field objects.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/fields"
)
data = self._handle_response(response)
return [TableField(**field) for field in data.get("fields", [])]
except Exception as e:
raise AirtableAPIException(f"Failed to list fields: {str(e)}", 500)
def get_field(self, base_id: str, table_id: str, field_id: str) -> TableField:
"""
Get a specific field.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table containing the field.
field_id: The ID of the field to retrieve.
Returns:
TableField: The field object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/fields/{field_id}"
)
data = self._handle_response(response)
return TableField(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to get field: {str(e)}", 500)
def create_field(
self,
base_id: str,
table_id: str,
name: str,
field_type: str,
options: Optional[Dict[str, Any]] = None,
) -> TableField:
"""
Create a new field in a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to create the field in.
name: The name of the new field.
field_type: The type of the new field.
options: Optional field type options.
Returns:
TableField: The created field object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
payload = {
"name": name,
"type": field_type,
}
if options:
payload["options"] = options
response = self._requests.post(
f"{self.API_BASE_URL}/meta/bases/{base_id}/tables/{table_id}/fields",
json=payload,
)
data = self._handle_response(response)
return TableField(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to create field: {str(e)}", 500)
# Record Methods
def list_records(
self,
base_id: str,
table_id: str,
filter_formula: Optional[str] = None,
offset: Optional[str] = None,
) -> ListRecordsResponse:
"""
List records in a table, with optional filtering.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to list records from.
filter_formula: Optional formula to filter records.
offset: Optional pagination offset.
Returns:
ListRecordsResponse: Object containing the list of records.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
params = {}
if filter_formula:
params["filterByFormula"] = filter_formula
if offset:
params["offset"] = offset
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records",
params=params,
)
data = self._handle_response(response)
return ListRecordsResponse(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to list records: {str(e)}", 500)
def get_record(self, base_id: str, table_id: str, record_id: str) -> Record:
"""
Get a specific record.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table containing the record.
record_id: The ID of the record to retrieve.
Returns:
Record: The record object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records/{record_id}"
)
data = self._handle_response(response)
return Record(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to get record: {str(e)}", 500)
def create_record(
self, base_id: str, table_id: str, fields: Dict[str, Any]
) -> Record:
"""
Create a new record in a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table to create the record in.
fields: The field values for the new record.
Returns:
Record: The created record object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
payload = {"fields": fields}
response = self._requests.post(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records",
json=payload,
)
data = self._handle_response(response)
return Record(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to create record: {str(e)}", 500)
def update_record(
self, base_id: str, table_id: str, record_id: str, fields: Dict[str, Any]
) -> Record:
"""
Update a record in a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table containing the record.
record_id: The ID of the record to update.
fields: The field values to update.
Returns:
Record: The updated record object.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
payload = {"fields": fields}
response = self._requests.patch(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records/{record_id}",
json=payload,
)
data = self._handle_response(response)
return Record(**data)
except Exception as e:
raise AirtableAPIException(f"Failed to update record: {str(e)}", 500)
def delete_record(self, base_id: str, table_id: str, record_id: str) -> bool:
"""
Delete a record from a table.
Args:
base_id: The ID of the base containing the table.
table_id: The ID of the table containing the record.
record_id: The ID of the record to delete.
Returns:
bool: True if the deletion was successful.
Raises:
AirtableAPIException: If the API request fails.
"""
try:
response = self._requests.delete(
f"{self.API_BASE_URL}/bases/{base_id}/tables/{table_id}/records/{record_id}"
)
self._handle_response(response)
return True
except Exception as e:
raise AirtableAPIException(f"Failed to delete record: {str(e)}", 500)

View File

@@ -0,0 +1,37 @@
"""
Authentication module for Airtable API integration.
This module provides credential types and test credentials for the Airtable API integration.
It defines the structure for API key credentials used to authenticate with the Airtable API
and provides mock credentials for testing purposes.
"""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, CredentialsMetaInput
from backend.integrations.providers import ProviderName
# Define the type of credentials input expected for Airtable API
AirtableCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.AIRTABLE], Literal["api_key"]
]
# Mock credentials for testing Airtable API integration
TEST_CREDENTIALS = APIKeyCredentials(
id="7a91c8f0-399f-4235-a79c-59c0e37454d5",
provider="airtable",
api_key=SecretStr("mock-airtable-api-key"),
title="Mock Airtable API key",
expires_at=None,
)
# Dictionary representation of test credentials for input fields
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}

View File

@@ -0,0 +1,384 @@
"""
Airtable API integration blocks.
This module provides blocks for interacting with the Airtable API,
including operations for tables, fields, and records.
"""
import logging
from typing import Dict, List, Optional
from pydantic import BaseModel
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import APIKeyCredentials, CredentialsField, SchemaField
from ._api import AirtableAPIException, AirtableClient
from ._auth import TEST_CREDENTIALS, TEST_CREDENTIALS_INPUT, AirtableCredentialsInput
logger = logging.getLogger(__name__)
# Common response models
class AirtableTable(BaseModel):
id: str
name: str
description: Optional[str] = None
class AirtableField(BaseModel):
id: str
name: str
type: str
class AirtableRecord(BaseModel):
id: str
fields: Dict[str, any]
created_time: Optional[str] = None
class AirtableTablesBlock(Block):
"""Block for listing, getting, and creating tables in Airtable."""
class Input(BlockSchema):
base_id: str = SchemaField(
description="The ID of the Airtable base",
placeholder="appXXXXXXXXXXXXXX",
)
operation: str = SchemaField(
description="The operation to perform on tables",
placeholder="list",
choices=["list", "get", "create"],
)
table_id: Optional[str] = SchemaField(
description="The ID of the table (required for 'get' operation)",
placeholder="tblXXXXXXXXXXXXXX",
advanced=True,
)
table_name: Optional[str] = SchemaField(
description="The name of the new table (required for 'create' operation)",
placeholder="My New Table",
advanced=True,
)
table_description: Optional[str] = SchemaField(
description="The description of the new table (for 'create' operation)",
placeholder="Description of my table",
advanced=True,
)
fields: Optional[List[Dict[str, str]]] = SchemaField(
description="The fields to create in the new table (for 'create' operation)",
placeholder='[{"name": "Name", "type": "text"}]',
advanced=True,
)
credentials: AirtableCredentialsInput = CredentialsField(
description="The credentials for the Airtable API"
)
class Output(BlockSchema):
tables: Optional[List[AirtableTable]] = SchemaField(
description="List of tables in the base"
)
table: Optional[AirtableTable] = SchemaField(
description="The retrieved or created table"
)
error: Optional[str] = SchemaField(description="Error message if any")
def __init__(self):
super().__init__(
id="da53b48c-6e97-4c1c-afb9-4ecf10c81856",
description="List, get, or create tables in an Airtable base",
categories={BlockCategory.DATA},
input_schema=AirtableTablesBlock.Input,
output_schema=AirtableTablesBlock.Output,
test_input={
"base_id": "appXXXXXXXXXXXXXX",
"operation": "list",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
("tables", [AirtableTable(id="tbl123", name="Example Table")])
],
test_mock={
"list_tables": lambda *args, **kwargs: {
"tables": [{"id": "tbl123", "name": "Example Table"}]
}
},
test_credentials=TEST_CREDENTIALS,
)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Perform operations on Airtable tables.
Args:
input_data: The input parameters for the block.
credentials: The Airtable API credentials.
Yields:
BlockOutput: The result of the table operation.
"""
try:
client = AirtableClient(credentials=credentials)
if input_data.operation == "list":
# List all tables in the base
response = client.list_tables(input_data.base_id)
tables = [
AirtableTable(
id=table.id, name=table.name, description=table.description
)
for table in response.tables
]
yield "tables", tables
elif input_data.operation == "get":
# Get a specific table
if not input_data.table_id:
yield "error", "Table ID is required for 'get' operation"
return
table = client.get_table(input_data.base_id, input_data.table_id)
yield "table", AirtableTable(
id=table.id, name=table.name, description=table.description
)
elif input_data.operation == "create":
# Create a new table
if not input_data.table_name:
yield "error", "Table name is required for 'create' operation"
return
if not input_data.fields or len(input_data.fields) == 0:
yield "error", "At least one field is required for 'create' operation"
return
table = client.create_table(
input_data.base_id,
input_data.table_name,
input_data.table_description or "",
input_data.fields,
)
yield "table", AirtableTable(
id=table.id, name=table.name, description=table.description
)
else:
yield "error", f"Unknown operation: {input_data.operation}"
except AirtableAPIException as e:
yield "error", f"Airtable API error: {str(e)}"
except Exception as e:
logger.exception("Error in AirtableTablesBlock")
yield "error", f"Error: {str(e)}"
class AirtableFieldsBlock(Block):
"""Block for listing, getting, and creating fields in Airtable tables."""
class Input(BlockSchema):
base_id: str = SchemaField(
description="The ID of the Airtable base",
placeholder="appXXXXXXXXXXXXXX",
)
table_id: str = SchemaField(
description="The ID of the table",
placeholder="tblXXXXXXXXXXXXXX",
)
operation: str = SchemaField(
description="The operation to perform on fields",
placeholder="list",
choices=["list", "get", "create"],
)
field_id: Optional[str] = SchemaField(
description="The ID of the field (required for 'get' operation)",
placeholder="fldXXXXXXXXXXXXXX",
advanced=True,
)
field_name: Optional[str] = SchemaField(
description="The name of the new field (required for 'create' operation)",
placeholder="My New Field",
advanced=True,
)
field_type: Optional[str] = SchemaField(
description="The type of the new field (required for 'create' operation)",
placeholder="text",
advanced=True,
choices=[
"text",
"number",
"checkbox",
"singleSelect",
"multipleSelects",
"date",
"dateTime",
"attachment",
"link",
"multipleRecordLinks",
"formula",
"rollup",
"count",
"lookup",
"currency",
"percent",
"duration",
"rating",
"richText",
"barcode",
"button",
],
)
credentials: AirtableCredentialsInput = CredentialsField(
description="The credentials for the Airtable API"
)
class Output(BlockSchema):
fields: Optional[List[AirtableField]] = SchemaField(
description="List of fields in the table"
)
field: Optional[AirtableField] = SchemaField(
description="The retrieved or created field"
)
error: Optional[str] = SchemaField(description="Error message if any")
def __init__(self):
super().__init__(
id="c27a6a11-8c09-4f8c-afeb-82c7a0c81857",
description="List, get, or create fields in an Airtable table",
categories={BlockCategory.DATA},
input_schema=AirtableFieldsBlock.Input,
output_schema=AirtableFieldsBlock.Output,
test_input={
"base_id": "appXXXXXXXXXXXXXX",
"table_id": "tblXXXXXXXXXXXXXX",
"operation": "list",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
("fields", [AirtableField(id="fld123", name="Name", type="text")])
],
test_mock={
"list_fields": lambda *args, **kwargs: [
{"id": "fld123", "name": "Name", "type": "text"}
]
},
test_credentials=TEST_CREDENTIALS,
)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Perform operations on Airtable fields.
Args:
input_data: The input parameters for the block.
credentials: The Airtable API credentials.
Yields:
BlockOutput: The result of the field operation.
"""
try:
client = AirtableClient(credentials=credentials)
if input_data.operation == "list":
# List all fields in the table
fields_list = client.list_fields(
input_data.base_id, input_data.table_id
)
fields = [
AirtableField(id=field.id, name=field.name, type=field.type)
for field in fields_list
]
yield "fields", fields
elif input_data.operation == "get":
# Get a specific field
if not input_data.field_id:
yield "error", "Field ID is required for 'get' operation"
return
field = client.get_field(
input_data.base_id, input_data.table_id, input_data.field_id
)
yield "field", AirtableField(
id=field.id, name=field.name, type=field.type
)
elif input_data.operation == "create":
# Create a new field
if not input_data.field_name:
yield "error", "Field name is required for 'create' operation"
return
if not input_data.field_type:
yield "error", "Field type is required for 'create' operation"
return
field = client.create_field(
input_data.base_id,
input_data.table_id,
input_data.field_name,
input_data.field_type,
)
yield "field", AirtableField(
id=field.id, name=field.name, type=field.type
)
else:
yield "error", f"Unknown operation: {input_data.operation}"
except AirtableAPIException as e:
yield "error", f"Airtable API error: {str(e)}"
except Exception as e:
logger.exception("Error in AirtableFieldsBlock")
yield "error", f"Error: {str(e)}"
class AirtableRecordsBlock(Block):
"""Block for creating, reading, updating, and deleting records in Airtable."""
class Input(BlockSchema):
base_id: str = SchemaField(
description="The ID of the Airtable base",
placeholder="appXXXXXXXXXXXXXX",
)
table_id: str = SchemaField(
description="The ID of the table",
placeholder="tblXXXXXXXXXXXXXX",
)
operation: str = SchemaField(
description="The operation to perform on records",
placeholder="list",
choices=["list", "get", "create", "update", "delete"],
)
record_id: Optional[str] = SchemaField(
description="The ID of the record (required for 'get', 'update', and 'delete' operations)",
placeholder="recXXXXXXXXXXXXXX",
advanced=True,
)
filter_formula: Optional[str] = SchemaField(
description="Filter formula for listing records (optional for 'list' operation)",
placeholder="{Field}='Value'",
advanced=True,
)
fields: Optional[Dict[str, any]] = SchemaField(
description="The field values (required for 'create' and 'update' operations)",
placeholder='{"Name": "John Doe", "Email": "john@example.com"}',
advanced=True,
)
credentials: AirtableCredentialsInput = CredentialsField(
description="The credentials for the Airtable API"
)
class Output(BlockSchema):
records: Optional[List[AirtableRecord]] = SchemaField(
description="List of records in the table"
)
record: Optional[AirtableRecord] = SchemaField(
description="The retrieved, created, or updated record"
)
success: Optional[bool] = SchemaField(
description="Success status for delete operation"
)
error: Optional[str] = SchemaField(description="Error message if any")

View File

@@ -0,0 +1,87 @@
"""
Module for Airtable webhook triggers.
This module provides trigger blocks that respond to Airtable webhook events.
"""
import logging
from typing import Dict
from strenum import StrEnum
from backend.data.block import (
Block,
BlockCategory,
BlockManualWebhookConfig,
BlockOutput,
BlockSchema,
)
from backend.data.model import SchemaField
logger = logging.getLogger(__name__)
class AirtableWebhookEventType(StrEnum):
"""Types of webhook events supported by Airtable."""
RECORDS_CREATED = "records:created"
RECORDS_UPDATED = "records:updated"
RECORDS_DELETED = "records:deleted"
class AirtableWebhookTriggerBlock(Block):
"""
A trigger block that responds to Airtable webhook events.
This block is activated when a webhook event is received from Airtable.
"""
class Input(BlockSchema):
# The payload field is hidden because it's automatically populated by the webhook system
payload: Dict = SchemaField(hidden=True)
class Output(BlockSchema):
event_data: Dict = SchemaField(
description="The contents of the Airtable webhook event."
)
base_id: str = SchemaField(description="The ID of the Airtable base.")
table_id: str = SchemaField(description="The ID of the Airtable table.")
event_type: str = SchemaField(description="The type of event that occurred.")
def __init__(self):
super().__init__(
id="8c3b52d1-f7e9-4c5d-a6f1-60e937d94d2a",
description="This block will output the contents of an Airtable webhook event.",
categories={BlockCategory.DATA},
input_schema=AirtableWebhookTriggerBlock.Input,
output_schema=AirtableWebhookTriggerBlock.Output,
webhook_config=BlockManualWebhookConfig(
provider="airtable",
webhook_type=AirtableWebhookEventType.RECORDS_UPDATED,
),
test_input=[
{
"payload": {
"baseId": "app123",
"tableId": "tbl456",
"event": "records:updated",
"data": {},
}
}
],
test_output=[
(
"event_data",
{
"baseId": "app123",
"tableId": "tbl456",
"event": "records:updated",
"data": {},
},
)
],
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
"""Process the Airtable webhook event and yield its contents."""
logger.info("Airtable webhook trigger received payload: %s", input_data.payload)
yield "event_data", input_data.payload

View File

@@ -2,6 +2,11 @@ from typing import Type
from backend.blocks.ai_music_generator import AIMusicGeneratorBlock
from backend.blocks.ai_shortform_video_block import AIShortformVideoCreatorBlock
from backend.blocks.airtable.airtable import (
AirtableFieldsBlock,
AirtableRecordsBlock,
AirtableTablesBlock,
)
from backend.blocks.ideogram import IdeogramModelBlock
from backend.blocks.jina.embeddings import JinaEmbeddingBlock
from backend.blocks.jina.search import ExtractWebsiteContentBlock, SearchTheWebBlock
@@ -21,6 +26,7 @@ from backend.blocks.text_to_speech_block import UnrealTextToSpeechBlock
from backend.data.block import Block
from backend.data.cost import BlockCost, BlockCostType
from backend.integrations.credentials_store import (
airtable_credentials,
anthropic_credentials,
did_credentials,
groq_credentials,
@@ -266,5 +272,41 @@ BLOCK_COSTS: dict[Type[Block], list[BlockCost]] = {
},
)
],
AirtableTablesBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": airtable_credentials.id,
"provider": airtable_credentials.provider,
"type": airtable_credentials.type,
}
},
)
],
AirtableFieldsBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": airtable_credentials.id,
"provider": airtable_credentials.provider,
"type": airtable_credentials.type,
}
},
)
],
AirtableRecordsBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": airtable_credentials.id,
"provider": airtable_credentials.provider,
"type": airtable_credentials.type,
}
},
)
],
SmartDecisionMakerBlock: LLM_COST,
}

View File

@@ -1,5 +1,5 @@
from backend.app import run_processes
from backend.executor import DatabaseManager, ExecutionManager
from backend.executor import ExecutionManager
def main():

View File

@@ -169,6 +169,14 @@ zerobounce_credentials = APIKeyCredentials(
expires_at=None,
)
airtable_credentials = APIKeyCredentials(
id="b3c7f68f-bb6a-4995-99ec-b45b40d33499",
provider="airtable",
api_key=SecretStr(settings.secrets.airtable_api_key),
title="Use Credits for Airtable",
expires_at=None,
)
DEFAULT_CREDENTIALS = [
ollama_credentials,
revid_credentials,
@@ -186,6 +194,7 @@ DEFAULT_CREDENTIALS = [
e2b_credentials,
mem0_credentials,
nvidia_credentials,
airtable_credentials,
screenshotone_credentials,
apollo_credentials,
smartlead_credentials,
@@ -225,6 +234,8 @@ class IntegrationCredentialsStore:
all_credentials.append(ollama_credentials)
# These will only be added if the API key is set
if settings.secrets.airtable_api_key:
all_credentials.append(airtable_credentials)
if settings.secrets.revid_api_key:
all_credentials.append(revid_credentials)
if settings.secrets.ideogram_api_key:

View File

@@ -6,6 +6,7 @@ class ProviderName(str, Enum):
ANTHROPIC = "anthropic"
APOLLO = "apollo"
COMPASS = "compass"
AIRTABLE = "airtable"
DISCORD = "discord"
D_ID = "d_id"
E2B = "e2b"

View File

@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING
from .airtable import AirtableWebhookManager
from .compass import CompassWebhookManager
from .github import GithubWebhooksManager
from .slant3d import Slant3DWebhooksManager
@@ -15,6 +16,7 @@ WEBHOOK_MANAGERS_BY_NAME: dict["ProviderName", type["BaseWebhooksManager"]] = {
CompassWebhookManager,
GithubWebhooksManager,
Slant3DWebhooksManager,
AirtableWebhookManager,
]
}
# --8<-- [end:WEBHOOK_MANAGERS_BY_NAME]

View File

@@ -0,0 +1,120 @@
"""
Webhook manager for Airtable webhooks.
This module manages the registration and processing of webhooks from Airtable.
"""
import logging
from typing import Dict, Tuple
import requests
from fastapi import Request
from strenum import StrEnum
from backend.data import integrations
from backend.data.model import APIKeyCredentials, Credentials
from backend.integrations.providers import ProviderName
from ._manual_base import ManualWebhookManagerBase
logger = logging.getLogger(__name__)
class AirtableWebhookEventType(StrEnum):
"""Types of webhook events supported by Airtable."""
RECORDS_CREATED = "records:created"
RECORDS_UPDATED = "records:updated"
RECORDS_DELETED = "records:deleted"
class AirtableWebhookManager(ManualWebhookManagerBase):
"""Manager class for Airtable webhooks."""
# Provider name for this webhook manager
PROVIDER_NAME = ProviderName.AIRTABLE
# Define the webhook event types this manager can handle
WebhookEventType = AirtableWebhookEventType
# Airtable API URL for webhooks
BASE_URL = "https://api.airtable.com/v0"
@classmethod
async def validate_payload(
cls, webhook: integrations.Webhook, request: Request
) -> Tuple[Dict, str]:
"""
Validate the incoming webhook payload.
Args:
webhook: The webhook object from the database.
request: The incoming request containing the webhook payload.
Returns:
A tuple of (payload_dict, event_type)
"""
# Extract the JSON payload from the request
payload = await request.json()
# Determine the event type from the payload
event_type = payload.get("event", AirtableWebhookEventType.RECORDS_UPDATED)
return payload, event_type
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: str,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> Tuple[str, Dict]:
"""
Register a webhook with Airtable.
Args:
credentials: The API credentials.
webhook_type: The type of webhook to register.
resource: The base ID to register webhooks for.
events: List of event types to listen for.
ingress_url: URL where webhook notifications should be sent.
secret: Secret for webhook security.
Returns:
Tuple of (webhook_id, webhook_config)
"""
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key is required to register Airtable webhook")
headers = {
"Authorization": f"Bearer {credentials.api_key.get_secret_value()}",
"Content-Type": "application/json",
}
payload = {
"url": ingress_url,
"event": webhook_type, # Use the webhook_type as the event type
}
response = requests.post(
f"{self.BASE_URL}/bases/{resource}/webhooks",
headers=headers,
json=payload,
)
if not response.ok:
error = response.json().get("error", "Unknown error")
raise ValueError(f"Failed to register Airtable webhook: {error}")
webhook_data = response.json()
webhook_id = webhook_data.get("id", "")
webhook_config = {
"provider": self.PROVIDER_NAME,
"base_id": resource,
"event": webhook_type,
"url": ingress_url,
}
return webhook_id, webhook_config

View File

@@ -405,6 +405,7 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key")
# Add more secret fields as needed
airtable_api_key: str = Field(default="", description="Airtable API Key")
model_config = SettingsConfigDict(
env_file=".env",

View File

@@ -55,6 +55,7 @@ export const providerIcons: Record<
CredentialsProviderName,
React.FC<{ className?: string }>
> = {
airtable: fallbackIcon,
anthropic: fallbackIcon,
apollo: fallbackIcon,
e2b: fallbackIcon,

View File

@@ -17,6 +17,7 @@ const CREDENTIALS_PROVIDER_NAMES = Object.values(
// --8<-- [start:CredentialsProviderNames]
const providerDisplayNames: Record<CredentialsProviderName, string> = {
airtable: "Airtable",
anthropic: "Anthropic",
apollo: "Apollo",
discord: "Discord",

View File

@@ -113,6 +113,7 @@ export type Credentials =
// --8<-- [start:BlockIOCredentialsSubSchema]
export const PROVIDER_NAMES = {
AIRTABLE: "airtable",
ANTHROPIC: "anthropic",
APOLLO: "apollo",
D_ID: "d_id",