refactor example block into its own folder

This commit is contained in:
SwiftyOS
2025-02-28 16:28:38 +01:00
committed by Swifty
parent 211e53bf5d
commit 57223e6343
6 changed files with 358 additions and 160 deletions

View File

@@ -1,160 +0,0 @@
import logging
from typing import Any, Literal
from pydantic import SecretStr
from backend.data.block import (
Block,
BlockCategory,
BlockManualWebhookConfig,
BlockOutput,
BlockSchema,
BlockType,
)
from backend.data.model import (
APIKeyCredentials,
ContributorDetails,
CredentialsField,
CredentialsMetaInput,
SchemaField,
)
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks.example import ExampleWebhookType
logger = logging.getLogger(__name__)
ExampleCredentials = CredentialsMetaInput[
ProviderName.EXAMPLE_PROVIDER, Literal["api_key"]
]
TEST_CREDENTIALS = APIKeyCredentials(
id="9191c4f0-498f-4235-a79c-59c0e37454d4",
provider="example-provider",
api_key=SecretStr("mock-example-api-key"),
title="Mock Example API key",
expires_at=None,
)
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}
class ExampleBlock(Block):
class ExampleBlockInput(BlockSchema):
name: str = SchemaField(description="The name of the example block")
greeting: str = SchemaField(description="The greeting to display")
is_funny: bool = SchemaField(description="Whether the block is funny")
# Only if the block needs credentials
credentials: ExampleCredentials = CredentialsField()
class ExampleBlockOutput(BlockSchema):
response: dict[str, Any] = SchemaField(
description="The response object generated by the example block."
)
error: str = SchemaField(description="The error from the example block")
def __init__(self):
super().__init__(
# The unique identifier for the block, this value will be persisted in the DB.
# It should be unique and constant across the application run.
# Use the UUID format for the ID.
id="380694d5-3b2e-4130-bced-b43752b70de9",
# The description of the block, explaining what the block does.
description="The example block",
# The list of contributors who contributed to the block.
# Each contributor is represented by a ContributorDetails object.
contributors=[ContributorDetails(name="Craig Swift")],
# The set of categories that the block belongs to.
# Each category is an instance of BlockCategory Enum.
categories={BlockCategory.BASIC},
# The schema, defined as a Pydantic model, for the input data.
input_schema=self.ExampleBlockInput,
# The schema, defined as a Pydantic model, for the output data.
output_schema=self.ExampleBlockOutput,
# The list or single sample input data for the block, for testing.
# This is an instance of the Input schema with sample values.
test_input=self.ExampleBlockInput(
name="Craig", greeting="Hello", is_funny=True
).model_dump(),
# The list or single expected output if the test_input is run.
# Each output is a tuple of (output_name, output_data).
test_output=[
("response", {"message": "Hello, world!"}),
],
# Function names on the block implementation to mock on test run.
# Each mock is a dictionary with function names as keys and mock implementations as values.
test_mock={
"my_function_that_can_be_mocked": lambda *args, **kwargs: "Hello, world!"
},
# The credentials required for testing the block.
# This is an instance of APIKeyCredentials with sample values.
test_credentials=TEST_CREDENTIALS,
# The type of the block, which is an instance of BlockType Enum.
block_type=BlockType.STANDARD,
# The webhook configuration for the block, if any.
# This can be an instance of BlockWebhookConfig or BlockManualWebhookConfig.
webhook_config=None,
)
def my_function_that_can_be_mocked(self, input: str) -> str:
logger.info("my_function_that_can_be_mocked called with input: %s", input)
return "Hello, world!"
def run(self, input_data: ExampleBlockInput, **kwargs) -> BlockOutput:
try:
message = self.my_function_that_can_be_mocked(input_data.greeting)
yield "response", {"message": message}
except Exception as e:
logger.error("Error in run method: %s", e)
yield "error", str(e)
class ExampleTriggerBlock(Block):
"""
A trigger block that is activated by an external webhook event.
Unlike standard blocks that are manually executed, trigger blocks are automatically
activated when a webhook event is received from the specified provider.
"""
class Input(BlockSchema):
# The payload field is hidden because it's automatically populated by the webhook
# system rather than being manually entered by the user
payload: dict = SchemaField(hidden=True)
class Output(BlockSchema):
event_data: dict = SchemaField(
description="The contents of the example webhook event."
)
def __init__(self):
super().__init__(
id="7c5933ce-d60c-42dd-9c4e-db82496474a3",
description="This block will output the contents of an example webhook event.",
categories={BlockCategory.BASIC},
input_schema=ExampleTriggerBlock.Input,
output_schema=ExampleTriggerBlock.Output,
# The webhook_config is a key difference from standard blocks
# It defines which external service can trigger this block and what type of events it responds to
webhook_config=BlockManualWebhookConfig(
provider="example_provider", # The external service that will send webhook events
webhook_type=ExampleWebhookType.EXAMPLE, # The specific event type this block responds to
),
# Test input for trigger blocks should mimic the payload structure that would be received from the webhook
test_input=[
{
"payload": {
"event_type": "example",
"data": "Sample webhook data",
}
}
],
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
# For trigger blocks, the run method is called automatically when a webhook event is received
# The payload from the webhook is passed in as input_data.payload
yield "event_data", input_data.payload

View File

@@ -0,0 +1,144 @@
from __future__ import annotations
import json
from typing import Any, Dict, Optional
from backend.data.model import APIKeyCredentials
from backend.util.request import Requests
class ExampleAPIException(Exception):
def __init__(self, message: str, status_code: int):
super().__init__(message)
self.status_code = status_code
def _get_headers(credentials: APIKeyCredentials) -> dict[str, str]:
return {
"Authorization": credentials.api_key.get_secret_value(),
"Content-Type": "application/json",
}
def get_api(credentials: APIKeyCredentials) -> Requests:
"""
Creates a configured Requests instance for the Example API.
Args:
credentials: The Example API credentials to use for authentication.
Returns:
A Requests instance configured with the appropriate headers and trusted origins.
"""
return Requests(
trusted_origins=["https://api.example.com"],
extra_headers=_get_headers(credentials),
raise_for_status=False,
)
class ExampleClient:
"""Client for the Example API"""
API_BASE_URL = "https://api.example.com/v1"
def __init__(
self,
credentials: Optional[APIKeyCredentials] = None,
custom_requests: Optional[Requests] = None,
):
if custom_requests:
self._requests = custom_requests
else:
headers: Dict[str, str] = {
"Content-Type": "application/json",
}
if credentials:
headers["Authorization"] = credentials.auth_header()
self._requests = Requests(
extra_headers=headers,
trusted_origins=["https://api.example.com"],
raise_for_status=False,
)
def _handle_response(self, response) -> Any:
"""
Handles API response and checks for errors.
Args:
response: The response object from the request.
Returns:
The parsed JSON response data.
Raises:
ExampleAPIException: If the API request fails.
"""
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get("error", {}).get("message", "")
except json.JSONDecodeError:
error_message = response.text
raise ExampleAPIException(
f"Example API request failed ({response.status_code}): {error_message}",
response.status_code,
)
response_data = response.json()
if "errors" in response_data:
error_messages = [
error.get("message", "") for error in response_data["errors"]
]
raise ExampleAPIException(
f"Example API returned errors: {', '.join(error_messages)}",
response.status_code,
)
return response_data
def get_resource(self, resource_id: str) -> Dict:
"""
Fetches a resource from the Example API.
Args:
resource_id: The ID of the resource to fetch.
Returns:
The resource data as a dictionary.
Raises:
ExampleAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/resources/{resource_id}"
)
return self._handle_response(response)
except ExampleAPIException:
raise
except Exception as e:
raise ExampleAPIException(f"Failed to get resource: {str(e)}", 500)
def create_resource(self, data: Dict) -> Dict:
"""
Creates a new resource via the Example API.
Args:
data: The resource data to create.
Returns:
The created resource data as a dictionary.
Raises:
ExampleAPIException: If the API request fails.
"""
try:
response = self._requests.post(f"{self.API_BASE_URL}/resources", json=data)
return self._handle_response(response)
except ExampleAPIException:
raise
except Exception as e:
raise ExampleAPIException(f"Failed to create resource: {str(e)}", 500)

View File

@@ -0,0 +1,37 @@
"""
Authentication module for Example API integration.
This module provides credential types and test credentials for the Example API integration.
It defines the structure for API key credentials used to authenticate with the Example API
and provides mock credentials for testing purposes.
"""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, CredentialsMetaInput
from backend.integrations.providers import ProviderName
# Define the type of credentials input expected for Example API
ExampleCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.EXAMPLE_PROVIDER], Literal["api_key"]
]
# Mock credentials for testing Example API integration
TEST_CREDENTIALS = APIKeyCredentials(
id="9191c4f0-498f-4235-a79c-59c0e37454d4",
provider="example-provider",
api_key=SecretStr("mock-example-api-key"),
title="Mock Example API key",
expires_at=None,
)
# Dictionary representation of test credentials for input fields
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}

View File

@@ -0,0 +1,115 @@
import logging
from typing import Any, Optional
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema, BlockType
from backend.data.model import (
APIKeyCredentials,
ContributorDetails,
CredentialsField,
SchemaField,
)
from ._api import ExampleAPIException, ExampleClient
from ._auth import TEST_CREDENTIALS, TEST_CREDENTIALS_INPUT, ExampleCredentialsInput
logger = logging.getLogger(__name__)
class ExampleBlock(Block):
class Input(BlockSchema):
name: str = SchemaField(
description="The name of the example block", placeholder="Enter a name"
)
greeting: str = SchemaField(
description="The greeting to display", placeholder="Hello"
)
is_funny: bool = SchemaField(
description="Whether the block is funny", placeholder="True", default=True
)
# Only if the block needs credentials
credentials: ExampleCredentialsInput = CredentialsField(
description="The credentials for the example block"
)
class Output(BlockSchema):
response: dict[str, Any] = SchemaField(
description="The response object generated by the example block."
)
error: str = SchemaField(description="The error from the example block")
def __init__(self):
super().__init__(
# The unique identifier for the block, this value will be persisted in the DB.
# It should be unique and constant across the application run.
# Use the UUID format for the ID.
id="380694d5-3b2e-4130-bced-b43752b70de9",
# The description of the block, explaining what the block does.
description="The example block",
# The list of contributors who contributed to the block.
# Each contributor is represented by a ContributorDetails object.
contributors=[ContributorDetails(name="Craig Swift")],
# The set of categories that the block belongs to.
# Each category is an instance of BlockCategory Enum.
categories={BlockCategory.BASIC},
# The schema, defined as a Pydantic model, for the input data.
input_schema=ExampleBlock.Input,
# The schema, defined as a Pydantic model, for the output data.
output_schema=ExampleBlock.Output,
# The list or single sample input data for the block, for testing.
# This is an instance of the Input schema with sample values.
test_input={
"name": "Craig",
"greeting": "Hello",
"is_funny": True,
"credentials": TEST_CREDENTIALS_INPUT,
},
# The list or single expected output if the test_input is run.
# Each output is a tuple of (output_name, output_data).
test_output=[
("response", {"message": "Hello, world!"}),
],
# Function names on the block implementation to mock on test run.
# Each mock is a dictionary with function names as keys and mock implementations as values.
test_mock={
"my_function_that_can_be_mocked": lambda *args, **kwargs: "Hello, world!"
},
# The credentials required for testing the block.
# This is an instance of APIKeyCredentials with sample values.
test_credentials=TEST_CREDENTIALS,
# The type of the block, which is an instance of BlockType Enum.
block_type=BlockType.STANDARD,
# The webhook configuration for the block, if any.
# This can be an instance of BlockWebhookConfig or BlockManualWebhookConfig.
webhook_config=None,
)
@staticmethod
def my_static_method(input: str) -> str:
logger.info("my_static_method called with input: %s", input)
return f"Hello, {input}!"
def my_function_that_can_be_mocked(
self, input: str, credentials: Optional[APIKeyCredentials] = None
) -> str:
logger.info("my_function_that_can_be_mocked called with input: %s", input)
if credentials:
# Use the ExampleClient from _api.py to make an API call
client = ExampleClient(credentials=credentials)
try:
# Create a sample resource using the client
resource_data = {"name": input, "type": "greeting"}
response = client.create_resource(resource_data)
return f"API response: {response.get('message', 'Hello, world!')}"
except ExampleAPIException as e:
logger.error("API call failed: %s", str(e))
return f"API error: {str(e)}"
# Fallback if no credentials provided
return "Hello, world!"
def run(self, input_data: Input, **kwargs) -> BlockOutput:
greeting = ExampleBlock.my_static_method(input_data.greeting)
message = self.my_function_that_can_be_mocked(greeting)
yield "response", {"message": message}

View File

@@ -0,0 +1,61 @@
import logging
from backend.data.block import (
Block,
BlockCategory,
BlockManualWebhookConfig,
BlockOutput,
BlockSchema,
)
from backend.data.model import SchemaField
from backend.integrations.webhooks.example import ExampleWebhookType
logger = logging.getLogger(__name__)
class ExampleTriggerBlock(Block):
"""
A trigger block that is activated by an external webhook event.
Unlike standard blocks that are manually executed, trigger blocks are automatically
activated when a webhook event is received from the specified provider.
"""
class Input(BlockSchema):
# The payload field is hidden because it's automatically populated by the webhook
# system rather than being manually entered by the user
payload: dict = SchemaField(hidden=True)
class Output(BlockSchema):
event_data: dict = SchemaField(
description="The contents of the example webhook event."
)
def __init__(self):
super().__init__(
id="7c5933ce-d60c-42dd-9c4e-db82496474a3",
description="This block will output the contents of an example webhook event.",
categories={BlockCategory.BASIC},
input_schema=ExampleTriggerBlock.Input,
output_schema=ExampleTriggerBlock.Output,
# The webhook_config is a key difference from standard blocks
# It defines which external service can trigger this block and what type of events it responds to
webhook_config=BlockManualWebhookConfig(
provider="example_provider", # The external service that will send webhook events
webhook_type=ExampleWebhookType.EXAMPLE, # The specific event type this block responds to
),
# Test input for trigger blocks should mimic the payload structure that would be received from the webhook
test_input=[
{
"payload": {
"event_type": "example",
"data": "Sample webhook data",
}
}
],
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
# For trigger blocks, the run method is called automatically when a webhook event is received
# The payload from the webhook is passed in as input_data.payload
yield "event_data", input_data.payload

View File

@@ -178,6 +178,7 @@ example_credentials = APIKeyCredentials(
)
DEFAULT_CREDENTIALS = [
example_credentials,
ollama_credentials,
revid_credentials,
ideogram_credentials,