loads of stuff

This commit is contained in:
SwiftyOS
2025-07-08 10:08:10 +02:00
parent 3fbd3d79af
commit 39758a7ee0
40 changed files with 10687 additions and 3 deletions

3
.gitignore vendored
View File

@@ -177,3 +177,6 @@ autogpt_platform/backend/settings.py
*.ign.*
.test-contents
.claude/settings.local.json
api.md
blocks.md

View File

@@ -207,3 +207,27 @@ To maintain a uniform standard and ensure seamless compatibility with many curre
<a href="https://github.com/Significant-Gravitas/AutoGPT/graphs/contributors" alt="View Contributors">
<img src="https://contrib.rocks/image?repo=Significant-Gravitas/AutoGPT&max=1000&columns=10" alt="Contributors" />
</a>
https://linear.app/oauth/authorize?client_id=698520b201f8b6ef229a88c474ce0dcb&redirect_uri=https%3A%2F%2Fdev-server.agpt.co%2Fauth%2Fintegrations%2Foauth_callback&response_type=code&scope=read%2Cissues%3Acreate&state=nJC6COTwdKzSuOORF4YnGNJWlLe3uTBtzyJVoDtYhwE
https://73df-79-150-4-8.ngrok-free.app
Agent ID: f45171bc-0087-45e1-aacc-e2e8b4c98372
curl -X POST http://localhost:3000/api/integrations/generic_webhook/webhooks/043b8670-d006-4721-a545-89671543e36f/ingress \
-H "Content-Type: application/json" \
-d '{
"message": "Test webhook message",
"timestamp": "2024-01-01T12:00:00Z",
"data": {
"key1": "value1",
"key2": "value2",
"nested": {
"field": "test"
}
}
}'

View File

@@ -0,0 +1,84 @@
"""
Airtable integration for AutoGPT Platform.
This integration provides comprehensive access to the Airtable Web API,
including:
- Webhook triggers and management
- Record CRUD operations
- Attachment uploads
- Schema and table management
- Metadata operations
"""
# Attachments
from .attachments import AirtableUploadAttachmentBlock
# Metadata
from .metadata import (
AirtableGetViewBlock,
AirtableListBasesBlock,
AirtableListViewsBlock,
)
# Record Operations
from .records import (
AirtableCreateRecordsBlock,
AirtableDeleteRecordsBlock,
AirtableGetRecordBlock,
AirtableListRecordsBlock,
AirtableUpdateRecordsBlock,
AirtableUpsertRecordsBlock,
)
# Schema & Table Management
from .schema import (
AirtableAddFieldBlock,
AirtableCreateTableBlock,
AirtableDeleteFieldBlock,
AirtableDeleteTableBlock,
AirtableListSchemaBlock,
AirtableUpdateFieldBlock,
AirtableUpdateTableBlock,
)
# Webhook Triggers
from .triggers import AirtableWebhookTriggerBlock
# Webhook Management
from .webhooks import (
AirtableCreateWebhookBlock,
AirtableDeleteWebhookBlock,
AirtableFetchWebhookPayloadsBlock,
AirtableRefreshWebhookBlock,
)
# Explicit public API of the Airtable integration package; star-imports and
# the block registry pick up exactly these block classes.
__all__ = [
    # Webhook Triggers
    "AirtableWebhookTriggerBlock",
    # Webhook Management
    "AirtableCreateWebhookBlock",
    "AirtableDeleteWebhookBlock",
    "AirtableFetchWebhookPayloadsBlock",
    "AirtableRefreshWebhookBlock",
    # Record Operations
    "AirtableCreateRecordsBlock",
    "AirtableDeleteRecordsBlock",
    "AirtableGetRecordBlock",
    "AirtableListRecordsBlock",
    "AirtableUpdateRecordsBlock",
    "AirtableUpsertRecordsBlock",
    # Attachments
    "AirtableUploadAttachmentBlock",
    # Schema & Table Management
    "AirtableAddFieldBlock",
    "AirtableCreateTableBlock",
    "AirtableDeleteFieldBlock",
    "AirtableDeleteTableBlock",
    "AirtableListSchemaBlock",
    "AirtableUpdateFieldBlock",
    "AirtableUpdateTableBlock",
    # Metadata
    "AirtableGetViewBlock",
    "AirtableListBasesBlock",
    "AirtableListViewsBlock",
]

View File

@@ -0,0 +1,16 @@
"""
Shared configuration for all Airtable blocks using the SDK pattern.
"""
from backend.sdk import BlockCostType, ProviderBuilder
from ._webhook import AirtableWebhookManager
# Configure the Airtable provider with API key authentication
# Shared Airtable provider used by every block in this package:
# API-key auth (personal access token via AIRTABLE_API_KEY), webhook support
# through AirtableWebhookManager, and a flat per-run cost of 1.
airtable = (
    ProviderBuilder("airtable")
    .with_api_key("AIRTABLE_API_KEY", "Airtable Personal Access Token")
    .with_webhook_manager(AirtableWebhookManager)
    .with_base_cost(1, BlockCostType.RUN)
    .build()
)

View File

@@ -0,0 +1,125 @@
"""
Webhook management for Airtable blocks.
"""
import base64
import hashlib
import hmac
from enum import Enum
from typing import Tuple

from backend.sdk import (
    APIKeyCredentials,
    BaseWebhooksManager,
    Credentials,
    ProviderName,
    Requests,
    Webhook,
)
class AirtableWebhookManager(BaseWebhooksManager):
    """Webhook manager for the Airtable Web API.

    Registers/deregisters webhooks against ``/v0/bases/{baseId}/webhooks``
    and validates incoming deliveries using the MAC secret that Airtable
    returns at creation time.
    """

    PROVIDER_NAME = ProviderName("airtable")

    class WebhookType(str, Enum):
        # Airtable exposes a single logical trigger: changes in a table.
        TABLE_CHANGE = "table_change"

    @classmethod
    async def validate_payload(cls, webhook: Webhook, request) -> Tuple[dict, str]:
        """Validate an incoming webhook payload and its signature.

        Returns the parsed JSON payload and the event type string.
        Raises ValueError if a signature header is present but invalid.
        """
        payload = await request.json()

        # Verify webhook signature using HMAC-SHA256 over the raw body.
        if webhook.secret:
            mac_secret = webhook.config.get("mac_secret")
            if mac_secret:
                body = await request.body()
                # Airtable hands the MAC secret back base64-encoded
                # ("macSecretBase64"); it must be decoded before being
                # used as the HMAC key, otherwise every valid delivery
                # fails verification.
                expected_mac = hmac.new(
                    base64.b64decode(mac_secret), body, hashlib.sha256
                ).hexdigest()
                # Header value has the form "hmac-sha256=<hex digest>";
                # strip the scheme prefix before comparing digests.
                signature = request.headers.get("X-Airtable-Content-MAC", "")
                if signature.startswith("hmac-sha256="):
                    signature = signature[len("hmac-sha256=") :]
                if signature and not hmac.compare_digest(signature, expected_mac):
                    raise ValueError("Invalid webhook signature")

        # Airtable pings only announce that new payloads exist (the cursor
        # advances); actual change payloads are fetched separately, so every
        # delivery is surfaced as a generic "notification".
        event_type = "notification"
        return payload, event_type

    async def _register_webhook(
        self,
        credentials: Credentials,
        webhook_type: str,
        resource: str,
        events: list[str],
        ingress_url: str,
        secret: str,
    ) -> Tuple[str, dict]:
        """Register a webhook with the Airtable API.

        ``resource`` must be "{base_id}/{table_id_or_name}"; a table part
        of "*" watches the whole base. Returns the provider webhook id and
        the config dict persisted alongside the webhook.
        """
        if not isinstance(credentials, APIKeyCredentials):
            raise ValueError("Airtable webhooks require API key credentials")

        api_key = credentials.api_key.get_secret_value()

        # Parse resource to get base_id and table_id/name.
        parts = resource.split("/", 1)
        if len(parts) != 2:
            raise ValueError("Resource must be in format: {base_id}/{table_id_or_name}")
        base_id, table_id_or_name = parts

        # Watch every data type unless specific events were requested.
        specification = {
            "filters": {
                "dataTypes": events or ["tableData", "tableFields", "tableMetadata"]
            }
        }
        # Scope the webhook to one table unless the wildcard was used.
        if table_id_or_name and table_id_or_name != "*":
            specification["filters"]["recordChangeScope"] = [table_id_or_name]

        response = await Requests().post(
            f"https://api.airtable.com/v0/bases/{base_id}/webhooks",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"notificationUrl": ingress_url, "specification": specification},
        )
        webhook_data = response.json()

        webhook_id = webhook_data["id"]
        # Base64-encoded MAC secret; consumed later by validate_payload.
        mac_secret = webhook_data.get("macSecretBase64")

        return webhook_id, {
            "base_id": base_id,
            "table_id_or_name": table_id_or_name,
            "events": events,
            "mac_secret": mac_secret,
            "cursor": 1,  # Payload fetching starts from cursor 1
            "expiration_time": webhook_data.get("expirationTime"),
        }

    async def _deregister_webhook(
        self, webhook: Webhook, credentials: Credentials
    ) -> None:
        """Deregister a webhook from the Airtable API."""
        if not isinstance(credentials, APIKeyCredentials):
            raise ValueError("Airtable webhooks require API key credentials")

        api_key = credentials.api_key.get_secret_value()
        base_id = webhook.config.get("base_id")
        if not base_id:
            raise ValueError("Missing base_id in webhook metadata")

        await Requests().delete(
            f"https://api.airtable.com/v0/bases/{base_id}/webhooks/{webhook.provider_webhook_id}",
            headers={"Authorization": f"Bearer {api_key}"},
        )

View File

@@ -0,0 +1,98 @@
"""
Airtable attachment blocks.
"""
import base64
from typing import Union
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import airtable
class AirtableUploadAttachmentBlock(Block):
    """Upload a file to Airtable so it can be attached to records.

    Accepts raw bytes or an already base64-encoded string; direct uploads
    are capped at 5MB. The returned attachment ID can be referenced when
    creating or updating records.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        filename: str = SchemaField(description="Name of the file")
        file: Union[bytes, str] = SchemaField(
            description="File content (binary data or base64 string)"
        )
        content_type: str = SchemaField(
            description="MIME type of the file", default="application/octet-stream"
        )

    class Output(BlockSchema):
        attachment: dict = SchemaField(
            description="Attachment object with id, url, size, and type"
        )
        attachment_id: str = SchemaField(description="ID of the uploaded attachment")
        url: str = SchemaField(description="URL of the uploaded attachment")
        size: int = SchemaField(description="Size of the file in bytes")

    def __init__(self):
        super().__init__(
            id="962e801b-5a6f-4c56-a929-83e816343a41",
            description="Upload a file to Airtable for use as an attachment",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # Normalize the payload: keep the raw bytes for the size check and
        # the base64 text for the API body.
        if isinstance(input_data.file, bytes):
            raw_bytes = input_data.file
            encoded = base64.b64encode(raw_bytes).decode("utf-8")
        else:
            # Assume the string is already base64 encoded.
            encoded = input_data.file
            raw_bytes = base64.b64decode(encoded)

        # Airtable rejects direct uploads larger than 5MB.
        if len(raw_bytes) > 5 * 1024 * 1024:
            raise ValueError(
                "File size exceeds 5MB limit. Use URL upload for larger files."
            )

        resp = await Requests().post(
            f"https://api.airtable.com/v0/bases/{input_data.base_id}/attachments/upload",
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            json={
                "content": encoded,
                "filename": input_data.filename,
                "type": input_data.content_type,
            },
        )
        result = resp.json()

        yield "attachment", result
        yield "attachment_id", result.get("id", "")
        yield "url", result.get("url", "")
        yield "size", result.get("size", 0)

View File

@@ -0,0 +1,145 @@
"""
Airtable metadata blocks for bases and views.
"""
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import airtable
class AirtableListBasesBlock(Block):
    """
    Lists all Airtable bases accessible by the API token.

    Follows the endpoint's offset-based pagination so that bases beyond
    the first page are included as well.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )

    class Output(BlockSchema):
        bases: list[dict] = SchemaField(
            description="Array of base objects with id and name"
        )

    def __init__(self):
        super().__init__(
            id="613f9907-bef8-468a-be6d-2dd7a53f96e7",
            description="List all accessible Airtable bases",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        api_key = credentials.api_key.get_secret_value()

        # The meta/bases endpoint returns one page per call and hands back
        # an "offset" token while more pages remain; collect them all.
        bases: list[dict] = []
        offset = None
        while True:
            params = {"offset": offset} if offset else {}
            response = await Requests().get(
                "https://api.airtable.com/v0/meta/bases",
                headers={"Authorization": f"Bearer {api_key}"},
                params=params,
            )
            data = response.json()
            bases.extend(data.get("bases", []))
            offset = data.get("offset")
            if not offset:
                break

        yield "bases", bases
class AirtableListViewsBlock(Block):
    """List every view in a base, each annotated with its owning table.

    The base schema endpoint embeds views inside each table object; this
    block flattens them into one array and adds a ``tableId`` key.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")

    class Output(BlockSchema):
        views: list[dict] = SchemaField(
            description="Array of view objects with tableId"
        )

    def __init__(self):
        super().__init__(
            id="3878cf82-d384-40c2-aace-097042233f6a",
            description="List all views in an Airtable base",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # Fetch the base schema, which includes each table's views.
        resp = await Requests().get(
            f"https://api.airtable.com/v0/meta/bases/{input_data.base_id}/tables",
            headers={"Authorization": f"Bearer {token}"},
        )
        schema = resp.json()

        # Flatten views across all tables, tagging each with its table ID.
        views = [
            {**view, "tableId": table.get("id")}
            for table in schema.get("tables", [])
            for view in table.get("views", [])
        ]
        yield "views", views
class AirtableGetViewBlock(Block):
    """Fetch the full configuration of one view in an Airtable base."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        view_id: str = SchemaField(description="The view ID to retrieve")

    class Output(BlockSchema):
        view: dict = SchemaField(description="Full view object with configuration")

    def __init__(self):
        super().__init__(
            id="ad0dd9f3-b3f4-446b-8142-e81a566797c4",
            description="Get details of a specific Airtable view",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # Single-view metadata lookup.
        resp = await Requests().get(
            f"https://api.airtable.com/v0/meta/bases/{input_data.base_id}/views/{input_data.view_id}",
            headers={"Authorization": f"Bearer {token}"},
        )
        yield "view", resp.json()

View File

@@ -0,0 +1,395 @@
"""
Airtable record operation blocks.
"""
from typing import Optional
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import airtable
class AirtableListRecordsBlock(Block):
    """List records from a table, with filtering, sorting, and pagination.

    Pass the ``offset`` from a previous run to fetch the next page; a
    ``None`` offset in the output means there are no further records.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id_or_name: str = SchemaField(description="Table ID or name", default="")
        filter_formula: str = SchemaField(
            description="Airtable formula to filter records", default=""
        )
        view: str = SchemaField(description="View ID or name to use", default="")
        sort: list[dict] = SchemaField(
            description="Sort configuration (array of {field, direction})", default=[]
        )
        max_records: int = SchemaField(
            description="Maximum number of records to return", default=100
        )
        page_size: int = SchemaField(
            description="Number of records per page (max 100)", default=100
        )
        offset: str = SchemaField(
            description="Pagination offset from previous request", default=""
        )
        return_fields: list[str] = SchemaField(
            description="Specific fields to return (comma-separated)", default=[]
        )

    class Output(BlockSchema):
        records: list[dict] = SchemaField(description="Array of record objects")
        offset: Optional[str] = SchemaField(
            description="Offset for next page (null if no more records)", default=None
        )

    def __init__(self):
        super().__init__(
            id="588a9fde-5733-4da7-b03c-35f5671e960f",
            description="List records from an Airtable table",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # Assemble query parameters, omitting anything left at its default.
        query: dict = {}
        if input_data.filter_formula:
            query["filterByFormula"] = input_data.filter_formula
        if input_data.view:
            query["view"] = input_data.view
        # Sort specs use Airtable's indexed bracket syntax.
        for i, sort_spec in enumerate(input_data.sort):
            query[f"sort[{i}][field]"] = sort_spec.get("field", "")
            query[f"sort[{i}][direction]"] = sort_spec.get("direction", "asc")
        if input_data.max_records:
            query["maxRecords"] = input_data.max_records
        if input_data.page_size:
            # Airtable caps page size at 100.
            query["pageSize"] = min(input_data.page_size, 100)
        if input_data.offset:
            query["offset"] = input_data.offset
        query.update(
            {f"fields[{i}]": name for i, name in enumerate(input_data.return_fields)}
        )

        resp = await Requests().get(
            f"https://api.airtable.com/v0/{input_data.base_id}/{input_data.table_id_or_name}",
            headers={"Authorization": f"Bearer {token}"},
            params=query,
        )
        payload = resp.json()

        yield "records", payload.get("records", [])
        yield "offset", payload.get("offset", None)
class AirtableGetRecordBlock(Block):
    """Fetch a single record from a table by its record ID."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id_or_name: str = SchemaField(description="Table ID or name", default="")
        record_id: str = SchemaField(description="The record ID to retrieve")
        return_fields: list[str] = SchemaField(
            description="Specific fields to return", default=[]
        )

    class Output(BlockSchema):
        record: dict = SchemaField(description="The record object")

    def __init__(self):
        super().__init__(
            id="c29c5cbf-0aff-40f9-bbb5-f26061792d2b",
            description="Get a single record from Airtable",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # Restrict the response to specific fields when requested.
        query = {
            f"fields[{i}]": name for i, name in enumerate(input_data.return_fields)
        }

        resp = await Requests().get(
            f"https://api.airtable.com/v0/{input_data.base_id}/{input_data.table_id_or_name}/{input_data.record_id}",
            headers={"Authorization": f"Bearer {token}"},
            params=query,
        )
        yield "record", resp.json()
class AirtableCreateRecordsBlock(Block):
    """Create one or more records in a table via a single POST."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id_or_name: str = SchemaField(description="Table ID or name", default="")
        records: list[dict] = SchemaField(
            description="Array of records to create (each with 'fields' object)"
        )
        typecast: bool = SchemaField(
            description="Automatically convert string values to appropriate types",
            default=False,
        )
        return_fields: list[str] = SchemaField(
            description="Specific fields to return in created records", default=[]
        )

    class Output(BlockSchema):
        records: list[dict] = SchemaField(description="Array of created record objects")

    def __init__(self):
        super().__init__(
            id="42527e98-47b6-44ce-ac0e-86b4883721d3",
            description="Create records in an Airtable table",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # JSON body: the records plus the typecast flag.
        request_body = {"records": input_data.records, "typecast": input_data.typecast}
        # Optional projection of returned fields.
        query = {
            f"fields[{i}]": name for i, name in enumerate(input_data.return_fields)
        }

        resp = await Requests().post(
            f"https://api.airtable.com/v0/{input_data.base_id}/{input_data.table_id_or_name}",
            headers={"Authorization": f"Bearer {token}"},
            json=request_body,
            params=query,
        )
        yield "records", resp.json().get("records", [])
class AirtableUpdateRecordsBlock(Block):
    """Patch existing records in a table; each record needs an 'id'."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id_or_name: str = SchemaField(description="Table ID or name", default="")
        records: list[dict] = SchemaField(
            description="Array of records to update (each with 'id' and 'fields')"
        )
        typecast: bool = SchemaField(
            description="Automatically convert string values to appropriate types",
            default=False,
        )
        return_fields: list[str] = SchemaField(
            description="Specific fields to return in updated records", default=[]
        )

    class Output(BlockSchema):
        records: list[dict] = SchemaField(description="Array of updated record objects")

    def __init__(self):
        super().__init__(
            id="6e7d2590-ac2b-4b5d-b08c-fc039cd77e1f",
            description="Update records in an Airtable table",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # PATCH updates only the fields present in each record.
        request_body = {"records": input_data.records, "typecast": input_data.typecast}
        query = {
            f"fields[{i}]": name for i, name in enumerate(input_data.return_fields)
        }

        resp = await Requests().patch(
            f"https://api.airtable.com/v0/{input_data.base_id}/{input_data.table_id_or_name}",
            headers={"Authorization": f"Bearer {token}"},
            json=request_body,
            params=query,
        )
        yield "records", resp.json().get("records", [])
class AirtableUpsertRecordsBlock(Block):
    """Create or update records, matching existing rows on a merge field.

    When a record's merge-field value matches an existing row it is
    updated in place; otherwise a new record is created.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id_or_name: str = SchemaField(description="Table ID or name", default="")
        records: list[dict] = SchemaField(
            description="Array of records to upsert (each with 'fields' object)"
        )
        merge_field: str = SchemaField(
            description="Field to use for matching existing records"
        )
        typecast: bool = SchemaField(
            description="Automatically convert string values to appropriate types",
            default=False,
        )
        return_fields: list[str] = SchemaField(
            description="Specific fields to return in upserted records", default=[]
        )

    class Output(BlockSchema):
        records: list[dict] = SchemaField(
            description="Array of created/updated record objects"
        )

    def __init__(self):
        super().__init__(
            id="99f78a9d-3418-429f-a6fb-9d2166638e99",
            description="Create or update records based on a merge field",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # "performUpsert" switches the create endpoint into upsert mode.
        request_body = {
            "performUpsert": {"fieldsToMergeOn": [input_data.merge_field]},
            "records": input_data.records,
            "typecast": input_data.typecast,
        }
        query = {
            f"fields[{i}]": name for i, name in enumerate(input_data.return_fields)
        }

        resp = await Requests().post(
            f"https://api.airtable.com/v0/{input_data.base_id}/{input_data.table_id_or_name}",
            headers={"Authorization": f"Bearer {token}"},
            json=request_body,
            params=query,
        )
        yield "records", resp.json().get("records", [])
class AirtableDeleteRecordsBlock(Block):
    """Delete records from a table by ID."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id_or_name: str = SchemaField(description="Table ID or name", default="")
        record_ids: list[str] = SchemaField(description="Array of record IDs to delete")

    class Output(BlockSchema):
        records: list[dict] = SchemaField(description="Array of deletion results")

    def __init__(self):
        super().__init__(
            id="93e22b8b-3642-4477-aefb-1c0929a4a3a6",
            description="Delete records from an Airtable table",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # IDs are passed as indexed records[] query parameters.
        query = {
            f"records[{i}]": rec_id
            for i, rec_id in enumerate(input_data.record_ids)
        }

        resp = await Requests().delete(
            f"https://api.airtable.com/v0/{input_data.base_id}/{input_data.table_id_or_name}",
            headers={"Authorization": f"Bearer {token}"},
            params=query,
        )
        yield "records", resp.json().get("records", [])

View File

@@ -0,0 +1,328 @@
"""
Airtable schema and table management blocks.
"""
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import airtable
class AirtableListSchemaBlock(Block):
    """Fetch the full schema of a base: tables, fields, and views."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")

    class Output(BlockSchema):
        base_schema: dict = SchemaField(
            description="Complete base schema with tables, fields, and views"
        )
        tables: list[dict] = SchemaField(description="Array of table objects")

    def __init__(self):
        super().__init__(
            id="64291d3c-99b5-47b7-a976-6d94293cdb2d",
            description="Get the complete schema of an Airtable base",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        resp = await Requests().get(
            f"https://api.airtable.com/v0/meta/bases/{input_data.base_id}/tables",
            headers={"Authorization": f"Bearer {token}"},
        )
        schema = resp.json()

        # Emit both the raw schema and the tables array for convenience.
        yield "base_schema", schema
        yield "tables", schema.get("tables", [])
class AirtableCreateTableBlock(Block):
    """Create a new table in a base from a table-definition dict."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_definition: dict = SchemaField(
            description="Table definition with name, description, fields, and views",
            default={
                "name": "New Table",
                "fields": [{"name": "Name", "type": "singleLineText"}],
            },
        )

    class Output(BlockSchema):
        table: dict = SchemaField(description="Created table object")
        table_id: str = SchemaField(description="ID of the created table")

    def __init__(self):
        super().__init__(
            id="fcc20ced-d817-42ea-9b40-c35e7bf34b4f",
            description="Create a new table in an Airtable base",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # The definition dict is posted verbatim to the metadata API.
        resp = await Requests().post(
            f"https://api.airtable.com/v0/meta/bases/{input_data.base_id}/tables",
            headers={"Authorization": f"Bearer {token}"},
            json=input_data.table_definition,
        )
        created = resp.json()

        yield "table", created
        yield "table_id", created.get("id", "")
class AirtableUpdateTableBlock(Block):
    """Patch a table's properties (e.g. name or description)."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id: str = SchemaField(description="The table ID to update")
        patch: dict = SchemaField(
            description="Properties to update (name, description)", default={}
        )

    class Output(BlockSchema):
        table: dict = SchemaField(description="Updated table object")

    def __init__(self):
        super().__init__(
            id="34077c5f-f962-49f2-9ec6-97c67077013a",
            description="Update table properties",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        resp = await Requests().patch(
            f"https://api.airtable.com/v0/meta/bases/{input_data.base_id}/tables/{input_data.table_id}",
            headers={"Authorization": f"Bearer {token}"},
            json=input_data.patch,
        )
        yield "table", resp.json()
class AirtableDeleteTableBlock(Block):
    """Delete a table from a base, reporting success as a boolean."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id: str = SchemaField(description="The table ID to delete")

    class Output(BlockSchema):
        deleted: bool = SchemaField(
            description="Confirmation that the table was deleted"
        )

    def __init__(self):
        super().__init__(
            id="6b96c196-d0ad-4fb2-981f-7a330549bc22",
            description="Delete a table from an Airtable base",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        resp = await Requests().delete(
            f"https://api.airtable.com/v0/meta/bases/{input_data.base_id}/tables/{input_data.table_id}",
            headers={"Authorization": f"Bearer {token}"},
        )
        # Treat 200/204 as success.
        yield "deleted", resp.status in (200, 204)
class AirtableAddFieldBlock(Block):
    """Add a new field (column) to an existing table."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id: str = SchemaField(description="The table ID to add field to")
        field_definition: dict = SchemaField(
            description="Field definition with name, type, and options",
            default={"name": "New Field", "type": "singleLineText"},
        )

    class Output(BlockSchema):
        field: dict = SchemaField(description="Created field object")
        field_id: str = SchemaField(description="ID of the created field")

    def __init__(self):
        super().__init__(
            id="6c98a32f-dbf9-45d8-a2a8-5e97e8326351",
            description="Add a new field to an Airtable table",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        # The field definition is posted verbatim to the metadata API.
        resp = await Requests().post(
            f"https://api.airtable.com/v0/meta/bases/{input_data.base_id}/tables/{input_data.table_id}/fields",
            headers={"Authorization": f"Bearer {token}"},
            json=input_data.field_definition,
        )
        created = resp.json()

        yield "field", created
        yield "field_id", created.get("id", "")
class AirtableUpdateFieldBlock(Block):
    """Patch an existing field's properties in a table."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id: str = SchemaField(description="The table ID containing the field")
        field_id: str = SchemaField(description="The field ID to update")
        patch: dict = SchemaField(description="Field properties to update", default={})

    class Output(BlockSchema):
        field: dict = SchemaField(description="Updated field object")

    def __init__(self):
        super().__init__(
            id="f46ac716-3b18-4da1-92e4-34ca9a464d48",
            description="Update field properties in an Airtable table",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()

        resp = await Requests().patch(
            f"https://api.airtable.com/v0/meta/bases/{input_data.base_id}/tables/{input_data.table_id}/fields/{input_data.field_id}",
            headers={"Authorization": f"Bearer {token}"},
            json=input_data.patch,
        )
        yield "field", resp.json()
class AirtableDeleteFieldBlock(Block):
    """
    Removes a field from an Airtable table via the metadata API.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID", default="")
        table_id: str = SchemaField(description="The table ID containing the field")
        field_id: str = SchemaField(description="The field ID to delete")

    class Output(BlockSchema):
        deleted: bool = SchemaField(
            description="Confirmation that the field was deleted"
        )

    def __init__(self):
        super().__init__(
            id="ca6ebacb-be8b-4c54-80a3-1fb519ad51c6",
            description="Delete a field from an Airtable table",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        endpoint = (
            "https://api.airtable.com/v0/meta/bases/"
            f"{input_data.base_id}/tables/{input_data.table_id}"
            f"/fields/{input_data.field_id}"
        )
        response = await Requests().delete(
            endpoint,
            headers={"Authorization": f"Bearer {token}"},
        )
        # Airtable answers 200 or 204 on a successful delete.
        yield "deleted", response.status in (200, 204)

View File

@@ -0,0 +1,149 @@
"""
Airtable webhook trigger blocks.
"""
from backend.sdk import (
Block,
BlockCategory,
BlockOutput,
BlockSchema,
BlockType,
BlockWebhookConfig,
CredentialsMetaInput,
ProviderName,
SchemaField,
)
from ._config import airtable
class AirtableWebhookTriggerBlock(Block):
    """
    Starts a flow whenever Airtable pings your webhook URL.

    If auto-fetch is enabled, it automatically fetches the full payloads
    after receiving the notification. Airtable notifications are thin pings;
    the change details live behind the payloads endpoint.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        # Populated by the platform once the webhook is provisioned.
        webhook_url: str = SchemaField(
            description="URL to receive webhooks (auto-generated)",
            default="",
            hidden=True,
        )
        base_id: str = SchemaField(
            description="The Airtable base ID to monitor",
            default="",
        )
        table_id_or_name: str = SchemaField(
            description="Table ID or name to monitor (leave empty for all tables)",
            default="",
        )
        event_types: list[str] = SchemaField(
            description="Event types to listen for",
            default=["tableData", "tableFields", "tableMetadata"],
        )
        auto_fetch: bool = SchemaField(
            description="Automatically fetch full payloads after notification",
            default=True,
        )
        # Raw notification body injected by the webhook ingress.
        payload: dict = SchemaField(
            description="Webhook payload data",
            default={},
            hidden=True,
        )

    class Output(BlockSchema):
        ping: dict = SchemaField(description="Raw webhook notification body")
        headers: dict = SchemaField(description="Webhook request headers")
        verified: bool = SchemaField(
            description="Whether the webhook signature was verified"
        )
        # Fields populated when auto_fetch is True
        payloads: list[dict] = SchemaField(
            description="Array of change payloads (when auto-fetch is enabled)",
            default=[],
        )
        next_cursor: int = SchemaField(
            description="Next cursor for pagination (when auto-fetch is enabled)",
            default=0,
        )
        might_have_more: bool = SchemaField(
            description="Whether there might be more payloads (when auto-fetch is enabled)",
            default=False,
        )

    def __init__(self):
        super().__init__(
            id="d0180ce6-ccb9-48c7-8256-b39e93e62801",
            description="Starts a flow whenever Airtable pings your webhook URL",
            categories={BlockCategory.INPUT},
            input_schema=self.Input,
            output_schema=self.Output,
            block_type=BlockType.WEBHOOK,
            webhook_config=BlockWebhookConfig(
                provider=ProviderName("airtable"),
                webhook_type="table_change",
                # event_filter_input="event_types",
                resource_format="{base_id}/{table_id_or_name}",
            ),
        )

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Emit the raw notification and, optionally, the full change payloads."""
        payload = input_data.payload
        # Extract headers from the webhook request (passed through kwargs)
        headers = kwargs.get("webhook_headers", {})
        # Check if signature was verified (handled by webhook manager)
        verified = True  # Webhook manager raises error if verification fails
        # Output basic webhook data
        yield "ping", payload
        yield "headers", headers
        yield "verified", verified
        # If auto-fetch is enabled and we have a cursor, fetch the full payloads
        if input_data.auto_fetch and payload.get("base", {}).get("id"):
            base_id = payload["base"]["id"]
            webhook_id = payload.get("webhook", {}).get("id", "")
            # NOTE(review): cursor defaults to 1 when the ping has none —
            # confirm against Airtable's notification schema.
            cursor = payload.get("cursor", 1)
            # NOTE(review): when webhook_id or cursor is falsy, this branch
            # yields nothing for payloads/next_cursor/might_have_more —
            # confirm downstream consumers tolerate the missing outputs.
            if webhook_id and cursor:
                # Get credentials from kwargs
                credentials = kwargs.get("credentials")
                if credentials:
                    # Fetch payloads using the Airtable API
                    api_key = credentials.api_key.get_secret_value()
                    from backend.sdk import Requests

                    response = await Requests().get(
                        f"https://api.airtable.com/v0/bases/{base_id}/webhooks/{webhook_id}/payloads",
                        headers={"Authorization": f"Bearer {api_key}"},
                        params={"cursor": cursor},
                    )
                    if response.status == 200:
                        data = response.json()
                        yield "payloads", data.get("payloads", [])
                        yield "next_cursor", data.get("cursor", cursor)
                        yield "might_have_more", data.get("mightHaveMore", False)
                    else:
                        # On error, still output empty payloads
                        yield "payloads", []
                        yield "next_cursor", cursor
                        yield "might_have_more", False
                else:
                    # No credentials, can't fetch
                    yield "payloads", []
                    yield "next_cursor", cursor
                    yield "might_have_more", False
        else:
            # Auto-fetch disabled or missing data
            yield "payloads", []
            yield "next_cursor", 0
            yield "might_have_more", False

View File

@@ -0,0 +1,229 @@
"""
Airtable webhook management blocks.
"""
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import airtable
class AirtableFetchWebhookPayloadsBlock(Block):
    """
    Fetches accumulated event payloads for a webhook.

    Use this to pull the full change details after receiving a webhook
    notification, or run it on a schedule to poll for changes.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID")
        webhook_id: str = SchemaField(
            description="The webhook ID to fetch payloads for"
        )
        cursor: int = SchemaField(
            description="Cursor position (0 = all payloads)", default=0
        )

    class Output(BlockSchema):
        payloads: list[dict] = SchemaField(description="Array of webhook payloads")
        next_cursor: int = SchemaField(description="Next cursor for pagination")
        might_have_more: bool = SchemaField(
            description="Whether there might be more payloads"
        )

    def __init__(self):
        super().__init__(
            id="7172db38-e338-4561-836f-9fa282c99949",
            description="Fetch webhook payloads from Airtable",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        # A cursor of 0 means "from the beginning", so omit the parameter.
        query = {"cursor": input_data.cursor} if input_data.cursor > 0 else {}
        response = await Requests().get(
            f"https://api.airtable.com/v0/bases/{input_data.base_id}"
            f"/webhooks/{input_data.webhook_id}/payloads",
            headers={"Authorization": f"Bearer {token}"},
            params=query,
        )
        data = response.json()
        yield "payloads", data.get("payloads", [])
        yield "next_cursor", data.get("cursor", input_data.cursor)
        yield "might_have_more", data.get("mightHaveMore", False)
class AirtableRefreshWebhookBlock(Block):
    """
    Extends a webhook's lifetime by another 7 days.

    Airtable webhooks expire after 7 days of inactivity; run this block on a
    daily schedule to keep long-lived webhooks alive.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID")
        webhook_id: str = SchemaField(description="The webhook ID to refresh")

    class Output(BlockSchema):
        expiration_time: str = SchemaField(
            description="New expiration time (ISO format)"
        )
        webhook: dict = SchemaField(description="Full webhook object")

    def __init__(self):
        super().__init__(
            id="5e82d957-02b8-47eb-8974-7bdaf8caff78",
            description="Refresh a webhook to extend its expiration",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        response = await Requests().post(
            f"https://api.airtable.com/v0/bases/{input_data.base_id}"
            f"/webhooks/{input_data.webhook_id}/refresh",
            headers={"Authorization": f"Bearer {token}"},
        )
        refreshed = response.json()
        yield "expiration_time", refreshed.get("expirationTime", "")
        yield "webhook", refreshed
class AirtableCreateWebhookBlock(Block):
    """
    Registers a new webhook on an Airtable base.

    Airtable will POST notifications to the given URL whenever changes that
    match the specification occur.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID to monitor")
        notification_url: str = SchemaField(
            description="URL to receive webhook notifications"
        )
        specification: dict = SchemaField(
            description="Webhook specification (filters, options)",
            default={
                "filters": {"dataTypes": ["tableData", "tableFields", "tableMetadata"]}
            },
        )

    class Output(BlockSchema):
        webhook: dict = SchemaField(description="Created webhook object")
        webhook_id: str = SchemaField(description="ID of the created webhook")
        mac_secret: str = SchemaField(
            description="MAC secret for signature verification"
        )
        expiration_time: str = SchemaField(description="Webhook expiration time")

    def __init__(self):
        super().__init__(
            id="b9f1f4ec-f4d1-4fbd-ab0b-b219c0e4da9a",
            description="Create a new Airtable webhook",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        request_body = {
            "notificationUrl": input_data.notification_url,
            "specification": input_data.specification,
        }
        response = await Requests().post(
            f"https://api.airtable.com/v0/bases/{input_data.base_id}/webhooks",
            headers={"Authorization": f"Bearer {token}"},
            json=request_body,
        )
        created = response.json()
        yield "webhook", created
        yield "webhook_id", created.get("id", "")
        # The MAC secret is only returned at creation time.
        yield "mac_secret", created.get("macSecretBase64", "")
        yield "expiration_time", created.get("expirationTime", "")
class AirtableDeleteWebhookBlock(Block):
    """
    Removes a webhook from an Airtable base, stopping all its notifications.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = airtable.credentials_field(
            description="Airtable API credentials"
        )
        base_id: str = SchemaField(description="The Airtable base ID")
        webhook_id: str = SchemaField(description="The webhook ID to delete")

    class Output(BlockSchema):
        deleted: bool = SchemaField(
            description="Whether the webhook was successfully deleted"
        )

    def __init__(self):
        super().__init__(
            id="e4ded448-1515-4fe2-b93e-3e4db527df83",
            description="Delete an Airtable webhook",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        response = await Requests().delete(
            f"https://api.airtable.com/v0/bases/{input_data.base_id}"
            f"/webhooks/{input_data.webhook_id}",
            headers={"Authorization": f"Bearer {token}"},
        )
        # 200 and 204 both indicate a successful delete.
        yield "deleted", response.status in (200, 204)

View File

@@ -0,0 +1,66 @@
"""
Meeting BaaS integration for AutoGPT Platform.
This integration provides comprehensive access to the Meeting BaaS API,
including:
- Bot management for meeting recordings
- Calendar integration (Google/Microsoft)
- Event management and scheduling
- Webhook triggers for real-time events
"""
# Bot (Recording) Blocks
from .bots import (
BaasBotDeleteRecordingBlock,
BaasBotFetchMeetingDataBlock,
BaasBotFetchScreenshotsBlock,
BaasBotJoinMeetingBlock,
BaasBotLeaveMeetingBlock,
BaasBotRetranscribeBlock,
)
# Calendar Blocks
from .calendars import (
BaasCalendarConnectBlock,
BaasCalendarDeleteBlock,
BaasCalendarListAllBlock,
BaasCalendarResyncAllBlock,
BaasCalendarUpdateCredsBlock,
)
# Event Blocks
from .events import (
BaasEventGetDetailsBlock,
BaasEventListBlock,
BaasEventPatchBotBlock,
BaasEventScheduleBotBlock,
BaasEventUnscheduleBotBlock,
)
# Webhook Triggers
from .triggers import BaasOnCalendarEventBlock, BaasOnMeetingEventBlock
# Public API of the Meeting BaaS integration package; anything not listed
# here is an implementation detail.
__all__ = [
    # Bot (Recording) Blocks
    "BaasBotJoinMeetingBlock",
    "BaasBotLeaveMeetingBlock",
    "BaasBotFetchMeetingDataBlock",
    "BaasBotFetchScreenshotsBlock",
    "BaasBotDeleteRecordingBlock",
    "BaasBotRetranscribeBlock",
    # Calendar Blocks
    "BaasCalendarConnectBlock",
    "BaasCalendarListAllBlock",
    "BaasCalendarUpdateCredsBlock",
    "BaasCalendarDeleteBlock",
    "BaasCalendarResyncAllBlock",
    # Event Blocks
    "BaasEventListBlock",
    "BaasEventGetDetailsBlock",
    "BaasEventScheduleBotBlock",
    "BaasEventUnscheduleBotBlock",
    "BaasEventPatchBotBlock",
    # Webhook Triggers
    "BaasOnMeetingEventBlock",
    "BaasOnCalendarEventBlock",
]

View File

@@ -0,0 +1,16 @@
"""
Shared configuration for all Meeting BaaS blocks using the SDK pattern.
"""
from backend.sdk import BlockCostType, ProviderBuilder
from ._webhook import BaasWebhookManager
# Configure the Meeting BaaS provider with API key authentication.
# The provider bundles credential handling, webhook management, and a default
# per-run cost that every Meeting BaaS block inherits.
baas = (
    ProviderBuilder("baas")
    .with_api_key("MEETING_BAAS_API_KEY", "Meeting BaaS API Key")
    .with_webhook_manager(BaasWebhookManager)
    .with_base_cost(5, BlockCostType.RUN)  # Higher cost for meeting recording service
    .build()
)

View File

@@ -0,0 +1,83 @@
"""
Webhook management for Meeting BaaS blocks.
"""
from enum import Enum
from typing import Tuple
from backend.sdk import (
APIKeyCredentials,
BaseWebhooksManager,
Credentials,
ProviderName,
Webhook,
)
class BaasWebhookManager(BaseWebhooksManager):
    """Webhook manager for Meeting BaaS API.

    Meeting BaaS has no webhook registration API, so registration and
    deregistration are synthetic: webhook IDs are generated locally and
    the actual delivery URL is configured per-bot or as an account default.
    """

    PROVIDER_NAME = ProviderName("baas")

    class WebhookType(str, Enum):
        # The two notification streams Meeting BaaS can deliver.
        MEETING_EVENT = "meeting_event"
        CALENDAR_EVENT = "calendar_event"

    @classmethod
    async def validate_payload(cls, webhook: Webhook, request) -> Tuple[dict, str]:
        """Validate incoming webhook payload.

        Returns the parsed JSON body and the event type string; raises
        ValueError when the shared-secret header does not match.
        """
        payload = await request.json()
        # Verify API key in header
        # NOTE(review): comparison is a plain != — a constant-time compare
        # (hmac.compare_digest) would be preferable; confirm threat model.
        api_key_header = request.headers.get("x-meeting-baas-api-key")
        if webhook.secret and api_key_header != webhook.secret:
            raise ValueError("Invalid webhook API key")
        # Extract event type from payload
        event_type = payload.get("event", "unknown")
        return payload, event_type

    async def _register_webhook(
        self,
        credentials: Credentials,
        webhook_type: str,
        resource: str,
        events: list[str],
        ingress_url: str,
        secret: str,
    ) -> Tuple[str, dict]:
        """
        Register webhook with Meeting BaaS.

        Note: Meeting BaaS doesn't have a webhook registration API.
        Webhooks are configured per-bot or as account defaults.
        This returns a synthetic webhook ID.
        """
        if not isinstance(credentials, APIKeyCredentials):
            raise ValueError("Meeting BaaS webhooks require API key credentials")
        # Generate a synthetic webhook ID since BaaS doesn't provide one
        import uuid

        webhook_id = str(uuid.uuid4())
        # NOTE(review): the raw API key is persisted in the webhook config
        # dict below — confirm the webhook store encrypts this at rest.
        return webhook_id, {
            "webhook_type": webhook_type,
            "resource": resource,
            "events": events,
            "ingress_url": ingress_url,
            "api_key": credentials.api_key.get_secret_value(),
        }

    async def _deregister_webhook(
        self, webhook: Webhook, credentials: Credentials
    ) -> None:
        """
        Deregister webhook from Meeting BaaS.

        Note: Meeting BaaS doesn't have a webhook deregistration API.
        Webhooks are removed by updating bot/calendar configurations.
        """
        # No-op since BaaS doesn't have webhook deregistration
        pass

View File

@@ -0,0 +1,367 @@
"""
Meeting BaaS bot (recording) blocks.
"""
from typing import Optional
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import baas
class BaasBotJoinMeetingBlock(Block):
    """
    Deploys a recording bot into a meeting, either immediately or at a
    scheduled start_time.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        meeting_url: str = SchemaField(
            description="The URL of the meeting the bot should join"
        )
        bot_name: str = SchemaField(
            description="Display name for the bot in the meeting"
        )
        bot_image: str = SchemaField(
            description="URL to an image for the bot's avatar (16:9 ratio recommended)",
            default="",
        )
        entry_message: str = SchemaField(
            description="Chat message the bot will post upon entry", default=""
        )
        reserved: bool = SchemaField(
            description="Use a reserved bot slot (joins 4 min before meeting)",
            default=False,
        )
        start_time: Optional[int] = SchemaField(
            description="Unix timestamp (ms) when bot should join", default=None
        )
        speech_to_text: dict = SchemaField(
            description="Speech-to-text configuration", default={"provider": "Gladia"}
        )
        webhook_url: str = SchemaField(
            description="URL to receive webhook events for this bot", default=""
        )
        timeouts: dict = SchemaField(
            description="Automatic leave timeouts configuration", default={}
        )
        extra: dict = SchemaField(
            description="Custom metadata to attach to the bot", default={}
        )

    class Output(BlockSchema):
        bot_id: str = SchemaField(description="UUID of the deployed bot")
        join_response: dict = SchemaField(
            description="Full response from join operation"
        )

    def __init__(self):
        super().__init__(
            id="7f8e9d0c-1b2a-3c4d-5e6f-7a8b9c0d1e2f",
            description="Deploy a bot to join and record a meeting",
            categories={BlockCategory.COMMUNICATION},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        # Mandatory fields always go out; optional ones are included only
        # when set, so the API applies its own defaults otherwise.
        request_body: dict = {
            "meeting_url": input_data.meeting_url,
            "bot_name": input_data.bot_name,
            "reserved": input_data.reserved,
            "speech_to_text": input_data.speech_to_text,
        }
        optional = (
            ("bot_image", input_data.bot_image),
            ("entry_message", input_data.entry_message),
            ("webhook_url", input_data.webhook_url),
            ("automatic_leave", input_data.timeouts),
            ("extra", input_data.extra),
        )
        for key, value in optional:
            if value:
                request_body[key] = value
        # start_time of 0 is a valid timestamp, so test against None.
        if input_data.start_time is not None:
            request_body["start_time"] = input_data.start_time
        response = await Requests().post(
            "https://api.meetingbaas.com/bots",
            headers={"x-meeting-baas-api-key": token},
            json=request_body,
        )
        result = response.json()
        yield "bot_id", result.get("bot_id", "")
        yield "join_response", result
class BaasBotLeaveMeetingBlock(Block):
    """
    Forces a deployed bot to exit its current call.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        bot_id: str = SchemaField(description="UUID of the bot to remove from meeting")

    class Output(BlockSchema):
        left: bool = SchemaField(description="Whether the bot successfully left")

    def __init__(self):
        super().__init__(
            id="8a9b0c1d-2e3f-4a5b-6c7d-8e9f0a1b2c3d",
            description="Remove a bot from an ongoing meeting",
            categories={BlockCategory.COMMUNICATION},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        # DELETE on the bot resource makes it leave the call.
        response = await Requests().delete(
            f"https://api.meetingbaas.com/bots/{input_data.bot_id}",
            headers={"x-meeting-baas-api-key": token},
        )
        yield "left", response.status in (200, 204)
class BaasBotFetchMeetingDataBlock(Block):
    """
    Retrieves the MP4 URL, transcript and metadata for a completed meeting.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        bot_id: str = SchemaField(description="UUID of the bot whose data to fetch")
        include_transcripts: bool = SchemaField(
            description="Include transcript data in response", default=True
        )

    class Output(BlockSchema):
        mp4_url: str = SchemaField(
            description="URL to download the meeting recording (time-limited)"
        )
        transcript: list = SchemaField(description="Meeting transcript data")
        metadata: dict = SchemaField(description="Meeting metadata and bot information")

    def __init__(self):
        super().__init__(
            id="9b0c1d2e-3f4a-5b6c-7d8e-9f0a1b2c3d4e",
            description="Retrieve recorded meeting data",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        # The API expects booleans as lowercase strings in the query.
        query = {
            "bot_id": input_data.bot_id,
            "include_transcripts": str(input_data.include_transcripts).lower(),
        }
        response = await Requests().get(
            "https://api.meetingbaas.com/bots/meeting_data",
            headers={"x-meeting-baas-api-key": token},
            params=query,
        )
        data = response.json()
        bot_data = data.get("bot_data", {})
        yield "mp4_url", data.get("mp4", "")
        yield "transcript", bot_data.get("transcripts", [])
        yield "metadata", bot_data.get("bot", {})
class BaasBotFetchScreenshotsBlock(Block):
    """
    Lists the screenshots a bot captured during its call.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        bot_id: str = SchemaField(
            description="UUID of the bot whose screenshots to fetch"
        )

    class Output(BlockSchema):
        screenshots: list[dict] = SchemaField(
            description="Array of screenshot objects with date and url"
        )

    def __init__(self):
        super().__init__(
            id="0c1d2e3f-4a5b-6c7d-8e9f-0a1b2c3d4e5f",
            description="Retrieve screenshots captured during a meeting",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        response = await Requests().get(
            f"https://api.meetingbaas.com/bots/{input_data.bot_id}/screenshots",
            headers={"x-meeting-baas-api-key": token},
        )
        # The endpoint returns the screenshot array directly.
        yield "screenshots", response.json()
class BaasBotDeleteRecordingBlock(Block):
    """
    Permanently purges a bot's MP4 and transcript data, for privacy or
    storage management.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        bot_id: str = SchemaField(description="UUID of the bot whose data to delete")

    class Output(BlockSchema):
        deleted: bool = SchemaField(
            description="Whether the data was successfully deleted"
        )

    def __init__(self):
        super().__init__(
            id="1d2e3f4a-5b6c-7d8e-9f0a-1b2c3d4e5f6a",
            description="Permanently delete a meeting's recorded data",
            categories={BlockCategory.DATA},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        # Deletion is a POST to the bot's delete_data sub-resource.
        response = await Requests().post(
            f"https://api.meetingbaas.com/bots/{input_data.bot_id}/delete_data",
            headers={"x-meeting-baas-api-key": token},
        )
        yield "deleted", response.status == 200
class BaasBotRetranscribeBlock(Block):
    """
    Re-runs speech-to-text on a past recording's audio with a different
    provider or settings.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        bot_id: str = SchemaField(
            description="UUID of the bot whose audio to retranscribe"
        )
        provider: str = SchemaField(
            description="Speech-to-text provider to use (e.g., Gladia, Deepgram)"
        )
        webhook_url: str = SchemaField(
            description="URL to receive transcription complete event", default=""
        )
        custom_options: dict = SchemaField(
            description="Provider-specific options", default={}
        )

    class Output(BlockSchema):
        job_id: Optional[str] = SchemaField(
            description="Transcription job ID if available"
        )
        accepted: bool = SchemaField(
            description="Whether the retranscription request was accepted"
        )

    def __init__(self):
        super().__init__(
            id="2e3f4a5b-6c7d-8e9f-0a1b-2c3d4e5f6a7b",
            description="Re-run transcription on a meeting's audio",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        request_body: dict = {
            "bot_uuid": input_data.bot_id,
            "provider": input_data.provider,
        }
        if input_data.webhook_url:
            request_body["webhook_url"] = input_data.webhook_url
        if input_data.custom_options:
            # Provider-specific options are merged at the top level.
            request_body.update(input_data.custom_options)
        response = await Requests().post(
            "https://api.meetingbaas.com/bots/retranscribe",
            headers={"x-meeting-baas-api-key": token},
            json=request_body,
        )
        status = response.status
        # 202 means accepted without a body; only a 200 carries a job ID.
        job_id = response.json().get("job_id") if status == 200 else None
        yield "job_id", job_id
        yield "accepted", status in (200, 202)

View File

@@ -0,0 +1,265 @@
"""
Meeting BaaS calendar blocks.
"""
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import baas
class BaasCalendarConnectBlock(Block):
    """
    One-time connection of a Google or Microsoft calendar to Meeting BaaS.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        oauth_client_id: str = SchemaField(description="OAuth client ID from provider")
        oauth_client_secret: str = SchemaField(description="OAuth client secret")
        oauth_refresh_token: str = SchemaField(
            description="OAuth refresh token with calendar access"
        )
        platform: str = SchemaField(
            description="Calendar platform (Google or Microsoft)"
        )
        calendar_email_or_id: str = SchemaField(
            description="Specific calendar email/ID to connect", default=""
        )

    class Output(BlockSchema):
        calendar_id: str = SchemaField(description="UUID of the connected calendar")
        calendar_obj: dict = SchemaField(description="Full calendar object")

    def __init__(self):
        super().__init__(
            id="3f4a5b6c-7d8e-9f0a-1b2c-3d4e5f6a7b8c",
            description="Connect a Google or Microsoft calendar for integration",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        request_body = {
            "oauth_client_id": input_data.oauth_client_id,
            "oauth_client_secret": input_data.oauth_client_secret,
            "oauth_refresh_token": input_data.oauth_refresh_token,
            "platform": input_data.platform,
        }
        if input_data.calendar_email_or_id:
            # The API takes the target calendar under "calendar_email".
            request_body["calendar_email"] = input_data.calendar_email_or_id
        response = await Requests().post(
            "https://api.meetingbaas.com/calendars",
            headers={"x-meeting-baas-api-key": token},
            json=request_body,
        )
        connected = response.json()
        yield "calendar_id", connected.get("uuid", "")
        yield "calendar_obj", connected
class BaasCalendarListAllBlock(Block):
    """
    Enumerates every calendar connected to the Meeting BaaS account.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )

    class Output(BlockSchema):
        calendars: list[dict] = SchemaField(
            description="Array of connected calendar objects"
        )

    def __init__(self):
        super().__init__(
            id="4a5b6c7d-8e9f-0a1b-2c3d-4e5f6a7b8c9d",
            description="List all integrated calendars",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        response = await Requests().get(
            "https://api.meetingbaas.com/calendars",
            headers={"x-meeting-baas-api-key": token},
        )
        # The endpoint returns the calendar array directly.
        yield "calendars", response.json()
class BaasCalendarUpdateCredsBlock(Block):
    """
    Refreshes OAuth credentials or switches provider for an existing
    connected calendar.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        calendar_id: str = SchemaField(description="UUID of the calendar to update")
        oauth_client_id: str = SchemaField(
            description="New OAuth client ID", default=""
        )
        oauth_client_secret: str = SchemaField(
            description="New OAuth client secret", default=""
        )
        oauth_refresh_token: str = SchemaField(
            description="New OAuth refresh token", default=""
        )
        platform: str = SchemaField(description="New platform if switching", default="")

    class Output(BlockSchema):
        calendar_obj: dict = SchemaField(description="Updated calendar object")

    def __init__(self):
        super().__init__(
            id="5b6c7d8e-9f0a-1b2c-3d4e-5f6a7b8c9d0e",
            description="Update calendar credentials or platform",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        # Forward only the fields the caller actually provided.
        updates = {
            key: value
            for key, value in (
                ("oauth_client_id", input_data.oauth_client_id),
                ("oauth_client_secret", input_data.oauth_client_secret),
                ("oauth_refresh_token", input_data.oauth_refresh_token),
                ("platform", input_data.platform),
            )
            if value
        }
        response = await Requests().patch(
            f"https://api.meetingbaas.com/calendars/{input_data.calendar_id}",
            headers={"x-meeting-baas-api-key": token},
            json=updates,
        )
        yield "calendar_obj", response.json()
class BaasCalendarDeleteBlock(Block):
    """
    Disconnects a calendar integration and unschedules its future bots.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        calendar_id: str = SchemaField(description="UUID of the calendar to delete")

    class Output(BlockSchema):
        deleted: bool = SchemaField(
            description="Whether the calendar was successfully deleted"
        )

    def __init__(self):
        super().__init__(
            id="6c7d8e9f-0a1b-2c3d-4e5f-6a7b8c9d0e1f",
            description="Remove a calendar integration",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        response = await Requests().delete(
            f"https://api.meetingbaas.com/calendars/{input_data.calendar_id}",
            headers={"x-meeting-baas-api-key": token},
        )
        # 200 and 204 both indicate a successful disconnect.
        yield "deleted", response.status in (200, 204)
class BaasCalendarResyncAllBlock(Block):
    """
    Maintenance operation: forces an immediate full sync of every
    connected calendar.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )

    class Output(BlockSchema):
        synced_ids: list[str] = SchemaField(
            description="Calendar UUIDs that synced successfully"
        )
        errors: list[list] = SchemaField(
            description="Array of [calendar_id, error_message] tuples"
        )

    def __init__(self):
        super().__init__(
            id="7d8e9f0a-1b2c-3d4e-5f6a-7b8c9d0e1f2a",
            description="Force immediate re-sync of all connected calendars",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        token = credentials.api_key.get_secret_value()
        response = await Requests().post(
            "https://api.meetingbaas.com/internal/calendar/resync_all",
            headers={"x-meeting-baas-api-key": token},
        )
        result = response.json()
        yield "synced_ids", result.get("synced_calendars", [])
        yield "errors", result.get("errors", [])

View File

@@ -0,0 +1,276 @@
"""
Meeting BaaS calendar event blocks.
"""
from typing import Union
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import baas
class BaasEventListBlock(Block):
    """List calendar events for one calendar, optionally windowed by date."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        calendar_id: str = SchemaField(
            description="UUID of the calendar to list events from"
        )
        start_date_gte: str = SchemaField(
            description="ISO date string for start date (greater than or equal)",
            default="",
        )
        start_date_lte: str = SchemaField(
            description="ISO date string for start date (less than or equal)",
            default="",
        )
        cursor: str = SchemaField(
            description="Pagination cursor from previous request", default=""
        )

    class Output(BlockSchema):
        events: list[dict] = SchemaField(description="Array of calendar events")
        next_cursor: str = SchemaField(description="Cursor for next page of results")

    def __init__(self):
        super().__init__(
            id="8e9f0a1b-2c3d-4e5f-6a7b-8c9d0e1f2a3b",
            description="List calendar events with optional date filtering",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        headers = {"x-meeting-baas-api-key": credentials.api_key.get_secret_value()}
        # Required parameter first; attach each optional filter only when set.
        query = {"calendar_id": input_data.calendar_id}
        optional_filters = (
            ("start_date_gte", input_data.start_date_gte),
            ("start_date_lte", input_data.start_date_lte),
            ("cursor", input_data.cursor),
        )
        query.update({name: value for name, value in optional_filters if value})
        response = await Requests().get(
            "https://api.meetingbaas.com/calendar_events",
            headers=headers,
            params=query,
        )
        result = response.json()
        yield "events", result.get("events", [])
        # "next" holds the pagination cursor for the following page.
        yield "next_cursor", result.get("next", "")
class BaasEventGetDetailsBlock(Block):
    """Fetch the full object for a single calendar event."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        event_id: str = SchemaField(description="UUID of the event to retrieve")

    class Output(BlockSchema):
        event: dict = SchemaField(description="Full event object with all details")

    def __init__(self):
        super().__init__(
            id="9f0a1b2c-3d4e-5f6a-7b8c-9d0e1f2a3b4c",
            description="Get detailed information for a specific calendar event",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        # Single-resource GET; the response body is the event object itself.
        url = f"https://api.meetingbaas.com/calendar_events/{input_data.event_id}"
        response = await Requests().get(
            url,
            headers={"x-meeting-baas-api-key": credentials.api_key.get_secret_value()},
        )
        yield "event", response.json()
class BaasEventScheduleBotBlock(Block):
    """Attach a bot configuration to an event so it is recorded automatically."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        event_id: str = SchemaField(description="UUID of the event to schedule bot for")
        all_occurrences: bool = SchemaField(
            description="Apply to all occurrences of recurring event", default=False
        )
        bot_config: dict = SchemaField(
            description="Bot configuration (same as Bot → Join Meeting)"
        )

    class Output(BlockSchema):
        events: Union[dict, list[dict]] = SchemaField(
            description="Updated event(s) with bot scheduled"
        )

    def __init__(self):
        super().__init__(
            id="0a1b2c3d-4e5f-6a7b-8c9d-0e1f2a3b4c5d",
            description="Schedule a recording bot for a calendar event",
            categories={BlockCategory.COMMUNICATION},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        headers = {"x-meeting-baas-api-key": credentials.api_key.get_secret_value()}
        # The API expects lowercase string booleans in the query string.
        query = {"all_occurrences": str(input_data.all_occurrences).lower()}
        response = await Requests().post(
            f"https://api.meetingbaas.com/calendar_events/{input_data.event_id}/bot",
            headers=headers,
            params=query,
            json=input_data.bot_config,
        )
        # A recurring event with all_occurrences=True returns a list of events.
        yield "events", response.json()
class BaasEventUnscheduleBotBlock(Block):
    """Remove a scheduled bot from an event or a whole recurring series."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        event_id: str = SchemaField(
            description="UUID of the event to unschedule bot from"
        )
        all_occurrences: bool = SchemaField(
            description="Apply to all occurrences of recurring event", default=False
        )

    class Output(BlockSchema):
        events: Union[dict, list[dict]] = SchemaField(
            description="Updated event(s) with bot removed"
        )

    def __init__(self):
        super().__init__(
            id="1b2c3d4e-5f6a-7b8c-9d0e-1f2a3b4c5d6e",
            description="Cancel a scheduled recording for an event",
            categories={BlockCategory.COMMUNICATION},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        headers = {"x-meeting-baas-api-key": credentials.api_key.get_secret_value()}
        # The API expects lowercase string booleans in the query string.
        query = {"all_occurrences": str(input_data.all_occurrences).lower()}
        response = await Requests().delete(
            f"https://api.meetingbaas.com/calendar_events/{input_data.event_id}/bot",
            headers=headers,
            params=query,
        )
        # A recurring event with all_occurrences=True returns a list of events.
        yield "events", response.json()
class BaasEventPatchBotBlock(Block):
    """
    Modify an already-scheduled bot configuration.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        event_id: str = SchemaField(description="UUID of the event with scheduled bot")
        all_occurrences: bool = SchemaField(
            description="Apply to all occurrences of recurring event", default=False
        )
        bot_patch: dict = SchemaField(description="Bot configuration fields to update")

    class Output(BlockSchema):
        events: Union[dict, list[dict]] = SchemaField(
            description="Updated event(s) with modified bot config"
        )

    def __init__(self):
        super().__init__(
            id="2c3d4e5f-6a7b-8c9d-0e1f-2a3b4c5d6e7f",
            description="Update configuration of a scheduled bot",
            categories={BlockCategory.COMMUNICATION},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        """PATCH the event's scheduled-bot config and yield the updated event(s).

        Fix: ``all_occurrences`` is a non-optional bool (default False), so the
        previous ``if input_data.all_occurrences is not None`` guard was always
        true — dead code, and inconsistent with the schedule/unschedule sibling
        blocks. The parameter is now sent unconditionally, matching them.
        """
        api_key = credentials.api_key.get_secret_value()
        # API expects lowercase string booleans in the query string.
        params = {"all_occurrences": str(input_data.all_occurrences).lower()}
        # Patch the scheduled bot's configuration.
        response = await Requests().patch(
            f"https://api.meetingbaas.com/calendar_events/{input_data.event_id}/bot",
            headers={"x-meeting-baas-api-key": api_key},
            params=params,
            json=input_data.bot_patch,
        )
        # A recurring event with all_occurrences=True returns a list of events.
        events = response.json()
        yield "events", events

View File

@@ -0,0 +1,185 @@
"""
Meeting BaaS webhook trigger blocks.
"""
from pydantic import BaseModel
from backend.sdk import (
Block,
BlockCategory,
BlockOutput,
BlockSchema,
BlockType,
BlockWebhookConfig,
CredentialsMetaInput,
ProviderName,
SchemaField,
)
from ._config import baas
class BaasOnMeetingEventBlock(Block):
    """Webhook trigger fired by Meeting BaaS meeting-related events:
    bot.status_change, complete, failed, transcription_complete."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        webhook_url: str = SchemaField(
            description="URL to receive webhooks (auto-generated)",
            default="",
            hidden=True,
        )

        class EventsFilter(BaseModel):
            """Meeting event types to subscribe to"""

            bot_status_change: bool = SchemaField(
                description="Bot status changes", default=True
            )
            complete: bool = SchemaField(description="Meeting completed", default=True)
            failed: bool = SchemaField(description="Meeting failed", default=True)
            transcription_complete: bool = SchemaField(
                description="Transcription completed", default=True
            )

        events: EventsFilter = SchemaField(
            title="Events", description="The events to subscribe to"
        )
        payload: dict = SchemaField(
            description="Webhook payload data",
            default={},
            hidden=True,
        )

    class Output(BlockSchema):
        event_type: str = SchemaField(description="Type of event received")
        data: dict = SchemaField(description="Event data payload")

    def __init__(self):
        super().__init__(
            id="3d4e5f6a-7b8c-9d0e-1f2a-3b4c5d6e7f8a",
            description="Receive meeting events from Meeting BaaS webhooks",
            categories={BlockCategory.INPUT},
            input_schema=self.Input,
            output_schema=self.Output,
            block_type=BlockType.WEBHOOK,
            webhook_config=BlockWebhookConfig(
                provider=ProviderName("baas"),
                webhook_type="meeting_event",
                event_filter_input="events",
                resource_format="meeting",
            ),
        )

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        incoming = input_data.payload
        event_type = incoming.get("event", "unknown")
        # Wire event name -> whether the user subscribed to it.
        subscriptions = {
            "bot.status_change": input_data.events.bot_status_change,
            "complete": input_data.events.complete,
            "failed": input_data.events.failed,
            "transcription_complete": input_data.events.transcription_complete,
        }
        if not subscriptions.get(event_type, False):
            # Unsubscribed or unrecognized event type: emit nothing.
            return
        yield "event_type", event_type
        yield "data", incoming.get("data", {})
class BaasOnCalendarEventBlock(Block):
    """Webhook trigger fired by Meeting BaaS calendar-related events:
    event.added, event.updated, event.deleted, calendar.synced."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = baas.credentials_field(
            description="Meeting BaaS API credentials"
        )
        webhook_url: str = SchemaField(
            description="URL to receive webhooks (auto-generated)",
            default="",
            hidden=True,
        )

        class EventsFilter(BaseModel):
            """Calendar event types to subscribe to"""

            event_added: bool = SchemaField(
                description="Calendar event added", default=True
            )
            event_updated: bool = SchemaField(
                description="Calendar event updated", default=True
            )
            event_deleted: bool = SchemaField(
                description="Calendar event deleted", default=True
            )
            calendar_synced: bool = SchemaField(
                description="Calendar synced", default=True
            )

        events: EventsFilter = SchemaField(
            title="Events", description="The events to subscribe to"
        )
        payload: dict = SchemaField(
            description="Webhook payload data",
            default={},
            hidden=True,
        )

    class Output(BlockSchema):
        event_type: str = SchemaField(description="Type of event received")
        data: dict = SchemaField(description="Event data payload")

    def __init__(self):
        super().__init__(
            id="4e5f6a7b-8c9d-0e1f-2a3b-4c5d6e7f8a9b",
            description="Receive calendar events from Meeting BaaS webhooks",
            categories={BlockCategory.INPUT},
            input_schema=self.Input,
            output_schema=self.Output,
            block_type=BlockType.WEBHOOK,
            webhook_config=BlockWebhookConfig(
                provider=ProviderName("baas"),
                webhook_type="calendar_event",
                event_filter_input="events",
                resource_format="calendar",
            ),
        )

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        incoming = input_data.payload
        event_type = incoming.get("event", "unknown")
        # Wire event name -> whether the user subscribed to it.
        subscriptions = {
            "event.added": input_data.events.event_added,
            "event.updated": input_data.events.event_updated,
            "event.deleted": input_data.events.event_deleted,
            "calendar.synced": input_data.events.calendar_synced,
        }
        if not subscriptions.get(event_type, False):
            # Unsubscribed or unrecognized event type: emit nothing.
            return
        yield "event_type", event_type
        yield "data", incoming.get("data", {})

View File

@@ -186,3 +186,31 @@ class UniversalTypeConverterBlock(Block):
yield "value", converted_value
except Exception as e:
yield "error", f"Failed to convert value: {str(e)}"
class ReverseListOrderBlock(Block):
    """Block that emits the input list with its element order reversed."""

    class Input(BlockSchema):
        input_list: list[Any] = SchemaField(description="The list to reverse")

    class Output(BlockSchema):
        reversed_list: list[Any] = SchemaField(description="The list in reversed order")

    def __init__(self):
        super().__init__(
            id="422cb708-3109-4277-bfe3-bc2ae5812777",
            description="Reverses the order of elements in a list",
            categories={BlockCategory.BASIC},
            input_schema=ReverseListOrderBlock.Input,
            output_schema=ReverseListOrderBlock.Output,
            test_input={"input_list": [1, 2, 3, 4, 5]},
            test_output=[("reversed_list", [5, 4, 3, 2, 1])],
        )

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        # Slicing builds a new reversed copy, leaving the input list untouched.
        yield "reversed_list", input_data.input_list[::-1]

View File

@@ -0,0 +1,48 @@
"""
ElevenLabs integration blocks for AutoGPT Platform.
"""
# Speech generation blocks
from .speech import (
ElevenLabsGenerateSpeechBlock,
ElevenLabsGenerateSpeechWithTimestampsBlock,
)
# Speech-to-text blocks
from .transcription import (
ElevenLabsTranscribeAudioAsyncBlock,
ElevenLabsTranscribeAudioSyncBlock,
)
# Webhook trigger blocks
from .triggers import ElevenLabsWebhookTriggerBlock
# Utility blocks
from .utility import ElevenLabsGetUsageStatsBlock, ElevenLabsListModelsBlock
# Voice management blocks
from .voices import (
ElevenLabsCreateVoiceCloneBlock,
ElevenLabsDeleteVoiceBlock,
ElevenLabsGetVoiceDetailsBlock,
ElevenLabsListVoicesBlock,
)
__all__ = [
# Voice management
"ElevenLabsListVoicesBlock",
"ElevenLabsGetVoiceDetailsBlock",
"ElevenLabsCreateVoiceCloneBlock",
"ElevenLabsDeleteVoiceBlock",
# Speech generation
"ElevenLabsGenerateSpeechBlock",
"ElevenLabsGenerateSpeechWithTimestampsBlock",
# Speech-to-text
"ElevenLabsTranscribeAudioSyncBlock",
"ElevenLabsTranscribeAudioAsyncBlock",
# Utility
"ElevenLabsListModelsBlock",
"ElevenLabsGetUsageStatsBlock",
# Webhook triggers
"ElevenLabsWebhookTriggerBlock",
]

View File

@@ -0,0 +1,16 @@
"""
Shared configuration for all ElevenLabs blocks using the SDK pattern.
"""
from backend.sdk import BlockCostType, ProviderBuilder
from ._webhook import ElevenLabsWebhookManager
# Configure the ElevenLabs provider with API key authentication
elevenlabs = (
ProviderBuilder("elevenlabs")
.with_api_key("ELEVENLABS_API_KEY", "ElevenLabs API Key")
.with_webhook_manager(ElevenLabsWebhookManager)
.with_base_cost(2, BlockCostType.RUN) # Base cost for API calls
.build()
)

View File

@@ -0,0 +1,82 @@
"""
ElevenLabs webhook manager for handling webhook events.
"""
import hashlib
import hmac
from typing import Tuple
from backend.data.model import Credentials
from backend.sdk import BaseWebhooksManager, ProviderName, Webhook
class ElevenLabsWebhookManager(BaseWebhooksManager):
    """Manages ElevenLabs webhook events.

    Only :meth:`validate_payload` does real work; registration and
    deregistration are placeholders because ElevenLabs webhooks are set up
    manually through their dashboard (see the method notes below).
    """
    # Provider key under which this manager is registered.
    PROVIDER_NAME = ProviderName("elevenlabs")
    @classmethod
    async def validate_payload(cls, webhook: Webhook, request) -> Tuple[dict, str]:
        """
        Validate incoming webhook payload and signature.
        ElevenLabs supports HMAC authentication for webhooks.

        Returns:
            (payload, event_type): the parsed JSON body and the value of its
            "type" field ("unknown" when absent).

        Raises:
            ValueError: if a signature header is present but does not match.
        """
        payload = await request.json()
        # Verify webhook signature if configured
        # NOTE(review): verification runs only when BOTH webhook.secret and
        # config["webhook_secret"] are set AND the signature header is present;
        # a request that simply omits x-elevenlabs-signature passes validation.
        # Confirm whether a missing header should be rejected instead.
        if webhook.secret:
            webhook_secret = webhook.config.get("webhook_secret")
            if webhook_secret:
                # Get the raw body for signature verification
                # (assumes the framework caches the body so reading it after
                # request.json() above still works — verify for our stack)
                body = await request.body()
                # Calculate expected signature
                expected_signature = hmac.new(
                    webhook_secret.encode(), body, hashlib.sha256
                ).hexdigest()
                # Get signature from headers
                signature = request.headers.get("x-elevenlabs-signature")
                # compare_digest performs a constant-time comparison to avoid
                # leaking signature bytes through timing differences.
                if signature and not hmac.compare_digest(signature, expected_signature):
                    raise ValueError("Invalid webhook signature")
        # Extract event type from payload
        event_type = payload.get("type", "unknown")
        return payload, event_type
    async def _register_webhook(
        self,
        credentials: Credentials,
        webhook_type: str,
        resource: str,
        events: list[str],
        ingress_url: str,
        secret: str,
    ) -> tuple[str, dict]:
        """
        Register a webhook with ElevenLabs.
        Note: ElevenLabs webhook registration is done through their dashboard,
        not via API. This is a placeholder implementation.

        Returns:
            ("", config): an empty provider-side webhook ID plus a config dict
            carrying the shared secret and manual-setup instructions.
        """
        # ElevenLabs requires manual webhook setup through dashboard
        # Return empty webhook ID and config with instructions
        config = {
            "manual_setup_required": True,
            "webhook_secret": secret,
            "instructions": "Please configure webhook URL in ElevenLabs dashboard",
        }
        return "", config
    async def _deregister_webhook(
        self, webhook: Webhook, credentials: Credentials
    ) -> None:
        """
        Deregister a webhook with ElevenLabs.
        Note: ElevenLabs webhook removal is done through their dashboard.
        """
        # ElevenLabs requires manual webhook removal through dashboard
        pass

View File

@@ -0,0 +1,179 @@
"""
ElevenLabs speech generation (text-to-speech) blocks.
"""
from typing import Optional
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import elevenlabs
class ElevenLabsGenerateSpeechBlock(Block):
    """Convert text into speech audio, returned as a base64 string."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        voice_id: str = SchemaField(description="ID of the voice to use")
        text: str = SchemaField(description="Text to convert to speech")
        model_id: str = SchemaField(
            description="Model ID to use for generation",
            default="eleven_multilingual_v2",
        )
        output_format: str = SchemaField(
            description="Audio format (e.g., mp3_44100_128)",
            default="mp3_44100_128",
        )
        voice_settings: Optional[dict] = SchemaField(
            description="Override voice settings (stability, similarity_boost, etc.)",
            default=None,
        )
        language_code: Optional[str] = SchemaField(
            description="Language code to enforce output language", default=None
        )
        seed: Optional[int] = SchemaField(
            description="Seed for reproducible output", default=None
        )

    class Output(BlockSchema):
        audio: str = SchemaField(description="Base64-encoded audio data")

    def __init__(self):
        super().__init__(
            id="c5d6e7f8-a9b0-c1d2-e3f4-a5b6c7d8e9f0",
            description="Generate speech audio from text using a specified voice",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        import base64

        # Required fields first; optional ones are attached only when set.
        payload: dict[str, str | int | dict] = {
            "text": input_data.text,
            "model_id": input_data.model_id,
        }
        if input_data.voice_settings:
            payload["voice_settings"] = input_data.voice_settings
        if input_data.language_code:
            payload["language_code"] = input_data.language_code
        # seed may legitimately be 0, so compare against None, not truthiness.
        if input_data.seed is not None:
            payload["seed"] = input_data.seed
        response = await Requests().post(
            f"https://api.elevenlabs.io/v1/text-to-speech/{input_data.voice_id}",
            headers={
                "xi-api-key": credentials.api_key.get_secret_value(),
                "Content-Type": "application/json",
            },
            json=payload,
            params={"output_format": input_data.output_format},
        )
        # The response body is raw audio bytes; base64-encode for transport.
        yield "audio", base64.b64encode(response.content).decode("utf-8")
class ElevenLabsGenerateSpeechWithTimestampsBlock(Block):
    """Convert text to audio and also return per-character timing data."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        voice_id: str = SchemaField(description="ID of the voice to use")
        text: str = SchemaField(description="Text to convert to speech")
        model_id: str = SchemaField(
            description="Model ID to use for generation",
            default="eleven_multilingual_v2",
        )
        output_format: str = SchemaField(
            description="Audio format (e.g., mp3_44100_128)",
            default="mp3_44100_128",
        )
        voice_settings: Optional[dict] = SchemaField(
            description="Override voice settings (stability, similarity_boost, etc.)",
            default=None,
        )
        language_code: Optional[str] = SchemaField(
            description="Language code to enforce output language", default=None
        )

    class Output(BlockSchema):
        audio_base64: str = SchemaField(description="Base64-encoded audio data")
        alignment: dict = SchemaField(
            description="Character-level timing alignment data"
        )
        normalized_alignment: dict = SchemaField(
            description="Normalized text alignment data"
        )

    def __init__(self):
        super().__init__(
            id="d6e7f8a9-b0c1-d2e3-f4a5-b6c7d8e9f0a1",
            description="Generate speech with character-level timestamp information",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        # Required fields first; optional ones are attached only when set.
        request_body: dict[str, str | dict] = {
            "text": input_data.text,
            "model_id": input_data.model_id,
        }
        if input_data.voice_settings:
            request_body["voice_settings"] = input_data.voice_settings
        if input_data.language_code:
            request_body["language_code"] = input_data.language_code
        # The /with-timestamps variant returns JSON (audio + alignment),
        # not raw audio bytes.
        response = await Requests().post(
            f"https://api.elevenlabs.io/v1/text-to-speech/{input_data.voice_id}/with-timestamps",
            headers={
                "xi-api-key": credentials.api_key.get_secret_value(),
                "Content-Type": "application/json",
            },
            json=request_body,
            params={"output_format": input_data.output_format},
        )
        result = response.json()
        yield "audio_base64", result.get("audio_base64", "")
        yield "alignment", result.get("alignment", {})
        yield "normalized_alignment", result.get("normalized_alignment", {})

View File

@@ -0,0 +1,232 @@
"""
ElevenLabs speech-to-text (transcription) blocks.
"""
from typing import Optional
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import elevenlabs
class ElevenLabsTranscribeAudioSyncBlock(Block):
    """Synchronous speech-to-text with word timestamps and diarization."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        model_id: str = SchemaField(
            description="Model ID for transcription", default="scribe_v1"
        )
        file: Optional[str] = SchemaField(
            description="Base64-encoded audio file", default=None
        )
        cloud_storage_url: Optional[str] = SchemaField(
            description="URL to audio file in cloud storage", default=None
        )
        language_code: Optional[str] = SchemaField(
            description="Language code (ISO 639-1 or -3) to improve accuracy",
            default=None,
        )
        diarize: bool = SchemaField(
            description="Enable speaker diarization", default=False
        )
        num_speakers: Optional[int] = SchemaField(
            description="Expected number of speakers (max 32)", default=None
        )
        timestamps_granularity: str = SchemaField(
            description="Timestamp detail level: word, character, or none",
            default="word",
        )
        tag_audio_events: bool = SchemaField(
            description="Tag non-speech sounds (laughter, noise)", default=True
        )

    class Output(BlockSchema):
        text: str = SchemaField(description="Full transcribed text")
        words: list[dict] = SchemaField(
            description="Array with word timing and speaker info"
        )
        language_code: str = SchemaField(description="Detected language code")
        language_probability: float = SchemaField(
            description="Confidence in language detection"
        )

    def __init__(self):
        super().__init__(
            id="e7f8a9b0-c1d2-e3f4-a5b6-c7d8e9f0a1b2",
            description="Transcribe audio to text with timing and speaker information",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        import base64
        from io import BytesIO

        # Exactly one audio source must be supplied: inline file OR URL.
        has_file = bool(input_data.file)
        has_url = bool(input_data.cloud_storage_url)
        if not has_file and not has_url:
            raise ValueError("Either 'file' or 'cloud_storage_url' must be provided")
        if has_file and has_url:
            raise ValueError(
                "Only one of 'file' or 'cloud_storage_url' should be provided"
            )
        # Multipart form fields; booleans travel as "true"/"false" strings.
        form = {
            "model_id": input_data.model_id,
            "diarize": str(input_data.diarize).lower(),
            "timestamps_granularity": input_data.timestamps_granularity,
            "tag_audio_events": str(input_data.tag_audio_events).lower(),
        }
        if input_data.language_code:
            form["language_code"] = input_data.language_code
        if input_data.num_speakers is not None:
            form["num_speakers"] = str(input_data.num_speakers)
        files = None
        if has_file:
            # Inline upload: decode the base64 payload into a file part.
            decoded = base64.b64decode(input_data.file)
            files = [("file", ("audio.wav", BytesIO(decoded), "audio/wav"))]
        else:
            form["cloud_storage_url"] = input_data.cloud_storage_url
        response = await Requests().post(
            "https://api.elevenlabs.io/v1/speech-to-text",
            headers={"xi-api-key": credentials.api_key.get_secret_value()},
            data=form,
            files=files,
        )
        result = response.json()
        yield "text", result.get("text", "")
        yield "words", result.get("words", [])
        yield "language_code", result.get("language_code", "")
        yield "language_probability", result.get("language_probability", 0.0)
class ElevenLabsTranscribeAudioAsyncBlock(Block):
    """Start an async transcription; the result is delivered via webhook."""

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        model_id: str = SchemaField(
            description="Model ID for transcription", default="scribe_v1"
        )
        file: Optional[str] = SchemaField(
            description="Base64-encoded audio file", default=None
        )
        cloud_storage_url: Optional[str] = SchemaField(
            description="URL to audio file in cloud storage", default=None
        )
        language_code: Optional[str] = SchemaField(
            description="Language code (ISO 639-1 or -3) to improve accuracy",
            default=None,
        )
        diarize: bool = SchemaField(
            description="Enable speaker diarization", default=False
        )
        num_speakers: Optional[int] = SchemaField(
            description="Expected number of speakers (max 32)", default=None
        )
        timestamps_granularity: str = SchemaField(
            description="Timestamp detail level: word, character, or none",
            default="word",
        )
        webhook_url: str = SchemaField(
            description="URL to receive transcription result",
            default="",
        )

    class Output(BlockSchema):
        tracking_id: str = SchemaField(description="ID to track the transcription job")

    def __init__(self):
        super().__init__(
            id="f8a9b0c1-d2e3-f4a5-b6c7-d8e9f0a1b2c3",
            description="Start async transcription with webhook callback",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        import base64
        import uuid
        from io import BytesIO

        # Exactly one audio source must be supplied: inline file OR URL.
        has_file = bool(input_data.file)
        has_url = bool(input_data.cloud_storage_url)
        if not has_file and not has_url:
            raise ValueError("Either 'file' or 'cloud_storage_url' must be provided")
        if has_file and has_url:
            raise ValueError(
                "Only one of 'file' or 'cloud_storage_url' should be provided"
            )
        # Multipart form fields; "webhook": "true" switches the API to async.
        form = {
            "model_id": input_data.model_id,
            "diarize": str(input_data.diarize).lower(),
            "timestamps_granularity": input_data.timestamps_granularity,
            "webhook": "true",  # Enable async mode
        }
        if input_data.language_code:
            form["language_code"] = input_data.language_code
        if input_data.num_speakers is not None:
            form["num_speakers"] = str(input_data.num_speakers)
        if input_data.webhook_url:
            form["webhook_url"] = input_data.webhook_url
        files = None
        if has_file:
            # Inline upload: decode the base64 payload into a file part.
            decoded = base64.b64decode(input_data.file)
            files = [("file", ("audio.wav", BytesIO(decoded), "audio/wav"))]
        else:
            form["cloud_storage_url"] = input_data.cloud_storage_url
        response = await Requests().post(
            "https://api.elevenlabs.io/v1/speech-to-text",
            headers={"xi-api-key": credentials.api_key.get_secret_value()},
            data=form,
            files=files,
        )
        # Fall back to a locally generated ID if the API did not return one.
        result = response.json()
        yield "tracking_id", result.get("tracking_id", str(uuid.uuid4()))

View File

@@ -0,0 +1,160 @@
"""
ElevenLabs webhook trigger blocks.
"""
from pydantic import BaseModel
from backend.sdk import (
Block,
BlockCategory,
BlockOutput,
BlockSchema,
BlockType,
BlockWebhookConfig,
CredentialsMetaInput,
ProviderName,
SchemaField,
)
from ._config import elevenlabs
class ElevenLabsWebhookTriggerBlock(Block):
    """
    Starts a flow when ElevenLabs POSTs an event (STT finished, voice removal, etc.).

    Incoming payloads are filtered against the user's event subscriptions; known
    event types are reduced to a compact, event-specific dict of fields. Events
    the user has not subscribed to (and unknown event types) are skipped.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        # Auto-populated by the platform when the webhook is registered.
        webhook_url: str = SchemaField(
            description="URL to receive webhooks (auto-generated)",
            default="",
            hidden=True,
        )
        class EventsFilter(BaseModel):
            """ElevenLabs event types to subscribe to.

            NOTE: field names intentionally match the webhook payload's `type`
            values so the enabled/disabled state can be looked up by name.
            """
            speech_to_text_completed: bool = SchemaField(
                description="Speech-to-text transcription completed", default=True
            )
            post_call_transcription: bool = SchemaField(
                description="Conversational AI call transcription completed",
                default=True,
            )
            voice_removal_notice: bool = SchemaField(
                description="Voice scheduled for removal", default=True
            )
            voice_removed: bool = SchemaField(
                description="Voice has been removed", default=True
            )
            voice_removal_notice_withdrawn: bool = SchemaField(
                description="Voice removal cancelled", default=True
            )
        events: EventsFilter = SchemaField(
            title="Events", description="The events to subscribe to"
        )
        # Webhook payload - populated by the system
        payload: dict = SchemaField(
            description="Webhook payload data",
            default={},
            hidden=True,
        )
    class Output(BlockSchema):
        type: str = SchemaField(description="Event type")
        event_timestamp: int = SchemaField(description="Unix timestamp of the event")
        data: dict = SchemaField(description="Event-specific data payload")
    def __init__(self):
        super().__init__(
            id="c1d2e3f4-a5b6-c7d8-e9f0-a1b2c3d4e5f6",
            description="Receive webhook events from ElevenLabs",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
            block_type=BlockType.WEBHOOK,
            webhook_config=BlockWebhookConfig(
                provider=ProviderName("elevenlabs"),
                webhook_type="notification",
                event_filter_input="events",
                resource_format="",
            ),
        )
    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Filter one incoming webhook payload and emit its normalized fields.

        Yields nothing (skips the event) when the event type is unknown or the
        user has disabled it in the events filter.
        """
        payload = input_data.payload
        event_type = payload.get("type", "unknown")
        # Per-event-type (field, default) pairs projected out of the payload's
        # `data` object. Keys double as the set of known event types.
        field_specs = {
            "speech_to_text_completed": [
                ("transcription_id", None),
                ("text", None),
                ("words", []),
                ("language_code", None),
                ("language_probability", None),
            ],
            "post_call_transcription": [
                ("agent_id", None),
                ("conversation_id", None),
                ("transcript", None),
                ("metadata", {}),
            ],
            "voice_removal_notice": [
                ("voice_id", None),
                ("voice_name", None),
                ("removal_date", None),
                ("reason", None),
            ],
            "voice_removal_notice_withdrawn": [
                ("voice_id", None),
                ("voice_name", None),
            ],
            "voice_removed": [
                ("voice_id", None),
                ("voice_name", None),
                ("removed_at", None),
            ],
        }
        # EventsFilter field names mirror the payload `type` values, so the
        # subscription state can be looked up by name. Unknown event types are
        # skipped, same as before (the old raw-data pass-through branch was
        # unreachable because the filter map already rejected unknown types).
        if event_type not in field_specs or not getattr(
            input_data.events, event_type
        ):
            return
        yield "type", event_type
        yield "event_timestamp", payload.get("event_timestamp", 0)
        data = payload.get("data", {})
        yield "data", {
            field: data.get(field, default)
            for field, default in field_specs[event_type]
        }

View File

@@ -0,0 +1,116 @@
"""
ElevenLabs utility blocks for models and usage stats.
"""
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import elevenlabs
class ElevenLabsListModelsBlock(Block):
    """
    Retrieve every model the account can use, with its capability flags.

    Useful for building model pick-lists or checking feature support before
    submitting a synthesis request.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
    class Output(BlockSchema):
        models: list[dict] = SchemaField(
            description="Array of model objects with capabilities"
        )
    def __init__(self):
        super().__init__(
            id="a9b0c1d2-e3f4-a5b6-c7d8-e9f0a1b2c3d4",
            description="List all available voice models and their capabilities",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        # GET /v1/models returns a JSON array of model descriptors.
        resp = await Requests().get(
            "https://api.elevenlabs.io/v1/models",
            headers={"xi-api-key": credentials.api_key.get_secret_value()},
        )
        yield "models", resp.json()
class ElevenLabsGetUsageStatsBlock(Block):
    """
    Fetch character/credit usage over a time window, for billing dashboards.

    Results are bucketed by the requested aggregation interval.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        start_unix: int = SchemaField(
            description="Start timestamp in Unix epoch seconds"
        )
        end_unix: int = SchemaField(description="End timestamp in Unix epoch seconds")
        aggregation_interval: str = SchemaField(
            description="Aggregation interval: daily or monthly",
            default="daily",
        )
    class Output(BlockSchema):
        usage: list[dict] = SchemaField(description="Array of usage data per interval")
        total_character_count: int = SchemaField(
            description="Total characters used in period"
        )
        total_requests: int = SchemaField(description="Total API requests in period")
    def __init__(self):
        super().__init__(
            id="b0c1d2e3-f4a5-b6c7-d8e9-f0a1b2c3d4e5",
            description="Get character and credit usage statistics",
            categories={BlockCategory.DEVELOPER_TOOLS},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        # All three inputs map straight onto the stats endpoint's query string.
        query = {
            "start_unix": input_data.start_unix,
            "end_unix": input_data.end_unix,
            "aggregation_interval": input_data.aggregation_interval,
        }
        resp = await Requests().get(
            "https://api.elevenlabs.io/v1/usage/character-stats",
            headers={"xi-api-key": credentials.api_key.get_secret_value()},
            params=query,
        )
        stats = resp.json()
        # Emit each output with a safe default when the API omits the field.
        for key, default in (
            ("usage", []),
            ("total_character_count", 0),
            ("total_requests", 0),
        ):
            yield key, stats.get(key, default)

View File

@@ -0,0 +1,249 @@
"""
ElevenLabs voice management blocks.
"""
from typing import Optional
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
)
from ._config import elevenlabs
class ElevenLabsListVoicesBlock(Block):
    """
    Fetch all voices the account can use (for pick-lists, UI menus, etc.).

    Supports text search, voice-type filtering, and token-based pagination via
    the v2 voices endpoint.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        search: str = SchemaField(
            description="Search term to filter voices", default=""
        )
        voice_type: Optional[str] = SchemaField(
            description="Filter by voice type: premade, cloned, or professional",
            default=None,
        )
        page_size: int = SchemaField(
            description="Number of voices per page (max 100)", default=10
        )
        next_page_token: str = SchemaField(
            description="Token for fetching next page", default=""
        )
    class Output(BlockSchema):
        voices: list[dict] = SchemaField(
            description="Array of voice objects with id, name, category, etc."
        )
        next_page_token: Optional[str] = SchemaField(
            description="Token for fetching next page, null if no more pages"
        )
    def __init__(self):
        super().__init__(
            id="e1a2b3c4-d5e6-f7a8-b9c0-d1e2f3a4b5c6",
            description="List all available voices with filtering and pagination",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        # page_size is always sent; the other filters only when non-empty, so
        # the API applies its own defaults otherwise.
        query: dict[str, str | int] = {"page_size": input_data.page_size}
        for field in ("search", "voice_type", "next_page_token"):
            value = getattr(input_data, field)
            if value:
                query[field] = value
        resp = await Requests().get(
            "https://api.elevenlabs.io/v2/voices",
            headers={"xi-api-key": credentials.api_key.get_secret_value()},
            params=query,
        )
        body = resp.json()
        yield "voices", body.get("voices", [])
        # None signals the caller that there are no further pages.
        yield "next_page_token", body.get("next_page_token")
class ElevenLabsGetVoiceDetailsBlock(Block):
    """
    Fetch the full metadata and settings for a single voice by its ID.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        voice_id: str = SchemaField(description="The ID of the voice to retrieve")
    class Output(BlockSchema):
        voice: dict = SchemaField(
            description="Voice object with name, labels, settings, etc."
        )
    def __init__(self):
        super().__init__(
            id="f2a3b4c5-d6e7-f8a9-b0c1-d2e3f4a5b6c7",
            description="Get detailed information about a specific voice",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        # Single GET; the response body *is* the voice object.
        endpoint = f"https://api.elevenlabs.io/v1/voices/{input_data.voice_id}"
        resp = await Requests().get(
            endpoint,
            headers={"xi-api-key": credentials.api_key.get_secret_value()},
        )
        yield "voice", resp.json()
class ElevenLabsCreateVoiceCloneBlock(Block):
    """
    Upload sample clips to create a custom (IVC) voice.

    Samples are supplied base64-encoded, decoded here, and sent as a multipart
    upload to the ElevenLabs "add voice" endpoint.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        name: str = SchemaField(description="Name for the new voice")
        files: list[str] = SchemaField(
            description="Base64-encoded audio files (1-10 files, max 25MB each)"
        )
        description: str = SchemaField(
            description="Description of the voice", default=""
        )
        labels: dict = SchemaField(
            description="Metadata labels (e.g., accent, age)", default={}
        )
        remove_background_noise: bool = SchemaField(
            description="Whether to remove background noise from samples", default=False
        )
    class Output(BlockSchema):
        voice_id: str = SchemaField(description="ID of the newly created voice")
        requires_verification: bool = SchemaField(
            description="Whether the voice requires verification"
        )
    def __init__(self):
        super().__init__(
            id="a3b4c5d6-e7f8-a9b0-c1d2-e3f4a5b6c7d8",
            description="Create a new voice clone from audio samples",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        import base64
        import json
        from io import BytesIO
        # The Input schema documents a 1-10 file limit; enforce it here so the
        # user gets a clear error instead of an opaque server-side failure.
        if not 1 <= len(input_data.files) <= 10:
            raise ValueError(
                f"Expected between 1 and 10 audio sample files, got {len(input_data.files)}"
            )
        api_key = credentials.api_key.get_secret_value()
        # Prepare multipart form data
        form_data = {
            "name": input_data.name,
        }
        if input_data.description:
            form_data["description"] = input_data.description
        if input_data.labels:
            # Labels travel as a JSON-encoded string inside the multipart form.
            form_data["labels"] = json.dumps(input_data.labels)
        if input_data.remove_background_noise:
            form_data["remove_background_noise"] = "true"
        # Decode each base64 sample into a file tuple for the multipart upload.
        # Filenames are synthetic; the API only cares about the audio content.
        files = []
        for i, file_b64 in enumerate(input_data.files):
            file_data = base64.b64decode(file_b64)
            files.append(
                ("files", (f"sample_{i}.mp3", BytesIO(file_data), "audio/mpeg"))
            )
        # Create voice
        response = await Requests().post(
            "https://api.elevenlabs.io/v1/voices/add",
            headers={"xi-api-key": api_key},
            data=form_data,
            files=files,
        )
        result = response.json()
        yield "voice_id", result.get("voice_id", "")
        yield "requires_verification", result.get("requires_verification", False)
class ElevenLabsDeleteVoiceBlock(Block):
    """
    Permanently remove a custom voice from the account.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = elevenlabs.credentials_field(
            description="ElevenLabs API credentials"
        )
        voice_id: str = SchemaField(description="The ID of the voice to delete")
    class Output(BlockSchema):
        status: str = SchemaField(description="Deletion status (ok or error)")
    def __init__(self):
        super().__init__(
            id="b4c5d6e7-f8a9-b0c1-d2e3-f4a5b6c7d8e9",
            description="Delete a custom voice from your account",
            categories={BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
    ) -> BlockOutput:
        endpoint = f"https://api.elevenlabs.io/v1/voices/{input_data.voice_id}"
        resp = await Requests().delete(
            endpoint,
            headers={"xi-api-key": credentials.api_key.get_secret_value()},
        )
        # 200 and 204 both indicate a successful deletion.
        yield "status", "ok" if resp.status in (200, 204) else "error"

View File

@@ -6,10 +6,10 @@ import hashlib
import hmac
from enum import Enum
from backend.data.model import Credentials
from backend.sdk import (
APIKeyCredentials,
BaseWebhooksManager,
Credentials,
ProviderName,
Requests,
Webhook,

View File

@@ -0,0 +1,190 @@
Exa home pagelight logo
Search or ask...
⌘K
Exa Search
Log In
API Dashboard
Documentation
Examples
Integrations
SDKs
Websets
Changelog
Discord
Blog
Getting Started
Overview
Quickstart
API Reference
POST
Search
POST
Get contents
POST
Find similar links
POST
Answer
OpenAPI Specification
RAG Quick Start Guide
RAG with Exa and OpenAI
RAG with LangChain
OpenAI Exa Wrapper
CrewAI agents with Exa
RAG with LlamaIndex
Tool calling with GPT
Tool calling with Claude
OpenAI Chat Completions
OpenAI Responses API
Concepts
How Exa Search Works
The Exa Index
Contents retrieval with Exa API
Exa's Capabilities Explained
FAQs
Crawling Subpages with Exa
Exa LiveCrawl
Admin
Setting Up and Managing Your Team
Rate Limits
Enterprise Documentation & Security
API Reference
Answer
Get an LLM answer to a question informed by Exa search results. Fully compatible with OpenAI's chat completions endpoint - docs here. /answer performs an Exa search and uses an LLM to generate either:
A direct answer for specific queries. (i.e. “What is the capital of France?” would return “Paris”)
A detailed summary with citations for open-ended queries (i.e. “What is the state of ai in healthcare?” would return a summary with citations to relevant sources)
The response includes both the generated answer and the sources used to create it. The endpoint also supports streaming (as stream=True), which returns tokens as they are generated.
POST
/
answer
Try it
Get your Exa API key
Authorizations
x-api-key
stringheaderrequired
API key can be provided either via x-api-key header or Authorization header with Bearer scheme
Body
application/json
query
stringrequired
The question or query to answer.
Minimum length: 1
Example:
"What is the latest valuation of SpaceX?"
stream
booleandefault:false
If true, the response is returned as a server-sent events (SSE) stream.
text
booleandefault:false
If true, the response includes full text content in the search results
model
enum<string>default:exa
The search model to use for the answer. The exa model passes only one query to our search model, while exa-pro also passes 2 expanded queries.
Available options: exa, exa-pro
Response
200
application/json
OK
answer
string
The generated answer based on search results.
Example:
"$350 billion."
citations
object[]
Search results used to generate the answer.
Show child attributes
costDollars
object
Show child attributes
Find similar links
OpenAPI Specification
x
discord
Powered by Mintlify
cURL
Python
JavaScript
Copy
# pip install exa-py
from exa_py import Exa
exa = Exa('YOUR_EXA_API_KEY')
result = exa.answer(
"What is the latest valuation of SpaceX?",
text=True
)
print(result)
200
Copy
{
"answer": "$350 billion.",
"citations": [
{
"id": "https://www.theguardian.com/science/2024/dec/11/spacex-valued-at-350bn-as-company-agrees-to-buy-shares-from-employees",
"url": "https://www.theguardian.com/science/2024/dec/11/spacex-valued-at-350bn-as-company-agrees-to-buy-shares-from-employees",
"title": "SpaceX valued at $350bn as company agrees to buy shares from ...",
"author": "Dan Milmon",
"publishedDate": "2023-11-16T01:36:32.547Z",
"text": "SpaceX valued at $350bn as company agrees to buy shares from ...",
"image": "https://i.guim.co.uk/img/media/7cfee7e84b24b73c97a079c402642a333ad31e77/0_380_6176_3706/master/6176.jpg?width=1200&height=630&quality=85&auto=format&fit=crop&overlay-align=bottom%2Cleft&overlay-width=100p&overlay-base64=L2ltZy9zdGF0aWMvb3ZlcmxheXMvdGctZGVmYXVsdC5wbmc&enable=upscale&s=71ebb2fbf458c185229d02d380c01530",
"favicon": "https://assets.guim.co.uk/static/frontend/icons/homescreen/apple-touch-icon.svg"
}
],
"costDollars": {
"total": 0.005,
"breakDown": [
{
"search": 0.005,
"contents": 0,
"breakdown": {
"keywordSearch": 0,
"neuralSearch": 0.005,
"contentText": 0,
"contentHighlight": 0,
"contentSummary": 0
}
}
],
"perRequestPrices": {
"neuralSearch_1_25_results": 0.005,
"neuralSearch_26_100_results": 0.025,
"neuralSearch_100_plus_results": 1,
"keywordSearch_1_100_results": 0.0025,
"keywordSearch_100_plus_results": 3
},
"perPagePrices": {
"contentText": 0.001,
"contentHighlight": 0.001,
"contentSummary": 0.001
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,81 @@
# Example Blocks Deployment Guide
## Overview
Example blocks are disabled by default in production environments to keep the production block list clean and focused on real functionality. This guide explains how to control the visibility of example blocks.
## Configuration
Example blocks are controlled by the `ENABLE_EXAMPLE_BLOCKS` setting:
- **Default**: `false` (example blocks are hidden)
- **Development**: Set to `true` to show example blocks
## How to Enable/Disable
### Method 1: Environment Variable (Recommended)
Add to your `.env` file:
```bash
# Enable example blocks in development
ENABLE_EXAMPLE_BLOCKS=true
# Disable example blocks in production (default)
ENABLE_EXAMPLE_BLOCKS=false
```
### Method 2: Configuration File
If you're using a `config.json` file:
```json
{
"enable_example_blocks": true
}
```
## Implementation Details
The setting is checked in `backend/blocks/__init__.py` during the block loading process:
1. The `load_all_blocks()` function reads the `enable_example_blocks` setting from `Config`
2. If disabled (default), any Python files in the `examples/` directory are skipped
3. If enabled, example blocks are loaded normally
## Production Deployment
For production deployments:
1. **Do not set** `ENABLE_EXAMPLE_BLOCKS` in your production `.env` file (it defaults to `false`)
2. Or explicitly set `ENABLE_EXAMPLE_BLOCKS=false` for clarity
3. Example blocks will not appear in the block list or be available for use
## Development Environment
For local development:
1. Set `ENABLE_EXAMPLE_BLOCKS=true` in your `.env` file
2. Restart your backend server
3. Example blocks will be available for testing and demonstration
## Verification
To verify the setting is working:
```python
# Check current setting
from backend.util.settings import Config
config = Config()
print(f"Example blocks enabled: {config.enable_example_blocks}")
# Check loaded blocks
from backend.blocks import load_all_blocks
blocks = load_all_blocks()
example_blocks = [b for b in blocks.values() if 'examples' in b.__module__]
print(f"Example blocks loaded: {len(example_blocks)}")
```
## Security Note
Example blocks are for demonstration purposes only and may not follow production security standards. Always keep them disabled in production environments.

View File

@@ -0,0 +1,13 @@
"""
Shared configuration for all GEM blocks using the new SDK pattern.
"""
from backend.sdk import BlockCostType, ProviderBuilder
# Configure the GEM provider once for all blocks
gem = (
ProviderBuilder("gem")
.with_api_key("GEM_API_KEY", "GEM API Key")
.with_base_cost(1, BlockCostType.RUN)
.build()
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,25 @@
"""
Oxylabs Web Scraper API integration blocks.
"""
from .blocks import (
OxylabsCallbackerIPListBlock,
OxylabsCheckJobStatusBlock,
OxylabsGetJobResultsBlock,
OxylabsProcessWebhookBlock,
OxylabsProxyFetchBlock,
OxylabsSubmitBatchBlock,
OxylabsSubmitJobAsyncBlock,
OxylabsSubmitJobRealtimeBlock,
)
__all__ = [
"OxylabsSubmitJobAsyncBlock",
"OxylabsSubmitJobRealtimeBlock",
"OxylabsSubmitBatchBlock",
"OxylabsCheckJobStatusBlock",
"OxylabsGetJobResultsBlock",
"OxylabsProxyFetchBlock",
"OxylabsProcessWebhookBlock",
"OxylabsCallbackerIPListBlock",
]

View File

@@ -0,0 +1,15 @@
"""
Shared configuration for all Oxylabs blocks using the SDK pattern.
"""
from backend.sdk import BlockCostType, ProviderBuilder
# Configure the Oxylabs provider with username/password authentication
oxylabs = (
ProviderBuilder("oxylabs")
.with_user_password(
"OXYLABS_USERNAME", "OXYLABS_PASSWORD", "Oxylabs API Credentials"
)
.with_base_cost(10, BlockCostType.RUN) # Higher cost for web scraping service
.build()
)

View File

@@ -0,0 +1,811 @@
"""
Oxylabs Web Scraper API Blocks
This module implements blocks for interacting with the Oxylabs Web Scraper API.
Oxylabs provides powerful web scraping capabilities with anti-blocking measures,
JavaScript rendering, and built-in parsers for various sources.
"""
import base64
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union
from backend.sdk import (
Block,
BlockCategory,
BlockOutput,
BlockSchema,
CredentialsMetaInput,
Requests,
SchemaField,
UserPasswordCredentials,
)
from ._config import oxylabs
# Enums for Oxylabs API
class OxylabsSource(str, Enum):
    """Available scraping sources"""
    # Values map 1:1 onto the `source` parameter of the Oxylabs Scraper API.
    AMAZON_PRODUCT = "amazon_product"
    AMAZON_SEARCH = "amazon_search"
    GOOGLE_SEARCH = "google_search"
    GOOGLE_SHOPPING = "google_shopping"
    # UNIVERSAL scrapes arbitrary URLs without a site-specific parser.
    UNIVERSAL = "universal"
    # Add more sources as needed
class UserAgentType(str, Enum):
    """User agent types for scraping"""
    # Values map onto the API's `user_agent_type` parameter.
    DESKTOP_CHROME = "desktop_chrome"
    DESKTOP_FIREFOX = "desktop_firefox"
    DESKTOP_SAFARI = "desktop_safari"
    DESKTOP_EDGE = "desktop_edge"
    MOBILE_ANDROID = "mobile_android"
    MOBILE_IOS = "mobile_ios"
class RenderType(str, Enum):
    """Rendering options"""
    # NONE is treated as "omit the render parameter" by the submit blocks.
    NONE = "none"
    HTML = "html"  # execute JavaScript, return rendered HTML
    PNG = "png"  # execute JavaScript, return a screenshot
class ResultType(str, Enum):
    """Result format types"""
    DEFAULT = "default"
    RAW = "raw"
    PARSED = "parsed"
    PNG = "png"
class JobStatus(str, Enum):
    """Job status values"""
    PENDING = "pending"  # job queued or still scraping
    DONE = "done"  # results are ready to fetch
    FAULTED = "faulted"  # job failed permanently
# Base class for Oxylabs blocks
class OxylabsBlockBase(Block):
    """Base class for all Oxylabs blocks with common functionality.

    Provides HTTP Basic Auth header construction and a shared request helper
    with uniform error handling for the Oxylabs REST endpoints.
    """
    @staticmethod
    def get_auth_header(credentials: UserPasswordCredentials) -> str:
        """Create Basic Auth header from username and password."""
        username = credentials.username
        password = credentials.password.get_secret_value()
        auth_string = f"{username}:{password}"
        encoded = base64.b64encode(auth_string.encode()).decode()
        return f"Basic {encoded}"
    @staticmethod
    async def make_request(
        method: str,
        url: str,
        credentials: UserPasswordCredentials,
        json_data: Optional[dict] = None,
        params: Optional[dict] = None,
        timeout: int = 300,  # 5 minutes default for scraping
    ) -> dict:
        """Make an authenticated request to the Oxylabs API.

        Returns the decoded JSON body ({} for 204 No Content).
        Raises a generic Exception on any non-2xx response, embedding the
        status code and the error body (or raw text if it isn't JSON).
        """
        headers = {
            "Authorization": OxylabsBlockBase.get_auth_header(credentials),
            "Content-Type": "application/json",
        }
        response = await Requests().request(
            method=method,
            url=url,
            headers=headers,
            json=json_data,
            params=params,
            timeout=timeout,
        )
        if response.status < 200 or response.status >= 300:
            # Prefer the structured error body; fall back to raw text when the
            # error response isn't valid JSON.
            try:
                error_data = response.json()
            except Exception:
                error_data = {"message": response.text()}
            raise Exception(f"Oxylabs API error ({response.status}): {error_data}")
        # Handle empty responses (204 No Content) — there is no body to decode.
        if response.status == 204:
            return {}
        return response.json()
# 1. Submit Job (Async)
class OxylabsSubmitJobAsyncBlock(OxylabsBlockBase):
    """
    Submit a scraping job asynchronously to Oxylabs.
    Returns a job ID for later polling or webhook delivery.

    Exactly one of `url` or `query` must be provided (which one depends on the
    chosen source). The returned `self_url`/`results_url` feed directly into
    the job-status and results blocks.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = oxylabs.credentials_field(
            description="Oxylabs username and password"
        )
        source: OxylabsSource = SchemaField(description="The source/site to scrape")
        url: Optional[str] = SchemaField(
            description="URL to scrape (for URL-based sources)", default=None
        )
        query: Optional[str] = SchemaField(
            description="Query/keyword/ID to search (for query-based sources)",
            default=None,
        )
        geo_location: Optional[str] = SchemaField(
            description="Geographical location (e.g., 'United States', '90210')",
            default=None,
        )
        parse: bool = SchemaField(
            description="Return structured JSON output", default=False
        )
        render: RenderType = SchemaField(
            description="Enable JS rendering or screenshots", default=RenderType.NONE
        )
        user_agent_type: Optional[UserAgentType] = SchemaField(
            description="User agent type for the request", default=None
        )
        callback_url: Optional[str] = SchemaField(
            description="Webhook URL for job completion notification", default=None
        )
        advanced_options: Optional[Dict[str, Any]] = SchemaField(
            description="Additional parameters (e.g., storage_type, context)",
            default=None,
        )
    class Output(BlockSchema):
        job_id: str = SchemaField(description="The Oxylabs job ID")
        status: str = SchemaField(description="Job status (usually 'pending')")
        self_url: str = SchemaField(description="URL to check job status")
        results_url: str = SchemaField(description="URL to get results (when done)")
    def __init__(self):
        super().__init__(
            id="a7c3b5d9-8e2f-4a1b-9c6d-3f7e8b9a0d5c",
            description="Submit an asynchronous scraping job to Oxylabs",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: UserPasswordCredentials, **kwargs
    ) -> BlockOutput:
        # Build request payload
        payload: Dict[str, Any] = {"source": input_data.source}
        # Add URL or query based on what's provided (url takes precedence if
        # both are set)
        if input_data.url:
            payload["url"] = input_data.url
        elif input_data.query:
            payload["query"] = input_data.query
        else:
            raise ValueError("Either 'url' or 'query' must be provided")
        # Add optional parameters — only include what the user actually set so
        # the API applies its own defaults otherwise
        if input_data.geo_location:
            payload["geo_location"] = input_data.geo_location
        if input_data.parse:
            payload["parse"] = True
        if input_data.render != RenderType.NONE:
            payload["render"] = input_data.render
        if input_data.user_agent_type:
            payload["user_agent_type"] = input_data.user_agent_type
        if input_data.callback_url:
            payload["callback_url"] = input_data.callback_url
        # Merge advanced options (NOTE: these can override any key set above)
        if input_data.advanced_options:
            payload.update(input_data.advanced_options)
        # Submit job
        result = await self.make_request(
            method="POST",
            url="https://data.oxylabs.io/v1/queries",
            credentials=credentials,
            json_data=payload,
        )
        # Extract job info
        job_id = result.get("id", "")
        status = result.get("status", "pending")
        # Build URLs for follow-up status checks / result fetches
        self_url = f"https://data.oxylabs.io/v1/queries/{job_id}"
        results_url = f"https://data.oxylabs.io/v1/queries/{job_id}/results"
        yield "job_id", job_id
        yield "status", status
        yield "self_url", self_url
        yield "results_url", results_url
# 2. Submit Job (Realtime)
class OxylabsSubmitJobRealtimeBlock(OxylabsBlockBase):
    """
    Submit a scraping job and wait for the result synchronously.
    The connection is held open until the scraping completes.

    Exactly one of `url` or `query` must be provided. Uses the realtime
    endpoint with an extended (10 minute) timeout.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = oxylabs.credentials_field(
            description="Oxylabs username and password"
        )
        source: OxylabsSource = SchemaField(description="The source/site to scrape")
        url: Optional[str] = SchemaField(
            description="URL to scrape (for URL-based sources)", default=None
        )
        query: Optional[str] = SchemaField(
            description="Query/keyword/ID to search (for query-based sources)",
            default=None,
        )
        geo_location: Optional[str] = SchemaField(
            description="Geographical location (e.g., 'United States', '90210')",
            default=None,
        )
        parse: bool = SchemaField(
            description="Return structured JSON output", default=False
        )
        render: RenderType = SchemaField(
            description="Enable JS rendering or screenshots", default=RenderType.NONE
        )
        user_agent_type: Optional[UserAgentType] = SchemaField(
            description="User agent type for the request", default=None
        )
        advanced_options: Optional[Dict[str, Any]] = SchemaField(
            description="Additional parameters", default=None
        )
    class Output(BlockSchema):
        status: Literal["done", "faulted"] = SchemaField(
            description="Job completion status"
        )
        result: Union[str, dict, bytes] = SchemaField(
            description="Scraped content (HTML, JSON, or image)"
        )
        meta: Dict[str, Any] = SchemaField(description="Job metadata")
    def __init__(self):
        super().__init__(
            id="b8d4c6e0-9f3a-5b2c-0d7e-4a8f9c0b1e6d",
            description="Submit a synchronous scraping job to Oxylabs",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: UserPasswordCredentials, **kwargs
    ) -> BlockOutput:
        from datetime import timezone
        # Build request payload (similar to async, but no callback)
        payload: Dict[str, Any] = {"source": input_data.source}
        if input_data.url:
            payload["url"] = input_data.url
        elif input_data.query:
            payload["query"] = input_data.query
        else:
            raise ValueError("Either 'url' or 'query' must be provided")
        # Add optional parameters — only include what the user actually set
        if input_data.geo_location:
            payload["geo_location"] = input_data.geo_location
        if input_data.parse:
            payload["parse"] = True
        if input_data.render != RenderType.NONE:
            payload["render"] = input_data.render
        if input_data.user_agent_type:
            payload["user_agent_type"] = input_data.user_agent_type
        # Merge advanced options (NOTE: these can override any key set above)
        if input_data.advanced_options:
            payload.update(input_data.advanced_options)
        # Submit job synchronously (using realtime endpoint)
        result = await self.make_request(
            method="POST",
            url="https://realtime.oxylabs.io/v1/queries",
            credentials=credentials,
            json_data=payload,
            timeout=600,  # 10 minutes for realtime
        )
        # An empty 2xx body is treated as a fault; make_request already raised
        # for non-2xx responses.
        status = "done" if result else "faulted"
        # Handle different result types: parsed jobs carry "results",
        # unparsed ones carry "content"; otherwise pass the body through.
        content = result
        if input_data.parse and "results" in result:
            content = result["results"]
        elif "content" in result:
            content = result["content"]
        meta = {
            "source": input_data.source,
            # Timezone-aware UTC; datetime.utcnow() is deprecated and returns
            # a naive timestamp.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        yield "status", status
        yield "result", content
        yield "meta", meta
# 3. Submit Batch
class OxylabsSubmitBatchBlock(OxylabsBlockBase):
    """
    Submit multiple scraping jobs in one request (up to 5,000).
    Returns an array of job IDs for batch processing.

    Exactly one of `url_list` or `query_list` must be provided; all other
    options apply uniformly to every item in the batch.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = oxylabs.credentials_field(
            description="Oxylabs username and password"
        )
        source: OxylabsSource = SchemaField(
            description="The source/site to scrape (applies to all)"
        )
        url_list: Optional[List[str]] = SchemaField(
            description="List of URLs to scrape", default=None
        )
        query_list: Optional[List[str]] = SchemaField(
            description="List of queries/keywords to search", default=None
        )
        geo_location: Optional[str] = SchemaField(
            description="Geographical location (applies to all)", default=None
        )
        parse: bool = SchemaField(
            description="Return structured JSON output", default=False
        )
        render: RenderType = SchemaField(
            description="Enable JS rendering or screenshots", default=RenderType.NONE
        )
        user_agent_type: Optional[UserAgentType] = SchemaField(
            description="User agent type for the requests", default=None
        )
        callback_url: Optional[str] = SchemaField(
            description="Webhook URL for job completion notifications", default=None
        )
        advanced_options: Optional[Dict[str, Any]] = SchemaField(
            description="Additional parameters", default=None
        )
    class Output(BlockSchema):
        job_ids: List[str] = SchemaField(description="List of job IDs")
        count: int = SchemaField(description="Number of jobs created")
    def __init__(self):
        super().__init__(
            id="c9e5d7f1-0a4b-6c3d-1e8f-5b9a0c2d3f7e",
            description="Submit batch scraping jobs to Oxylabs",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: UserPasswordCredentials, **kwargs
    ) -> BlockOutput:
        # Build batch request payload
        payload: Dict[str, Any] = {"source": input_data.source}
        # Add URL list or query list (url_list wins if both are set); enforce
        # the API's 5,000-item batch ceiling client-side for a clearer error
        if input_data.url_list:
            if len(input_data.url_list) > 5000:
                raise ValueError("Batch size cannot exceed 5,000 URLs")
            payload["url"] = input_data.url_list
        elif input_data.query_list:
            if len(input_data.query_list) > 5000:
                raise ValueError("Batch size cannot exceed 5,000 queries")
            payload["query"] = input_data.query_list
        else:
            raise ValueError("Either 'url_list' or 'query_list' must be provided")
        # Add optional parameters (apply to all items)
        if input_data.geo_location:
            payload["geo_location"] = input_data.geo_location
        if input_data.parse:
            payload["parse"] = True
        if input_data.render != RenderType.NONE:
            payload["render"] = input_data.render
        if input_data.user_agent_type:
            payload["user_agent_type"] = input_data.user_agent_type
        if input_data.callback_url:
            payload["callback_url"] = input_data.callback_url
        # Merge advanced options (NOTE: these can override any key set above)
        if input_data.advanced_options:
            payload.update(input_data.advanced_options)
        # Submit batch
        result = await self.make_request(
            method="POST",
            url="https://data.oxylabs.io/v1/queries/batch",
            credentials=credentials,
            json_data=payload,
        )
        # Extract job IDs, skipping any entries the API returned without one
        queries = result.get("queries", [])
        job_ids = [q.get("id", "") for q in queries if q.get("id")]
        yield "job_ids", job_ids
        yield "count", len(job_ids)
# 4. Check Job Status
class OxylabsCheckJobStatusBlock(OxylabsBlockBase):
    """
    Check the status of a scraping job.
    Can optionally wait for completion by polling.

    When `wait_for_completion` is set, the job is polled every 5 seconds for
    up to 60 attempts (~5 minutes); otherwise the status is fetched once.
    """
    class Input(BlockSchema):
        credentials: CredentialsMetaInput = oxylabs.credentials_field(
            description="Oxylabs username and password"
        )
        job_id: str = SchemaField(description="Job ID to check")
        wait_for_completion: bool = SchemaField(
            description="Poll until job leaves 'pending' status", default=False
        )
    class Output(BlockSchema):
        status: JobStatus = SchemaField(description="Current job status")
        updated_at: Optional[str] = SchemaField(
            description="Last update timestamp", default=None
        )
        results_url: Optional[str] = SchemaField(
            description="URL to get results (when done)", default=None
        )
        raw_status: Dict[str, Any] = SchemaField(description="Full status response")
    def __init__(self):
        super().__init__(
            id="d0f6e8a2-1b5c-7d4e-2f9a-6c0b1d3e4a8f",
            description="Check the status of an Oxylabs scraping job",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )
    async def run(
        self, input_data: Input, *, credentials: UserPasswordCredentials, **kwargs
    ) -> BlockOutput:
        import asyncio
        url = f"https://data.oxylabs.io/v1/queries/{input_data.job_id}"
        # Check status (with optional polling): 60 polls x 5s = ~5 min max wait
        max_attempts = 60 if input_data.wait_for_completion else 1
        delay = 5  # seconds between polls
        # Initialize variables that will be used outside the loop
        result = {}
        status = "pending"
        for attempt in range(max_attempts):
            result = await self.make_request(
                method="GET",
                url=url,
                credentials=credentials,
            )
            status = result.get("status", "pending")
            # If not waiting or job is complete, return
            if not input_data.wait_for_completion or status != "pending":
                break
            # Wait before next poll
            if attempt < max_attempts - 1:
                await asyncio.sleep(delay)
        # Extract results URL from the HATEOAS-style "_links" entries if the
        # job is done
        results_url = None
        if status == "done":
            links = result.get("_links", [])
            for link in links:
                if link.get("rel") == "results":
                    results_url = link.get("href")
                    break
        # NOTE(review): JobStatus(status) raises ValueError if the API returns
        # a status outside pending/done/faulted — confirm against the API spec.
        yield "status", JobStatus(status)
        yield "updated_at", result.get("updated_at")
        yield "results_url", results_url
        yield "raw_status", result
# 5. Get Job Results
class OxylabsGetJobResultsBlock(OxylabsBlockBase):
    """
    Download the scraped data for a completed job.

    Supports different result formats (raw, parsed, screenshot). PNG
    screenshots are fetched as binary content directly; all other result
    types come back as the JSON body of the results endpoint.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = oxylabs.credentials_field(
            description="Oxylabs username and password"
        )
        job_id: str = SchemaField(description="Job ID to get results for")
        result_type: ResultType = SchemaField(
            description="Type of result to retrieve", default=ResultType.DEFAULT
        )

    class Output(BlockSchema):
        content: Union[str, dict, bytes] = SchemaField(description="The scraped data")
        content_type: str = SchemaField(description="MIME type of the content")
        meta: Dict[str, Any] = SchemaField(description="Result metadata")

    def __init__(self):
        """Register the result-download block with its fixed ID and I/O schemas."""
        super().__init__(
            id="e1a7f9b3-2c6d-8e5f-3a0b-7d1c2e4f5b9a",
            description="Get results from a completed Oxylabs job",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: UserPasswordCredentials, **kwargs
    ) -> BlockOutput:
        """Fetch the job's results, yielding content, its MIME type, and metadata."""
        from datetime import timezone

        url = f"https://data.oxylabs.io/v1/queries/{input_data.job_id}/results"

        # Only send the "type" query parameter for non-default formats.
        params = {}
        if input_data.result_type != ResultType.DEFAULT:
            params["type"] = input_data.result_type

        if input_data.result_type == ResultType.PNG:
            # Screenshots are binary, so bypass make_request's JSON handling
            # and issue the request directly with basic-auth headers.
            headers = {
                "Authorization": self.get_auth_header(credentials),
            }
            response = await Requests().request(
                method="GET",
                url=url,
                headers=headers,
                params=params,
            )
            if response.status < 200 or response.status >= 300:
                raise Exception(f"Failed to get results: {response.status}")
            content = response.content  # Binary content
            content_type = response.headers.get("Content-Type", "image/png")
        else:
            # JSON or text results go through the shared authenticated helper.
            result = await self.make_request(
                method="GET",
                url=url,
                credentials=credentials,
                params=params,
            )
            content = result
            content_type = "application/json"

        meta = {
            "job_id": input_data.job_id,
            "result_type": input_data.result_type,
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # since Python 3.12. The ISO string now carries a "+00:00" offset.
            "retrieved_at": datetime.now(timezone.utc).isoformat(),
        }

        yield "content", content
        yield "content_type", content_type
        yield "meta", meta
# 6. Proxy Fetch URL
class OxylabsProxyFetchBlock(OxylabsBlockBase):
    """
    Fetch a URL through Oxylabs' HTTPS proxy endpoint.

    Ideal for one-off page downloads without job management. Currently
    implemented against the realtime API endpoint rather than true
    proxy-mode transport (see the note in run()).
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = oxylabs.credentials_field(
            description="Oxylabs username and password"
        )
        target_url: str = SchemaField(
            description="URL to fetch (must include https://)"
        )
        geo_location: Optional[str] = SchemaField(
            description="Geographical location", default=None
        )
        user_agent_type: Optional[UserAgentType] = SchemaField(
            description="User agent type", default=None
        )
        render: Literal["none", "html"] = SchemaField(
            description="Enable JavaScript rendering", default="none"
        )

    class Output(BlockSchema):
        html: str = SchemaField(description="Page HTML content")
        status_code: int = SchemaField(description="HTTP status code")
        headers: Dict[str, str] = SchemaField(description="Response headers")

    def __init__(self):
        """Register the proxy-fetch block with its fixed ID and I/O schemas."""
        super().__init__(
            id="f2b8a0c4-3d7e-9f6a-4b1c-8e2d3f5a6c0b",
            description="Fetch a URL through Oxylabs proxy",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: UserPasswordCredentials, **kwargs
    ) -> BlockOutput:
        """Fetch the target URL via the Oxylabs realtime API and yield the page."""
        # NOTE(review): true proxy-mode would route through
        # realtime.oxylabs.io:60000 with x-oxylabs-* request headers. This
        # block instead calls the realtime API, which accepts the same
        # options in the JSON payload. A previous version also constructed
        # those proxy headers but never sent them; that dead code has been
        # removed.
        payload = {
            "source": "universal",
            "url": input_data.target_url,
        }
        if input_data.geo_location:
            payload["geo_location"] = input_data.geo_location
        if input_data.user_agent_type:
            payload["user_agent_type"] = input_data.user_agent_type
        if input_data.render != "none":
            payload["render"] = input_data.render

        result = await self.make_request(
            method="POST",
            url="https://realtime.oxylabs.io/v1/queries",
            credentials=credentials,
            json_data=payload,
            timeout=300,  # rendering can be slow; allow up to 5 minutes
        )

        # Extract content from the realtime response; defaults keep the
        # outputs well-typed if the API omits a field.
        yield "html", result.get("content", "")
        yield "status_code", result.get("status_code", 200)
        yield "headers", result.get("headers", {})
# 7. Callback Trigger (Webhook) - This would be handled by the platform's webhook system
# We'll create a block to process webhook data instead
class OxylabsProcessWebhookBlock(OxylabsBlockBase):
    """
    Process incoming Oxylabs webhook callback data.

    Extracts the job ID, completion status, and results link from the
    callback payload Oxylabs POSTs when a job finishes.
    """

    class Input(BlockSchema):
        webhook_payload: Dict[str, Any] = SchemaField(
            description="Raw webhook payload from Oxylabs"
        )
        verify_ip: bool = SchemaField(
            description="Verify the request came from Oxylabs IPs", default=True
        )
        source_ip: Optional[str] = SchemaField(
            description="IP address of the webhook sender", default=None
        )

    class Output(BlockSchema):
        job_id: str = SchemaField(description="Job ID from callback")
        status: JobStatus = SchemaField(description="Job completion status")
        results_url: Optional[str] = SchemaField(
            description="URL to fetch the results", default=None
        )
        raw_callback: Dict[str, Any] = SchemaField(description="Full callback payload")

    def __init__(self):
        """Register the webhook-processing block with its fixed ID and I/O schemas."""
        super().__init__(
            id="a3c9b1d5-4e8f-0b2d-5c6e-9f0a1d3f7b8c",
            description="Process Oxylabs webhook callback data",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: UserPasswordCredentials, **kwargs
    ) -> BlockOutput:
        """Parse the callback payload and yield its job ID, status, and results URL."""
        callback = input_data.webhook_payload

        # The callback mirrors the job-status document: id, status, _links.
        callback_job_id = callback.get("id", "")
        callback_status = JobStatus(callback.get("status", "pending"))

        # A rel="results" link, if present, points at the downloadable results.
        results_link = next(
            (
                link.get("href")
                for link in callback.get("_links", [])
                if link.get("rel") == "results"
            ),
            None,
        )

        # If IP verification is requested, we'd check against the callbacker IPs.
        # This is simplified for the example: in a real implementation we'd
        # fetch and cache the IP list and verify source_ip is in that list.
        if input_data.verify_ip and input_data.source_ip:
            pass

        yield "job_id", callback_job_id
        yield "status", callback_status
        yield "results_url", results_link
        yield "raw_callback", callback
# 8. Callbacker IP List
class OxylabsCallbackerIPListBlock(OxylabsBlockBase):
    """
    Get the list of IP addresses used by Oxylabs for callbacks.

    Use this for firewall whitelisting of incoming webhook traffic.
    """

    class Input(BlockSchema):
        credentials: CredentialsMetaInput = oxylabs.credentials_field(
            description="Oxylabs username and password"
        )

    class Output(BlockSchema):
        ip_list: List[str] = SchemaField(description="List of Oxylabs callback IPs")
        updated_at: str = SchemaField(description="Timestamp of retrieval")

    def __init__(self):
        """Register the IP-list block with its fixed ID and I/O schemas."""
        super().__init__(
            id="b4d0c2e6-5f9a-1c3e-6d7f-0a1b2d4e8c9d",
            description="Get Oxylabs callback IP addresses",
            categories={BlockCategory.SEARCH},
            input_schema=self.Input,
            output_schema=self.Output,
        )

    async def run(
        self, input_data: Input, *, credentials: UserPasswordCredentials, **kwargs
    ) -> BlockOutput:
        """Fetch the callbacker IP list and yield it with a retrieval timestamp."""
        from datetime import timezone

        result = await self.make_request(
            method="GET",
            url="https://data.oxylabs.io/v1/info/callbacker_ips",
            credentials=credentials,
        )

        yield "ip_list", result.get("callbacker_ips", [])
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
        # since Python 3.12. The ISO string now carries a "+00:00" offset.
        yield "updated_at", datetime.now(timezone.utc).isoformat()

View File

@@ -194,6 +194,39 @@ if OAUTH_IS_CONFIGURED:
my_service = builder.build()
```
### Username/Password Authentication
For services that use HTTP Basic Auth or username/password authentication:
```python
# In _config.py
from backend.sdk import BlockCostType, ProviderBuilder
my_service = (
ProviderBuilder("my-service")
.with_user_password("MY_SERVICE_USERNAME", "MY_SERVICE_PASSWORD", "My Service Credentials")
.with_base_cost(1, BlockCostType.RUN)
.build()
)
```
In your blocks, the credentials will be provided as `UserPasswordCredentials`:
```python
from backend.sdk import UserPasswordCredentials
async def run(
self,
input_data: Input,
*,
credentials: UserPasswordCredentials,
**kwargs
) -> BlockOutput:
username = credentials.username.get_secret_value()
password = credentials.password.get_secret_value()
# Use for HTTP Basic Auth or other authentication
```
### Multiple Authentication Methods
Providers built with ProviderBuilder can support multiple authentication methods:
@@ -818,4 +851,162 @@ class DataWebhookBlock(Block):
yield "timestamp", payload.get("timestamp", "")
```
### Real-World Example: Exa Webhook Integration
```python
# _webhook.py
from enum import Enum
import hashlib
import hmac
from backend.sdk import (
BaseWebhooksManager,
Webhook,
ProviderName,
Requests,
APIKeyCredentials,
)
from backend.data.model import Credentials
class ExaWebhookManager(BaseWebhooksManager):
"""Webhook manager for Exa API."""
PROVIDER_NAME = ProviderName("exa")
class WebhookType(str, Enum):
WEBSET = "webset"
@classmethod
async def validate_payload(cls, webhook: Webhook, request) -> tuple[dict, str]:
"""Validate incoming webhook payload and signature."""
payload = await request.json()
event_type = payload.get("eventType", "unknown")
# Verify webhook signature if secret is available
if webhook.secret:
signature = request.headers.get("X-Exa-Signature")
if signature:
body = await request.body()
expected_signature = hmac.new(
webhook.secret.encode(),
body,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected_signature):
raise ValueError("Invalid webhook signature")
return payload, event_type
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: str,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> tuple[str, dict]:
"""Register webhook with Exa API."""
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("Exa webhooks require API key credentials")
api_key = credentials.api_key.get_secret_value()
response = await Requests().post(
"https://api.exa.ai/v0/webhooks",
headers={"x-api-key": api_key},
json={
"url": ingress_url,
"events": events,
"metadata": {"resource": resource}
}
)
webhook_data = response.json()
return webhook_data["id"], {
"events": events,
"exa_secret": webhook_data.get("secret"),
}
async def _deregister_webhook(self, webhook: Webhook, credentials: Credentials) -> None:
"""Deregister webhook from Exa API."""
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("Exa webhooks require API key credentials")
api_key = credentials.api_key.get_secret_value()
await Requests().delete(
f"https://api.exa.ai/v0/webhooks/{webhook.provider_webhook_id}",
headers={"x-api-key": api_key}
)
# _config.py
from backend.sdk import BlockCostType, ProviderBuilder
from ._webhook import ExaWebhookManager
exa = (
ProviderBuilder("exa")
.with_api_key("EXA_API_KEY", "Exa API Key")
.with_webhook_manager(ExaWebhookManager)
.with_base_cost(1, BlockCostType.RUN)
.build()
)
# webhook_blocks.py
from backend.sdk import *
from ._config import exa
class ExaWebsetWebhookBlock(Block):
"""Receives webhook notifications for Exa webset events."""
class Input(BlockSchema):
credentials: CredentialsMetaInput = exa.credentials_field(
description="Exa API credentials"
)
webhook_url: str = SchemaField(
description="URL to receive webhooks (auto-generated)",
default="",
hidden=True,
)
webset_id: str = SchemaField(
description="The webset ID to monitor",
default="",
)
event_filter: dict = SchemaField(
description="Configure which events to receive",
default={}
)
payload: dict = SchemaField(
description="Webhook payload data",
default={},
hidden=True
)
class Output(BlockSchema):
event_type: str = SchemaField(description="Type of event")
webset_id: str = SchemaField(description="ID of the affected webset")
data: dict = SchemaField(description="Event data")
def __init__(self):
super().__init__(
id="d1e2f3a4-b5c6-7d8e-9f0a-1b2c3d4e5f6a",
description="Receive notifications for Exa webset events",
categories={BlockCategory.INPUT},
input_schema=self.Input,
output_schema=self.Output,
block_type=BlockType.WEBHOOK,
webhook_config=BlockWebhookConfig(
provider=ProviderName("exa"),
webhook_type="webset",
event_filter_input="event_filter",
resource_format="{webset_id}",
),
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
payload = input_data.payload
yield "event_type", payload.get("eventType", "unknown")
yield "webset_id", payload.get("websetId", input_data.webset_id)
yield "data", payload.get("data", {})
```
For more examples, see the `/autogpt_platform/backend/backend/blocks/examples/` directory.

View File

@@ -27,7 +27,7 @@ from backend.data.block import (
BlockWebhookConfig,
)
from backend.data.integrations import Webhook
from backend.data.model import APIKeyCredentials, CredentialsField
from backend.data.model import APIKeyCredentials, Credentials, CredentialsField
from backend.data.model import CredentialsMetaInput as _CredentialsMetaInput
from backend.data.model import (
NodeExecutionStats,
@@ -127,6 +127,7 @@ __all__ = [
"BlockManualWebhookConfig",
# Schema and Model Components
"SchemaField",
"Credentials",
"CredentialsField",
"CredentialsMetaInput",
"APIKeyCredentials",

View File

@@ -8,7 +8,7 @@ from typing import Callable, List, Optional, Type
from pydantic import SecretStr
from backend.data.cost import BlockCost, BlockCostType
from backend.data.model import APIKeyCredentials, Credentials
from backend.data.model import APIKeyCredentials, Credentials, UserPasswordCredentials
from backend.integrations.oauth.base import BaseOAuthHandler
from backend.integrations.webhooks._base import BaseWebhooksManager
from backend.sdk.provider import Provider
@@ -81,6 +81,27 @@ class ProviderBuilder:
)
return self
def with_user_password(
    self, username_env_var: str, password_env_var: str, title: str
) -> "ProviderBuilder":
    """
    Add username/password authentication support to the provider.

    If both environment variables are set, a default credential entry is
    registered so blocks can run without per-user configuration.

    Args:
        username_env_var: Name of the env var holding the username.
        password_env_var: Name of the env var holding the password.
        title: Human-readable title for the default credentials entry.

    Returns:
        This builder, to allow call chaining.
    """
    self._supported_auth_types.add("user_password")
    # Check if credentials exist in environment
    username = os.getenv(username_env_var)
    password = os.getenv(password_env_var)
    if username and password:
        # Wrap secrets in SecretStr so they are not exposed in logs/repr.
        self._default_credentials.append(
            UserPasswordCredentials(
                id=f"{self.name}-default",
                provider=self.name,
                username=SecretStr(username),
                password=SecretStr(password),
                title=title,
            )
        )
    return self
def with_webhook_manager(
self, manager_class: Type[BaseWebhooksManager]
) -> "ProviderBuilder":