Example block

This commit is contained in:
SwiftyOS
2025-02-28 14:47:39 +01:00
committed by Swifty
parent c1e329497c
commit dbf014f936
6 changed files with 330 additions and 0 deletions

View File

@@ -0,0 +1,160 @@
import logging
from typing import Any, Literal
from pydantic import SecretStr
from backend.data.block import (
Block,
BlockCategory,
BlockManualWebhookConfig,
BlockOutput,
BlockSchema,
BlockType,
)
from backend.data.model import (
APIKeyCredentials,
ContributorDetails,
CredentialsField,
CredentialsMetaInput,
SchemaField,
)
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks.example import ExampleWebhookType
logger = logging.getLogger(__name__)
ExampleCredentials = CredentialsMetaInput[
ProviderName.EXAMPLE_PROVIDER, Literal["api_key"]
]
TEST_CREDENTIALS = APIKeyCredentials(
id="9191c4f0-498f-4235-a79c-59c0e37454d4",
provider="example-provider",
api_key=SecretStr("mock-example-api-key"),
title="Mock Example API key",
expires_at=None,
)
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}
class ExampleBlock(Block):
class ExampleBlockInput(BlockSchema):
name: str = SchemaField(description="The name of the example block")
greeting: str = SchemaField(description="The greeting to display")
is_funny: bool = SchemaField(description="Whether the block is funny")
# Only if the block needs credentials
credentials: ExampleCredentials = CredentialsField()
class ExampleBlockOutput(BlockSchema):
response: dict[str, Any] = SchemaField(
description="The response object generated by the example block."
)
error: str = SchemaField(description="The error from the example block")
def __init__(self):
super().__init__(
# The unique identifier for the block, this value will be persisted in the DB.
# It should be unique and constant across the application run.
# Use the UUID format for the ID.
id="380694d5-3b2e-4130-bced-b43752b70de9",
# The description of the block, explaining what the block does.
description="The example block",
# The list of contributors who contributed to the block.
# Each contributor is represented by a ContributorDetails object.
contributors=[ContributorDetails(name="Craig Swift")],
# The set of categories that the block belongs to.
# Each category is an instance of BlockCategory Enum.
categories={BlockCategory.BASIC},
# The schema, defined as a Pydantic model, for the input data.
input_schema=self.ExampleBlockInput,
# The schema, defined as a Pydantic model, for the output data.
output_schema=self.ExampleBlockOutput,
# The list or single sample input data for the block, for testing.
# This is an instance of the Input schema with sample values.
test_input=self.ExampleBlockInput(
name="Craig", greeting="Hello", is_funny=True
).model_dump(),
# The list or single expected output if the test_input is run.
# Each output is a tuple of (output_name, output_data).
test_output=[
("response", {"message": "Hello, world!"}),
],
# Function names on the block implementation to mock on test run.
# Each mock is a dictionary with function names as keys and mock implementations as values.
test_mock={
"my_function_that_can_be_mocked": lambda *args, **kwargs: "Hello, world!"
},
# The credentials required for testing the block.
# This is an instance of APIKeyCredentials with sample values.
test_credentials=TEST_CREDENTIALS,
# The type of the block, which is an instance of BlockType Enum.
block_type=BlockType.STANDARD,
# The webhook configuration for the block, if any.
# This can be an instance of BlockWebhookConfig or BlockManualWebhookConfig.
webhook_config=None,
)
def my_function_that_can_be_mocked(self, input: str) -> str:
logger.info("my_function_that_can_be_mocked called with input: %s", input)
return "Hello, world!"
def run(self, input_data: ExampleBlockInput, **kwargs) -> BlockOutput:
try:
message = self.my_function_that_can_be_mocked(input_data.greeting)
yield "response", {"message": message}
except Exception as e:
logger.error("Error in run method: %s", e)
yield "error", str(e)
class ExampleTriggerBlock(Block):
"""
A trigger block that is activated by an external webhook event.
Unlike standard blocks that are manually executed, trigger blocks are automatically
activated when a webhook event is received from the specified provider.
"""
class Input(BlockSchema):
# The payload field is hidden because it's automatically populated by the webhook
# system rather than being manually entered by the user
payload: dict = SchemaField(hidden=True)
class Output(BlockSchema):
event_data: dict = SchemaField(
description="The contents of the example webhook event."
)
def __init__(self):
super().__init__(
id="7c5933ce-d60c-42dd-9c4e-db82496474a3",
description="This block will output the contents of an example webhook event.",
categories={BlockCategory.BASIC},
input_schema=ExampleTriggerBlock.Input,
output_schema=ExampleTriggerBlock.Output,
# The webhook_config is a key difference from standard blocks
# It defines which external service can trigger this block and what type of events it responds to
webhook_config=BlockManualWebhookConfig(
provider="example_provider", # The external service that will send webhook events
webhook_type=ExampleWebhookType.EXAMPLE, # The specific event type this block responds to
),
# Test input for trigger blocks should mimic the payload structure that would be received from the webhook
test_input=[
{
"payload": {
"event_type": "example",
"data": "Sample webhook data",
}
}
],
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
# For trigger blocks, the run method is called automatically when a webhook event is received
# The payload from the webhook is passed in as input_data.payload
yield "event_data", input_data.payload

View File

@@ -2,6 +2,7 @@ from typing import Type
from backend.blocks.ai_music_generator import AIMusicGeneratorBlock
from backend.blocks.ai_shortform_video_block import AIShortformVideoCreatorBlock
from backend.blocks.example import ExampleBlock
from backend.blocks.ideogram import IdeogramModelBlock
from backend.blocks.jina.embeddings import JinaEmbeddingBlock
from backend.blocks.jina.search import ExtractWebsiteContentBlock, SearchTheWebBlock
@@ -23,6 +24,7 @@ from backend.data.cost import BlockCost, BlockCostType
from backend.integrations.credentials_store import (
anthropic_credentials,
did_credentials,
example_credentials,
groq_credentials,
ideogram_credentials,
jina_credentials,
@@ -267,4 +269,16 @@ BLOCK_COSTS: dict[Type[Block], list[BlockCost]] = {
)
],
SmartDecisionMakerBlock: LLM_COST,
ExampleBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": example_credentials.id,
"provider": example_credentials.provider,
"type": example_credentials.type,
}
},
)
],
}

View File

@@ -169,6 +169,14 @@ zerobounce_credentials = APIKeyCredentials(
expires_at=None,
)
example_credentials = APIKeyCredentials(
id="a2b7f68f-aa6a-4995-99ec-b45b40d33498",
provider="example-provider",
api_key=SecretStr("mock-example-api-key"),
title="Use Credits for Example",
expires_at=None,
)
DEFAULT_CREDENTIALS = [
ollama_credentials,
revid_credentials,

View File

@@ -10,6 +10,7 @@ class ProviderName(str, Enum):
D_ID = "d_id"
E2B = "e2b"
EXA = "exa"
EXAMPLE_PROVIDER = "example-provider"
FAL = "fal"
GITHUB = "github"
GOOGLE = "google"

View File

@@ -1,6 +1,7 @@
from typing import TYPE_CHECKING
from .compass import CompassWebhookManager
from .example import ExampleWebhookManager
from .github import GithubWebhooksManager
from .slant3d import Slant3DWebhooksManager
@@ -15,6 +16,7 @@ WEBHOOK_MANAGERS_BY_NAME: dict["ProviderName", type["BaseWebhooksManager"]] = {
CompassWebhookManager,
GithubWebhooksManager,
Slant3DWebhooksManager,
ExampleWebhookManager,
]
}
# --8<-- [end:WEBHOOK_MANAGERS_BY_NAME]

View File

@@ -0,0 +1,145 @@
import logging
import requests
from fastapi import Request
from strenum import StrEnum
from backend.data import integrations
from backend.data.model import APIKeyCredentials, Credentials
from backend.integrations.providers import ProviderName
from ._manual_base import ManualWebhookManagerBase
logger = logging.getLogger(__name__)
class ExampleWebhookType(StrEnum):
EXAMPLE = "example"
EXAMPLE_2 = "example_2"
# ExampleWebhookManager is a class that manages webhooks for a hypothetical provider.
# It extends ManualWebhookManagerBase, which provides base functionality for manual webhook management.
class ExampleWebhookManager(ManualWebhookManagerBase):
# Define the provider name for this webhook manager.
PROVIDER_NAME = ProviderName.EXAMPLE_PROVIDER
# Define the types of webhooks this manager can handle.
WebhookType = ExampleWebhookType
BASE_URL = "https://api.example.com"
@classmethod
async def validate_payload(
cls, webhook: integrations.Webhook, request: Request
) -> tuple[dict, str]:
"""
Validate the incoming webhook payload.
Args:
webhook (integrations.Webhook): The webhook object.
request (Request): The incoming request object.
Returns:
tuple: A tuple containing the payload as a dictionary and the event type as a string.
"""
# Extract the JSON payload from the request.
payload = await request.json()
# Set the event type based on the webhook type in the payload.
event_type = payload.get("webhook_type", ExampleWebhookType.EXAMPLE)
return payload, event_type
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: str,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> tuple[str, dict]:
"""
Register a new webhook with the provider.
Args:
credentials (Credentials): The credentials required for authentication.
webhook_type (str): The type of webhook to register.
resource (str): The resource associated with the webhook.
events (list[str]): The list of events to subscribe to.
ingress_url (str): The URL where the webhook will send data.
secret (str): A secret for securing the webhook.
Returns:
tuple: A tuple containing an empty string and the webhook configuration as a dictionary.
"""
# Ensure the credentials are of the correct type.
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key is required to register a webhook")
# Prepare the headers for the request, including the API key.
headers = {
"api-key": credentials.api_key.get_secret_value(),
"Content-Type": "application/json",
}
# Prepare the payload for the request. Note that the events list is not used.
# This is just a fake example
payload = {"endPoint": ingress_url}
# Send a POST request to register the webhook.
response = requests.post(
f"{self.BASE_URL}/example/webhookSubscribe", headers=headers, json=payload
)
# Check if the response indicates a failure.
if not response.ok:
error = response.json().get("error", "Unknown error")
raise RuntimeError(f"Failed to register webhook: {error}")
# Prepare the webhook configuration to return.
webhook_config = {
"endpoint": ingress_url,
"provider": self.PROVIDER_NAME,
"events": ["example_event"],
"type": webhook_type,
}
return "", webhook_config
async def _deregister_webhook(
self, webhook: integrations.Webhook, credentials: Credentials
) -> None:
"""
Deregister a webhook with the provider.
Args:
webhook (integrations.Webhook): The webhook object to deregister.
credentials (Credentials): The credentials associated with the webhook.
Raises:
ValueError: If the webhook doesn't belong to the credentials or if deregistration fails.
"""
if webhook.credentials_id != credentials.id:
raise ValueError(
f"Webhook #{webhook.id} does not belong to credentials {credentials.id}"
)
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key is required to deregister a webhook")
headers = {
"api-key": credentials.api_key.get_secret_value(),
"Content-Type": "application/json",
}
# Construct the delete URL based on the webhook information
delete_url = f"{self.BASE_URL}/example/webhooks/{webhook.provider_webhook_id}"
response = requests.delete(delete_url, headers=headers)
if response.status_code not in [204, 404]:
# 204 means successful deletion, 404 means the webhook was already deleted
error = response.json().get("error", "Unknown error")
raise ValueError(f"Failed to delete webhook: {error}")
# If we reach here, the webhook was successfully deleted or didn't exist