exa websets

This commit is contained in:
SwiftyOS
2025-05-14 16:27:05 +02:00
parent 9471fd6b58
commit 5ebe9219da
4 changed files with 654 additions and 0 deletions

View File

@@ -0,0 +1,212 @@
import json
import logging
from pathlib import Path
from pydantic import BaseModel
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchema,
BlockWebhookConfig,
)
from backend.data.model import SchemaField, APIKeyCredentials
from backend.integrations.providers import ProviderName
from ._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
ExaCredentialsField,
ExaCredentialsInput,
)
logger = logging.getLogger(__name__)
class ExaTriggerBase:
    """Shared schemas and pass-through behaviour for Exa webhook trigger blocks."""

    class Input(BlockSchema):
        """Base input schema for Exa triggers."""

        credentials: ExaCredentialsInput = ExaCredentialsField()
        # --8<-- [start:example-payload-field]
        payload: dict = SchemaField(hidden=True, default_factory=dict)
        # --8<-- [end:example-payload-field]

    class Output(BlockSchema):
        """Base output schema for Exa triggers."""

        payload: dict = SchemaField(
            description=(
                "The complete webhook payload that was received from Exa. "
                "Includes information about the event type, data, and creation timestamp."
            )
        )
        error: str = SchemaField(
            description="Error message if the payload could not be processed"
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Emit the received webhook payload unchanged on the ``payload`` output.

        Args:
            input_data: Trigger input whose ``payload`` field holds the webhook body.
            **kwargs: Accepted for signature compatibility; not used here.

        Yields:
            A single ``("payload", <payload dict>)`` pair.
        """
        received = input_data.payload
        yield "payload", received
class ExaWebsetTriggerBlock(ExaTriggerBase, Block):
    """Block for handling Exa Webset webhook events.

    This block triggers on various Exa Webset events such as webset creation,
    deletion, search completion, etc. and outputs the event details.
    """

    EXAMPLE_PAYLOAD_FILE = (
        Path(__file__).parent / "example_payloads" / "webset.created.json"
    )

    class Input(ExaTriggerBase.Input):
        """Input schema for Exa Webset trigger with event filtering options."""

        class EventsFilter(BaseModel):
            """
            Event filter options for Exa Webset webhooks.
            See: https://docs.exa.ai/api-reference/webhooks
            """

            # Webset events
            webset_created: bool = False
            webset_deleted: bool = False
            webset_paused: bool = False
            webset_idle: bool = False
            # Search events
            webset_search_created: bool = False
            webset_search_updated: bool = False
            webset_search_completed: bool = False
            webset_search_canceled: bool = False
            # Item events
            webset_item_created: bool = False
            webset_item_enriched: bool = False
            # Export events
            webset_export_created: bool = False
            webset_export_completed: bool = False

        events: EventsFilter = SchemaField(
            title="Events", description="The events to subscribe to"
        )

    class Output(ExaTriggerBase.Output):
        """Output schema for Exa Webset trigger with event-specific fields."""

        event_type: str = SchemaField(
            description="The type of event that triggered the webhook"
        )
        webset_id: str = SchemaField(
            description="The ID of the affected webset"
        )
        created_at: str = SchemaField(
            description="Timestamp when the event was created"
        )
        data: dict = SchemaField(
            description="Object containing the full resource that triggered the event"
        )

    def __init__(self):
        """Initialize the Exa Webset trigger block with its configuration."""

        # Define a webhook type constant for Exa
        class ExaWebhookType:
            """Constants for Exa webhook types."""

            WEBSET = "webset"

        # Example payload used for both the block's test input and expected output
        example_payload = {
            "id": "663de972-bfe7-47ef-b4d7-179cfed7aa44",
            "object": "event",
            "type": "webset.created",
            "data": {
                "id": "wbs_123456789",
                "name": "Example Webset",
                "description": "An example webset for testing",
            },
            "createdAt": "2023-06-01T12:00:00Z",
        }

        # Map UI event names (EventsFilter field names) to API event names.
        # Set before super().__init__() so it is available as soon as the
        # block is constructed.
        self.event_mapping = {
            "webset_created": "webset.created",
            "webset_deleted": "webset.deleted",
            "webset_paused": "webset.paused",
            "webset_idle": "webset.idle",
            "webset_search_created": "webset.search.created",
            "webset_search_updated": "webset.search.updated",
            "webset_search_completed": "webset.search.completed",
            "webset_search_canceled": "webset.search.canceled",
            "webset_item_created": "webset.item.created",
            "webset_item_enriched": "webset.item.enriched",
            "webset_export_created": "webset.export.created",
            "webset_export_completed": "webset.export.completed",
        }

        super().__init__(
            id="804ac1ed-d692-4ccb-a390-739a846a2667",
            description="This block triggers on Exa Webset events and outputs the event type and payload.",
            categories={BlockCategory.DEVELOPER_TOOLS, BlockCategory.INPUT},
            input_schema=ExaWebsetTriggerBlock.Input,
            output_schema=ExaWebsetTriggerBlock.Output,
            webhook_config=BlockWebhookConfig(
                provider=ProviderName.EXA,
                webhook_type=ExaWebhookType.WEBSET,
                resource_format="",  # Exa doesn't require a specific resource format
                event_filter_input="events",
                # NOTE(review): if the platform substitutes the raw filter field
                # name (e.g. "webset_created") for {event}, this produces
                # "webset.webset_created" rather than the API names listed in
                # event_mapping — confirm how BlockWebhookConfig expands filters.
                event_format="webset.{event}",
            ),
            test_input={
                "credentials": TEST_CREDENTIALS_INPUT,
                "events": {"webset_created": True, "webset_search_completed": True},
                "payload": example_payload,
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("payload", example_payload),
                ("event_type", example_payload["type"]),
                ("webset_id", "wbs_123456789"),
                ("created_at", "2023-06-01T12:00:00Z"),
                ("data", example_payload["data"]),
            ],
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:  # type: ignore
        """Process Exa Webset webhook events.

        The raw payload is always emitted first (via the base class). The
        event-specific outputs are emitted only when the payload's event type is
        enabled in ``input_data.events`` and every required field is present;
        otherwise a single ``error`` output is emitted instead. All fields are
        extracted before anything event-specific is yielded, so a malformed
        payload never produces a partial set of outputs followed by an error.

        Args:
            input_data: The input data containing the webhook payload and event filter

        Yields:
            ``payload``, then either ``event_type``, ``webset_id``, ``created_at``
            and ``data``, or ``error``.
        """
        yield from super().run(input_data, **kwargs)

        payload = input_data.payload
        try:
            event_type = payload["type"]

            # Convert the API event name back to the UI field name (reverse
            # mapping) and check that the user enabled it.
            ui_event_name = next(
                (k for k, v in self.event_mapping.items() if v == event_type), None
            )
            if not (ui_event_name and getattr(input_data.events, ui_event_name, False)):
                yield "error", f"Event type {event_type} not in selected events filter"
                return

            data = payload["data"]
            if not isinstance(data, dict):
                # Indexing a non-dict with "id" would raise TypeError, which the
                # KeyError handler below does not cover.
                yield "error", "Payload field 'data' is not an object"
                return

            # NOTE(review): this is the "id" of whatever resource is carried in
            # `data`; for non-webset events (search/item/export) confirm that
            # this is the webset ID the output description promises.
            webset_id = data["id"]
            created_at = payload["createdAt"]
        except KeyError as e:
            yield "error", f"Missing expected field in payload: {str(e)}"
            return

        yield "event_type", event_type
        yield "webset_id", webset_id
        yield "created_at", created_at
        yield "data", data

View File

@@ -0,0 +1,321 @@
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests

from backend.blocks.exa._auth import (
    ExaCredentials,
    ExaCredentialsField,
    ExaCredentialsInput,
)
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
# --- Create Webset Block ---
class ExaCreateWebsetBlock(Block):
    """Block for creating a Webset using Exa's Websets API."""

    # Seconds to wait for Exa before giving up. `requests` applies no timeout
    # by default, so without this a stalled connection would hang the block.
    REQUEST_TIMEOUT_SECONDS = 30

    # Input schema: the search definition plus optional extras.
    class Input(BlockSchema):
        credentials: ExaCredentialsInput = ExaCredentialsField()
        query: str = SchemaField(description="The search query for the webset")
        count: int = SchemaField(description="Number of results to return", default=5)
        enrichments: Optional[List[Dict[str, Any]]] = SchemaField(
            description="List of enrichment dicts (optional)",
            default_factory=list,
            advanced=True,
        )
        external_id: Optional[str] = SchemaField(
            description="Optional external identifier", default=None, advanced=True
        )
        metadata: Optional[Dict[str, Any]] = SchemaField(
            description="Optional metadata", default_factory=dict, advanced=True
        )

    # Output schema: exactly one of `webset` / `error` is emitted per run.
    class Output(BlockSchema):
        webset: Optional[Dict[str, Any]] = SchemaField(
            description="The created webset object (or None if error)", default=None
        )
        error: str = SchemaField(
            description="Error message if the request failed", default=""
        )

    def __init__(self):
        """Initialize the ExaCreateWebsetBlock with its configuration."""
        super().__init__(
            id="322351cc-35d7-45ec-8920-9a3c98920411",
            description="Creates a Webset using Exa's Websets API",
            categories={BlockCategory.SEARCH},
            input_schema=ExaCreateWebsetBlock.Input,
            output_schema=ExaCreateWebsetBlock.Output,
        )

    def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput:
        """
        Execute the block to create a webset with Exa's API.

        Args:
            input_data: The input parameters for creating a webset
            credentials: The Exa API credentials

        Yields:
            ``("webset", <response JSON>)`` on success, otherwise
            ``("error", <message>)`` for any failure (network error, timeout,
            non-2xx status, or a response body that is not valid JSON).
        """
        url = "https://api.exa.ai/websets/v0/websets"
        headers = {
            "Content-Type": "application/json",
            "x-api-key": credentials.api_key.get_secret_value(),
        }
        payload: Dict[str, Any] = {
            "search": {
                "query": input_data.query,
                "count": input_data.count,
            }
        }
        # Optional fields are only sent when they carry a non-empty value.
        if isinstance(input_data.enrichments, list) and input_data.enrichments:
            payload["enrichments"] = input_data.enrichments
        if isinstance(input_data.external_id, str) and input_data.external_id:
            payload["externalId"] = input_data.external_id
        if isinstance(input_data.metadata, dict) and input_data.metadata:
            payload["metadata"] = input_data.metadata

        try:
            response = requests.post(
                url,
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            data = response.json()
            yield "webset", data
        except Exception as e:
            # Failures are reported on the error output rather than raised.
            yield "error", str(e)
# --- Get Webset Block ---
class ExaGetWebsetBlock(Block):
    """Block for retrieving a Webset by ID using Exa's Websets API."""

    # Seconds to wait for Exa before giving up. `requests` applies no timeout
    # by default, so without this a stalled connection would hang the block.
    REQUEST_TIMEOUT_SECONDS = 30

    # Input schema: which webset to fetch and whether to expand its items.
    class Input(BlockSchema):
        credentials: ExaCredentialsInput = ExaCredentialsField()
        webset_id: str = SchemaField(description="The Webset ID or externalId")
        expand_items: bool = SchemaField(
            description="Expand with items", default=False, advanced=True
        )

    # Output schema: exactly one of `webset` / `error` is emitted per run.
    class Output(BlockSchema):
        webset: Optional[Dict[str, Any]] = SchemaField(
            description="The webset object (or None if error)", default=None
        )
        error: str = SchemaField(
            description="Error message if the request failed", default=""
        )

    def __init__(self):
        """Initialize the ExaGetWebsetBlock with its configuration."""
        super().__init__(
            id="f9229293-cddf-43fc-94b3-48cbd1a44618",
            description="Retrieves a Webset by ID using Exa's Websets API",
            categories={BlockCategory.SEARCH},
            input_schema=ExaGetWebsetBlock.Input,
            output_schema=ExaGetWebsetBlock.Output,
        )

    def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput:
        """
        Execute the block to retrieve a webset by ID from Exa's API.

        Args:
            input_data: The input parameters including the webset ID
            credentials: The Exa API credentials

        Yields:
            ``("webset", <response JSON>)`` on success, otherwise
            ``("error", <message>)`` for any failure (network error, timeout,
            non-2xx status, or a response body that is not valid JSON).
        """
        # Percent-encode the ID so characters such as "/" or "?" in a
        # user-supplied value cannot change the request path or query.
        webset_ref = quote(input_data.webset_id, safe="")
        url = f"https://api.exa.ai/websets/v0/websets/{webset_ref}"
        headers = {
            "Content-Type": "application/json",
            "x-api-key": credentials.api_key.get_secret_value(),
        }
        params = {"expand": "items"} if input_data.expand_items else None

        try:
            response = requests.get(
                url,
                headers=headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            data = response.json()
            yield "webset", data
        except Exception as e:
            # Failures are reported on the error output rather than raised.
            yield "error", str(e)
# --- Delete Webset Block ---
class ExaDeleteWebsetBlock(Block):
    """Block for deleting a Webset by ID using Exa's Websets API."""

    # Seconds to wait for Exa before giving up. `requests` applies no timeout
    # by default, so without this a stalled connection would hang the block.
    REQUEST_TIMEOUT_SECONDS = 30

    # Input schema: which webset to delete.
    class Input(BlockSchema):
        credentials: ExaCredentialsInput = ExaCredentialsField()
        webset_id: str = SchemaField(description="The Webset ID or externalId")

    # Output schema: exactly one of `deleted` / `error` is emitted per run.
    class Output(BlockSchema):
        deleted: Optional[Dict[str, Any]] = SchemaField(
            description="The deleted webset object (or None if error)", default=None
        )
        error: str = SchemaField(
            description="Error message if the request failed", default=""
        )

    def __init__(self):
        """Initialize the ExaDeleteWebsetBlock with its configuration."""
        super().__init__(
            id="a082e162-274e-4167-a467-a1839e644cbd",
            description="Deletes a Webset by ID using Exa's Websets API",
            categories={BlockCategory.SEARCH},
            input_schema=ExaDeleteWebsetBlock.Input,
            output_schema=ExaDeleteWebsetBlock.Output,
        )

    def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput:
        """
        Execute the block to delete a webset by ID using Exa's API.

        Args:
            input_data: The input parameters including the webset ID
            credentials: The Exa API credentials

        Yields:
            ``("deleted", <response JSON>)`` on success, otherwise
            ``("error", <message>)`` for any failure (network error, timeout,
            non-2xx status, or a response body that is not valid JSON).
        """
        # Percent-encode the ID so characters such as "/" or "?" in a
        # user-supplied value cannot redirect the DELETE to another path.
        webset_ref = quote(input_data.webset_id, safe="")
        url = f"https://api.exa.ai/websets/v0/websets/{webset_ref}"
        headers = {
            "Content-Type": "application/json",
            "x-api-key": credentials.api_key.get_secret_value(),
        }

        try:
            response = requests.delete(
                url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            data = response.json()
            yield "deleted", data
        except Exception as e:
            # Failures are reported on the error output rather than raised.
            yield "error", str(e)
# --- Update Webset Block ---
class ExaUpdateWebsetBlock(Block):
    """Block for updating a Webset's metadata using Exa's Websets API."""

    # Seconds to wait for Exa before giving up. `requests` applies no timeout
    # by default, so without this a stalled connection would hang the block.
    REQUEST_TIMEOUT_SECONDS = 30

    # Input schema: which webset to update and the metadata to send.
    class Input(BlockSchema):
        credentials: ExaCredentialsInput = ExaCredentialsField()
        webset_id: str = SchemaField(description="The Webset ID or externalId")
        metadata: Dict[str, Any] = SchemaField(
            description="Metadata to update", default_factory=dict
        )

    # Output schema: exactly one of `webset` / `error` is emitted per run.
    class Output(BlockSchema):
        webset: Optional[Dict[str, Any]] = SchemaField(
            description="The updated webset object (or None if error)", default=None
        )
        error: str = SchemaField(
            description="Error message if the request failed", default=""
        )

    def __init__(self):
        """Initialize the ExaUpdateWebsetBlock with its configuration."""
        super().__init__(
            id="e0c81b70-ac38-4239-8ecd-a75c1737c9ef",
            description="Updates a Webset's metadata using Exa's Websets API",
            categories={BlockCategory.SEARCH},
            input_schema=ExaUpdateWebsetBlock.Input,
            output_schema=ExaUpdateWebsetBlock.Output,
        )

    def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput:
        """
        Execute the block to update a webset's metadata using Exa's API.

        Args:
            input_data: The input parameters including the webset ID and metadata
            credentials: The Exa API credentials

        Yields:
            ``("webset", <response JSON>)`` on success, otherwise
            ``("error", <message>)`` for any failure (network error, timeout,
            non-2xx status, or a response body that is not valid JSON).
        """
        # Percent-encode the ID so characters such as "/" or "?" in a
        # user-supplied value cannot change the request path or query.
        webset_ref = quote(input_data.webset_id, safe="")
        url = f"https://api.exa.ai/websets/v0/websets/{webset_ref}"
        headers = {
            "Content-Type": "application/json",
            "x-api-key": credentials.api_key.get_secret_value(),
        }
        payload = {"metadata": input_data.metadata}

        try:
            response = requests.post(
                url,
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            data = response.json()
            yield "webset", data
        except Exception as e:
            # Failures are reported on the error output rather than raised.
            yield "error", str(e)
# --- List Websets Block ---
class ExaListWebsetsBlock(Block):
    """Block for listing all Websets using Exa's Websets API with pagination support."""

    # Seconds to wait for Exa before giving up. `requests` applies no timeout
    # by default, so without this a stalled connection would hang the block.
    REQUEST_TIMEOUT_SECONDS = 30

    # Input schema: page size and optional pagination cursor.
    class Input(BlockSchema):
        credentials: ExaCredentialsInput = ExaCredentialsField()
        limit: int = SchemaField(
            description="Number of websets to return (max 100)", default=25
        )
        cursor: Optional[str] = SchemaField(
            description="Pagination cursor (optional)", default=None, advanced=True
        )

    # Output schema: either the three page outputs or `error` are emitted.
    class Output(BlockSchema):
        data: Optional[List[Dict[str, Any]]] = SchemaField(
            description="List of websets", default=None
        )
        has_more: Optional[bool] = SchemaField(
            description="Whether there are more results", default=None
        )
        next_cursor: Optional[str] = SchemaField(
            description="Cursor for next page", default=None
        )
        error: str = SchemaField(
            description="Error message if the request failed", default=""
        )

    def __init__(self):
        """Initialize the ExaListWebsetsBlock with its configuration."""
        super().__init__(
            id="887a2dae-c9c3-4ae5-a079-fe3b52be64e4",
            description="Lists all Websets using Exa's Websets API",
            categories={BlockCategory.SEARCH},
            input_schema=ExaListWebsetsBlock.Input,
            output_schema=ExaListWebsetsBlock.Output,
        )

    def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput:
        """
        Execute the block to list websets with pagination using Exa's API.

        Args:
            input_data: The input parameters including limit and optional cursor
            credentials: The Exa API credentials

        Yields:
            ``data``, ``has_more`` and ``next_cursor`` (read from the response's
            ``data``, ``hasMore`` and ``nextCursor`` keys) on success, otherwise
            ``("error", <message>)`` for any failure (network error, timeout,
            non-2xx status, or a response body that is not valid JSON).
        """
        url = "https://api.exa.ai/websets/v0/websets"
        headers = {
            "Content-Type": "application/json",
            "x-api-key": credentials.api_key.get_secret_value(),
        }
        params: dict[str, Any] = {"limit": int(input_data.limit)}
        # The cursor is only sent when a non-empty one was supplied.
        if isinstance(input_data.cursor, str) and input_data.cursor:
            params["cursor"] = input_data.cursor

        try:
            response = requests.get(
                url,
                headers=headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            data = response.json()
            yield "data", data.get("data")
            yield "has_more", data.get("hasMore")
            yield "next_cursor", data.get("nextCursor")
        except Exception as e:
            # Failures are reported on the error output rather than raised.
            yield "error", str(e)
# --- Cancel Webset Block ---
class ExaCancelWebsetBlock(Block):
    """Block for canceling a running Webset using Exa's Websets API."""

    # Seconds to wait for Exa before giving up. `requests` applies no timeout
    # by default, so without this a stalled connection would hang the block.
    REQUEST_TIMEOUT_SECONDS = 30

    # Input schema: which webset to cancel.
    class Input(BlockSchema):
        credentials: ExaCredentialsInput = ExaCredentialsField()
        webset_id: str = SchemaField(description="The Webset ID or externalId")

    # Output schema: exactly one of `webset` / `error` is emitted per run.
    class Output(BlockSchema):
        webset: Optional[Dict[str, Any]] = SchemaField(
            description="The canceled webset object (or None if error)", default=None
        )
        error: str = SchemaField(
            description="Error message if the request failed", default=""
        )

    def __init__(self):
        """Initialize the ExaCancelWebsetBlock with its configuration."""
        super().__init__(
            id="f7f0b19c-71e8-4c2f-bc68-904a6a61faf7",
            description="Cancels a running Webset using Exa's Websets API",
            categories={BlockCategory.SEARCH},
            input_schema=ExaCancelWebsetBlock.Input,
            output_schema=ExaCancelWebsetBlock.Output,
        )

    def run(self, input_data: Input, *, credentials: ExaCredentials, **kwargs) -> BlockOutput:
        """
        Execute the block to cancel a running webset using Exa's API.

        Args:
            input_data: The input parameters including the webset ID
            credentials: The Exa API credentials

        Yields:
            ``("webset", <response JSON>)`` on success, otherwise
            ``("error", <message>)`` for any failure (network error, timeout,
            non-2xx status, or a response body that is not valid JSON).
        """
        # Percent-encode the ID so characters such as "/" or "?" in a
        # user-supplied value cannot change the request path or query.
        webset_ref = quote(input_data.webset_id, safe="")
        url = f"https://api.exa.ai/websets/v0/websets/{webset_ref}/cancel"
        headers = {
            "Content-Type": "application/json",
            "x-api-key": credentials.api_key.get_secret_value(),
        }

        try:
            response = requests.post(
                url, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            data = response.json()
            yield "webset", data
        except Exception as e:
            # Failures are reported on the error output rather than raised.
            yield "error", str(e)

View File

@@ -16,6 +16,7 @@ def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]
from .generic import GenericWebhooksManager
from .github import GithubWebhooksManager
from .slant3d import Slant3DWebhooksManager
from .exa import ExaWebhooksManager
_WEBHOOK_MANAGERS.update(
{
@@ -25,6 +26,7 @@ def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]
GithubWebhooksManager,
Slant3DWebhooksManager,
GenericWebhooksManager,
ExaWebhooksManager,
]
}
)

View File

@@ -0,0 +1,119 @@
import logging
import requests
from fastapi import Request
from backend.data import integrations
from backend.data.model import APIKeyCredentials, Credentials
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks._base import BaseWebhooksManager
logger = logging.getLogger(__name__)
class ExaWebhooksManager(BaseWebhooksManager):
    """Manager for Exa webhooks.

    Registers and deregisters webhooks through Exa's Websets API and performs
    structural validation of incoming webhook payloads.
    """

    PROVIDER_NAME = ProviderName.EXA
    BASE_URL = "https://api.exa.ai/websets/v0"

    # Seconds to wait for Exa before giving up. `requests` applies no timeout
    # by default, so without this a stalled connection would hang the caller.
    _REQUEST_TIMEOUT_SECONDS = 30

    @staticmethod
    def _error_detail(response: requests.Response) -> str:
        """Return a readable error description from a failed Exa response.

        Falls back to the raw body text (or the status code) when the body is
        not JSON, so that the original HTTP failure is not masked by a JSON
        decoding error.
        """
        try:
            body = response.json()
        except ValueError:
            return response.text or f"HTTP {response.status_code}"
        if isinstance(body, dict):
            return body.get("error", "Unknown error")
        return "Unknown error"

    async def _register_webhook(
        self,
        credentials: Credentials,
        webhook_type: str,
        resource: str,
        events: list[str],
        ingress_url: str,
        secret: str,
    ) -> tuple[str, dict]:
        """Register a new webhook with Exa.

        Args:
            credentials: Must be ``APIKeyCredentials``; its key is sent as ``x-api-key``.
            webhook_type: Stored in the returned config under ``"type"``.
            resource: Not used by this implementation.
            events: Event names to subscribe to; sent to Exa as-is.
            ingress_url: URL Exa should deliver events to.
            secret: Not sent to Exa; the config stores the ``secret`` field of
                Exa's response instead.

        Returns:
            ``(webhook_id, config)`` where ``webhook_id`` is the ``id`` from
            Exa's response (empty string if absent).

        Raises:
            ValueError: If ``credentials`` is not an API key credential.
            RuntimeError: If Exa responds with a non-OK status.
        """
        if not isinstance(credentials, APIKeyCredentials):
            raise ValueError("API key is required to register a webhook")

        headers = {
            "x-api-key": credentials.api_key.get_secret_value(),
            "Content-Type": "application/json",
        }
        payload = {
            "events": events,
            "url": ingress_url,
            "metadata": {},  # Optional metadata can be added here
        }

        # NOTE(review): `requests` is synchronous, so this call blocks inside
        # an `async def` until Exa answers or the timeout elapses.
        response = requests.post(
            f"{self.BASE_URL}/webhooks",
            headers=headers,
            json=payload,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        if not response.ok:
            error = self._error_detail(response)
            raise RuntimeError(f"Failed to register webhook: {error}")

        response_data = response.json()
        webhook_id = response_data.get("id", "")
        webhook_config = {
            "endpoint": ingress_url,
            "provider": self.PROVIDER_NAME,
            "events": events,
            "type": webhook_type,
            "webhook_id": webhook_id,
            "secret": response_data.get("secret", ""),
        }
        return webhook_id, webhook_config

    @classmethod
    async def validate_payload(
        cls, webhook: integrations.Webhook, request: Request
    ) -> tuple[dict, str]:
        """Validate incoming webhook payload from Exa.

        Only the payload's structure is checked here.
        NOTE(review): no signature/secret verification is performed on the
        request; if Exa signs deliveries, verification should be added —
        confirm against Exa's webhook documentation.

        Returns:
            ``(normalized_payload, event_type)`` where the normalized payload
            keeps only ``id``, ``type``, ``data`` and ``createdAt``.

        Raises:
            ValueError: If the body is not a JSON object or a required field
                is missing.
        """
        payload = await request.json()
        if not isinstance(payload, dict):
            # Key lookups below assume an object; reject arrays/scalars early
            # with the same exception type used for other validation failures.
            raise ValueError("Webhook payload must be a JSON object")

        # Validate required fields from Exa API spec
        required_fields = ["id", "object", "type", "data", "createdAt"]
        missing_fields = [field for field in required_fields if field not in payload]
        if missing_fields:
            raise ValueError(f"Missing required fields: {', '.join(missing_fields)}")

        # Normalize payload structure ("object" is validated but not kept)
        normalized_payload = {
            "id": payload["id"],
            "type": payload["type"],
            "data": payload["data"],
            "createdAt": payload["createdAt"],
        }
        event_type = payload["type"]
        return normalized_payload, event_type

    async def _deregister_webhook(
        self, webhook: integrations.Webhook, credentials: Credentials
    ) -> None:
        """Deregister a webhook with Exa.

        Does nothing (beyond logging a warning) when the stored config has no
        ``webhook_id``.

        Raises:
            ValueError: If ``credentials`` is not an API key credential.
            RuntimeError: If Exa responds with a non-OK status.
        """
        if not isinstance(credentials, APIKeyCredentials):
            raise ValueError("API key is required to deregister a webhook")

        webhook_id = webhook.config.get("webhook_id")
        if not webhook_id:
            logger.warning(
                "No webhook ID found for webhook %s, cannot deregister", webhook.id
            )
            return

        headers = {
            "x-api-key": credentials.api_key.get_secret_value(),
        }
        response = requests.delete(
            f"{self.BASE_URL}/webhooks/{webhook_id}",
            headers=headers,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        if not response.ok:
            error = self._error_detail(response)
            logger.error("Failed to deregister webhook %s: %s", webhook_id, error)
            raise RuntimeError(f"Failed to deregister webhook: {error}")