diff --git a/autogpt_platform/backend/backend/blocks/exa/trigger.py b/autogpt_platform/backend/backend/blocks/exa/trigger.py new file mode 100644 index 0000000000..fd343f41d1 --- /dev/null +++ b/autogpt_platform/backend/backend/blocks/exa/trigger.py @@ -0,0 +1,212 @@ +import json +import logging +from pathlib import Path +from pydantic import BaseModel + +from backend.data.block import ( + Block, + BlockCategory, + BlockOutput, + BlockSchema, + BlockWebhookConfig, +) +from backend.data.model import SchemaField, APIKeyCredentials +from backend.integrations.providers import ProviderName + +from ._auth import ( + TEST_CREDENTIALS, + TEST_CREDENTIALS_INPUT, + ExaCredentialsField, + ExaCredentialsInput, +) + +logger = logging.getLogger(__name__) + + +class ExaTriggerBase: + """Base class for Exa webhook triggers.""" + + class Input(BlockSchema): + """Base input schema for Exa triggers.""" + credentials: ExaCredentialsInput = ExaCredentialsField() + # --8<-- [start:example-payload-field] + payload: dict = SchemaField(hidden=True, default_factory=dict) + # --8<-- [end:example-payload-field] + + class Output(BlockSchema): + """Base output schema for Exa triggers.""" + payload: dict = SchemaField( + description="The complete webhook payload that was received from Exa. " + "Includes information about the event type, data, and creation timestamp." + ) + error: str = SchemaField( + description="Error message if the payload could not be processed" + ) + + def run(self, input_data: Input, **kwargs) -> BlockOutput: + """Process the webhook payload from Exa. + + Args: + input_data: The input data containing the webhook payload + + Yields: + The complete webhook payload + """ + yield "payload", input_data.payload + + +class ExaWebsetTriggerBlock(ExaTriggerBase, Block): + """Block for handling Exa Webset webhook events. + + This block triggers on various Exa Webset events such as webset creation, + deletion, search completion, etc. and outputs the event details. + """ + + EXAMPLE_PAYLOAD_FILE = ( + Path(__file__).parent / "example_payloads" / "webset.created.json" + ) + + class Input(ExaTriggerBase.Input): + """Input schema for Exa Webset trigger with event filtering options.""" + + class EventsFilter(BaseModel): + """ + Event filter options for Exa Webset webhooks. + + See: https://docs.exa.ai/api-reference/webhooks + """ + # Webset events + webset_created: bool = False + webset_deleted: bool = False + webset_paused: bool = False + webset_idle: bool = False + + # Search events + webset_search_created: bool = False + webset_search_updated: bool = False + webset_search_completed: bool = False + webset_search_canceled: bool = False + + # Item events + webset_item_created: bool = False + webset_item_enriched: bool = False + + # Export events + webset_export_created: bool = False + webset_export_completed: bool = False + + events: EventsFilter = SchemaField( + title="Events", description="The events to subscribe to" + ) + + class Output(ExaTriggerBase.Output): + """Output schema for Exa Webset trigger with event-specific fields.""" + + event_type: str = SchemaField( + description="The type of event that triggered the webhook" + ) + webset_id: str = SchemaField( + description="The ID of the affected webset" + ) + created_at: str = SchemaField( + description="Timestamp when the event was created" + ) + data: dict = SchemaField( + description="Object containing the full resource that triggered the event" + ) + + def __init__(self): + """Initialize the Exa Webset trigger block with its configuration.""" + + # Define a webhook type constant for Exa + class ExaWebhookType: + """Constants for Exa webhook types.""" + WEBSET = "webset" + + # Create example payload + example_payload = { + "id": "663de972-bfe7-47ef-b4d7-179cfed7aa44", + "object": "event", + "type": "webset.created", + "data": { + "id": "wbs_123456789", + "name": "Example Webset", + "description": "An example webset for testing" + }, + "createdAt": "2023-06-01T12:00:00Z" + } + + # Map UI event names to API event names + self.event_mapping = { + "webset_created": "webset.created", + "webset_deleted": "webset.deleted", + "webset_paused": "webset.paused", + "webset_idle": "webset.idle", + "webset_search_created": "webset.search.created", + "webset_search_updated": "webset.search.updated", + "webset_search_completed": "webset.search.completed", + "webset_search_canceled": "webset.search.canceled", + "webset_item_created": "webset.item.created", + "webset_item_enriched": "webset.item.enriched", + "webset_export_created": "webset.export.created", + "webset_export_completed": "webset.export.completed" + } + + super().__init__( + id="804ac1ed-d692-4ccb-a390-739a846a2667", + description="This block triggers on Exa Webset events and outputs the event type and payload.", + categories={BlockCategory.DEVELOPER_TOOLS, BlockCategory.INPUT}, + input_schema=ExaWebsetTriggerBlock.Input, + output_schema=ExaWebsetTriggerBlock.Output, + webhook_config=BlockWebhookConfig( + provider=ProviderName.EXA, + webhook_type=ExaWebhookType.WEBSET, + resource_format="", # Exa doesn't require a specific resource format + event_filter_input="events", + event_format="webset.{event}", + ), + test_input={ + "credentials": TEST_CREDENTIALS_INPUT, + "events": {"webset_created": True, "webset_search_completed": True}, + "payload": example_payload, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("payload", example_payload), + ("event_type", example_payload["type"]), + ("webset_id", "wbs_123456789"), + ("created_at", "2023-06-01T12:00:00Z"), + ("data", example_payload["data"]), + ], + ) + + def run(self, input_data: Input, **kwargs) -> BlockOutput: # type: ignore + """Process Exa Webset webhook events. + + Args: + input_data: The input data containing the webhook payload and event filter + + Yields: + Event details including event type, webset ID, creation timestamp, and data, + or an error message if the event type doesn't match the filter or if required + fields are missing from the payload. + """ + yield from super().run(input_data, **kwargs) + try: + # Get the event type from the payload + event_type = input_data.payload["type"] + + # Check if this event type is in the user's selected events + # Convert API event name to UI event name (reverse mapping) + ui_event_name = next((k for k, v in self.event_mapping.items() if v == event_type), None) + + # Only process events that match the filter + if ui_event_name and getattr(input_data.events, ui_event_name, False): + yield "event_type", event_type + yield "webset_id", input_data.payload["data"]["id"] + yield "created_at", input_data.payload["createdAt"] + yield "data", input_data.payload["data"] + else: + yield "error", f"Event type {event_type} not in selected events filter" + except KeyError as e: + yield "error", f"Missing expected field in payload: {str(e)}" diff --git a/autogpt_platform/backend/backend/blocks/exa/websets.py b/autogpt_platform/backend/backend/blocks/exa/websets.py new file mode 100644 index 0000000000..0d71d96d4c --- /dev/null +++ b/autogpt_platform/backend/backend/blocks/exa/websets.py @@ -0,0 +1,321 @@ +from typing import Any, Dict, List, Optional + +import requests + +from backend.blocks.exa._auth import ( + ExaCredentials, + ExaCredentialsField, + ExaCredentialsInput, +) +from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema +from backend.data.model import SchemaField + +# --- Create Webset Block --- +class ExaCreateWebsetBlock(Block): + """Block for creating a Webset using Exa's Websets API.""" + class Input(BlockSchema): + credentials: ExaCredentialsInput = ExaCredentialsField() + query: str = SchemaField(description="The search query for the webset") + count: int = SchemaField(description="Number of results to return", default=5) + enrichments: Optional[List[Dict[str, Any]]] = SchemaField( + description="List of enrichment dicts (optional)", default_factory=list, advanced=True + ) + external_id: Optional[str] = SchemaField( + description="Optional external identifier", default=None, advanced=True + ) + metadata: Optional[Dict[str, Any]] = SchemaField( + description="Optional metadata", default_factory=dict, advanced=True + ) + + class Output(BlockSchema): + webset: Optional[Dict[str, Any]] = SchemaField( + description="The created webset object (or None if error)", default=None + ) + error: str = SchemaField( + description="Error message if the request failed", default="" + ) + + def __init__(self): + """Initialize the ExaCreateWebsetBlock with its configuration.""" + super().__init__( + id="322351cc-35d7-45ec-8920-9a3c98920411", + description="Creates a Webset using Exa's Websets API", + categories={BlockCategory.SEARCH}, + input_schema=ExaCreateWebsetBlock.Input, + output_schema=ExaCreateWebsetBlock.Output, + ) + + def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput: + """ + Execute the block to create a webset with Exa's API. + + Args: + input_data: The input parameters for creating a webset + credentials: The Exa API credentials + + Yields: + Either the created webset object or an error message + """ + url = "https://api.exa.ai/websets/v0/websets" + headers = { + "Content-Type": "application/json", + "x-api-key": credentials.api_key.get_secret_value(), + } + payload = { + "search": { + "query": input_data.query, + "count": input_data.count, + } + } + optional_fields = {} + if isinstance(input_data.enrichments, list) and input_data.enrichments: + optional_fields["enrichments"] = input_data.enrichments + if isinstance(input_data.external_id, str) and input_data.external_id: + optional_fields["externalId"] = input_data.external_id + if isinstance(input_data.metadata, dict) and input_data.metadata: + optional_fields["metadata"] = input_data.metadata + payload.update(optional_fields) + try: + response = requests.post(url, headers=headers, json=payload) + response.raise_for_status() + data = response.json() + yield "webset", data + except Exception as e: + yield "error", str(e) + +# --- Get Webset Block --- +class ExaGetWebsetBlock(Block): + """Block for retrieving a Webset by ID using Exa's Websets API.""" + class Input(BlockSchema): + credentials: ExaCredentialsInput = ExaCredentialsField() + webset_id: str = SchemaField(description="The Webset ID or externalId") + expand_items: bool = SchemaField(description="Expand with items", default=False, advanced=True) + + class Output(BlockSchema): + webset: Optional[Dict[str, Any]] = SchemaField(description="The webset object (or None if error)", default=None) + error: str = SchemaField(description="Error message if the request failed", default="") + + def __init__(self): + """Initialize the ExaGetWebsetBlock with its configuration.""" + super().__init__( + id="f9229293-cddf-43fc-94b3-48cbd1a44618", + description="Retrieves a Webset by ID using Exa's Websets API", + categories={BlockCategory.SEARCH}, + input_schema=ExaGetWebsetBlock.Input, + output_schema=ExaGetWebsetBlock.Output, + ) + + def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput: + """ + Execute the block to retrieve a webset by ID from Exa's API. + + Args: + input_data: The input parameters including the webset ID + credentials: The Exa API credentials + + Yields: + Either the retrieved webset object or an error message + """ + url = f"https://api.exa.ai/websets/v0/websets/{input_data.webset_id}" + headers = { + "Content-Type": "application/json", + "x-api-key": credentials.api_key.get_secret_value(), + } + params = {"expand": "items"} if input_data.expand_items else None + try: + response = requests.get(url, headers=headers, params=params) + response.raise_for_status() + data = response.json() + yield "webset", data + except Exception as e: + yield "error", str(e) + +# --- Delete Webset Block --- +class ExaDeleteWebsetBlock(Block): + """Block for deleting a Webset by ID using Exa's Websets API.""" + class Input(BlockSchema): + credentials: ExaCredentialsInput = ExaCredentialsField() + webset_id: str = SchemaField(description="The Webset ID or externalId") + + class Output(BlockSchema): + deleted: Optional[Dict[str, Any]] = SchemaField(description="The deleted webset object (or None if error)", default=None) + error: str = SchemaField(description="Error message if the request failed", default="") + + def __init__(self): + """Initialize the ExaDeleteWebsetBlock with its configuration.""" + super().__init__( + id="a082e162-274e-4167-a467-a1839e644cbd", + description="Deletes a Webset by ID using Exa's Websets API", + categories={BlockCategory.SEARCH}, + input_schema=ExaDeleteWebsetBlock.Input, + output_schema=ExaDeleteWebsetBlock.Output, + ) + + def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput: + """ + Execute the block to delete a webset by ID using Exa's API. + + Args: + input_data: The input parameters including the webset ID + credentials: The Exa API credentials + + Yields: + Either the deleted webset object or an error message + """ + url = f"https://api.exa.ai/websets/v0/websets/{input_data.webset_id}" + headers = { + "Content-Type": "application/json", + "x-api-key": credentials.api_key.get_secret_value(), + } + try: + response = requests.delete(url, headers=headers) + response.raise_for_status() + data = response.json() + yield "deleted", data + except Exception as e: + yield "error", str(e) + +# --- Update Webset Block --- +class ExaUpdateWebsetBlock(Block): + """Block for updating a Webset's metadata using Exa's Websets API.""" + class Input(BlockSchema): + credentials: ExaCredentialsInput = ExaCredentialsField() + webset_id: str = SchemaField(description="The Webset ID or externalId") + metadata: Dict[str, Any] = SchemaField(description="Metadata to update", default_factory=dict) + + class Output(BlockSchema): + webset: Optional[Dict[str, Any]] = SchemaField(description="The updated webset object (or None if error)", default=None) + error: str = SchemaField(description="Error message if the request failed", default="") + + def __init__(self): + """Initialize the ExaUpdateWebsetBlock with its configuration.""" + super().__init__( + id="e0c81b70-ac38-4239-8ecd-a75c1737c9ef", + description="Updates a Webset's metadata using Exa's Websets API", + categories={BlockCategory.SEARCH}, + input_schema=ExaUpdateWebsetBlock.Input, + output_schema=ExaUpdateWebsetBlock.Output, + ) + + def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput: + """ + Execute the block to update a webset's metadata using Exa's API. + + Args: + input_data: The input parameters including the webset ID and metadata + credentials: The Exa API credentials + + Yields: + Either the updated webset object or an error message + """ + url = f"https://api.exa.ai/websets/v0/websets/{input_data.webset_id}" + headers = { + "Content-Type": "application/json", + "x-api-key": credentials.api_key.get_secret_value(), + } + payload = {"metadata": input_data.metadata} + try: + response = requests.post(url, headers=headers, json=payload) + response.raise_for_status() + data = response.json() + yield "webset", data + except Exception as e: + yield "error", str(e) + +# --- List Websets Block --- +class ExaListWebsetsBlock(Block): + """Block for listing all Websets using Exa's Websets API with pagination support.""" + class Input(BlockSchema): + credentials: ExaCredentialsInput = ExaCredentialsField() + limit: int = SchemaField(description="Number of websets to return (max 100)", default=25) + cursor: Optional[str] = SchemaField(description="Pagination cursor (optional)", default=None, advanced=True) + + class Output(BlockSchema): + data: Optional[List[Dict[str, Any]]] = SchemaField(description="List of websets", default=None) + has_more: Optional[bool] = SchemaField(description="Whether there are more results", default=None) + next_cursor: Optional[str] = SchemaField(description="Cursor for next page", default=None) + error: str = SchemaField(description="Error message if the request failed", default="") + + def __init__(self): + """Initialize the ExaListWebsetsBlock with its configuration.""" + super().__init__( + id="887a2dae-c9c3-4ae5-a079-fe3b52be64e4", + description="Lists all Websets using Exa's Websets API", + categories={BlockCategory.SEARCH}, + input_schema=ExaListWebsetsBlock.Input, + output_schema=ExaListWebsetsBlock.Output, + ) + + def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput: + """ + Execute the block to list websets with pagination using Exa's API. + + Args: + input_data: The input parameters including limit and optional cursor + credentials: The Exa API credentials + + Yields: + The list of websets, pagination info, or an error message + """ + url = "https://api.exa.ai/websets/v0/websets" + headers = { + "Content-Type": "application/json", + "x-api-key": credentials.api_key.get_secret_value(), + } + params: dict[str, Any] = {"limit": int(input_data.limit)} + if isinstance(input_data.cursor, str) and input_data.cursor: + params["cursor"] = input_data.cursor + try: + response = requests.get(url, headers=headers, params=params) + response.raise_for_status() + data = response.json() + yield "data", data.get("data") + yield "has_more", data.get("hasMore") + yield "next_cursor", data.get("nextCursor") + except Exception as e: + yield "error", str(e) + +# --- Cancel Webset Block --- +class ExaCancelWebsetBlock(Block): + """Block for canceling a running Webset using Exa's Websets API.""" + class Input(BlockSchema): + credentials: ExaCredentialsInput = ExaCredentialsField() + webset_id: str = SchemaField(description="The Webset ID or externalId") + + class Output(BlockSchema): + webset: Optional[Dict[str, Any]] = SchemaField(description="The canceled webset object (or None if error)", default=None) + error: str = SchemaField(description="Error message if the request failed", default="") + + def __init__(self): + """Initialize the ExaCancelWebsetBlock with its configuration.""" + super().__init__( + id="f7f0b19c-71e8-4c2f-bc68-904a6a61faf7", + description="Cancels a running Webset using Exa's Websets API", + categories={BlockCategory.SEARCH}, + input_schema=ExaCancelWebsetBlock.Input, + output_schema=ExaCancelWebsetBlock.Output, + ) + + def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput: + """ + Execute the block to cancel a running webset using Exa's API. + + Args: + input_data: The input parameters including the webset ID + credentials: The Exa API credentials + + Yields: + Either the canceled webset object or an error message + """ + url = f"https://api.exa.ai/websets/v0/websets/{input_data.webset_id}/cancel" + headers = { + "Content-Type": "application/json", + "x-api-key": credentials.api_key.get_secret_value(), + } + try: + response = requests.post(url, headers=headers) + response.raise_for_status() + data = response.json() + yield "webset", data + except Exception as e: + yield "error", str(e) diff --git a/autogpt_platform/backend/backend/integrations/webhooks/__init__.py b/autogpt_platform/backend/backend/integrations/webhooks/__init__.py index b1774aa3fd..a266084e2b 100644 --- a/autogpt_platform/backend/backend/integrations/webhooks/__init__.py +++ b/autogpt_platform/backend/backend/integrations/webhooks/__init__.py @@ -16,6 +16,7 @@ def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]] from .generic import GenericWebhooksManager from .github import GithubWebhooksManager from .slant3d import Slant3DWebhooksManager + from .exa import ExaWebhooksManager _WEBHOOK_MANAGERS.update( { @@ -25,6 +26,7 @@ def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]] GithubWebhooksManager, Slant3DWebhooksManager, GenericWebhooksManager, + ExaWebhooksManager, ] } ) diff --git a/autogpt_platform/backend/backend/integrations/webhooks/exa.py b/autogpt_platform/backend/backend/integrations/webhooks/exa.py new file mode 100644 index 0000000000..35b3015614 --- /dev/null +++ b/autogpt_platform/backend/backend/integrations/webhooks/exa.py @@ -0,0 +1,119 @@ +import logging + +import requests +from fastapi import Request + +from backend.data import integrations +from backend.data.model import APIKeyCredentials, Credentials +from backend.integrations.providers import ProviderName +from backend.integrations.webhooks._base import BaseWebhooksManager + +logger = logging.getLogger(__name__) + + +class ExaWebhooksManager(BaseWebhooksManager): + """Manager for Exa webhooks""" + + PROVIDER_NAME = ProviderName.EXA + BASE_URL = "https://api.exa.ai/websets/v0" + + async def _register_webhook( + self, + credentials: Credentials, + webhook_type: str, + resource: str, + events: list[str], + ingress_url: str, + secret: str, + ) -> tuple[str, dict]: + """Register a new webhook with Exa""" + + if not isinstance(credentials, APIKeyCredentials): + raise ValueError("API key is required to register a webhook") + + headers = { + "x-api-key": credentials.api_key.get_secret_value(), + "Content-Type": "application/json", + } + + payload = { + "events": events, + "url": ingress_url, + "metadata": {} # Optional metadata can be added here + } + + response = requests.post( + f"{self.BASE_URL}/webhooks", headers=headers, json=payload + ) + + if not response.ok: + error = response.json().get("error", "Unknown error") + raise RuntimeError(f"Failed to register webhook: {error}") + + response_data = response.json() + webhook_id = response_data.get("id", "") + + webhook_config = { + "endpoint": ingress_url, + "provider": self.PROVIDER_NAME, + "events": events, + "type": webhook_type, + "webhook_id": webhook_id, + "secret": response_data.get("secret", "") + } + + return webhook_id, webhook_config + + @classmethod + async def validate_payload( + cls, webhook: integrations.Webhook, request: Request + ) -> tuple[dict, str]: + """Validate incoming webhook payload from Exa""" + + payload = await request.json() + + # Validate required fields from Exa API spec + required_fields = ["id", "object", "type", "data", "createdAt"] + missing_fields = [field for field in required_fields if field not in payload] + + if missing_fields: + raise ValueError(f"Missing required fields: {', '.join(missing_fields)}") + + # Normalize payload structure + normalized_payload = { + "id": payload["id"], + "type": payload["type"], + "data": payload["data"], + "createdAt": payload["createdAt"] + } + + # Extract event type from the payload + event_type = payload["type"] + + return normalized_payload, event_type + + async def _deregister_webhook( + self, webhook: integrations.Webhook, credentials: Credentials + ) -> None: + """Deregister a webhook with Exa""" + + if not isinstance(credentials, APIKeyCredentials): + raise ValueError("API key is required to deregister a webhook") + + webhook_id = webhook.config.get("webhook_id") + if not webhook_id: + logger.warning(f"No webhook ID found for webhook {webhook.id}, cannot deregister") + return + + headers = { + "x-api-key": credentials.api_key.get_secret_value(), + } + + response = requests.delete( + f"{self.BASE_URL}/webhooks/{webhook_id}", headers=headers + ) + + if not response.ok: + error = response.json().get("error", "Unknown error") + logger.error(f"Failed to deregister webhook {webhook_id}: {error}") + raise RuntimeError(f"Failed to deregister webhook: {error}")