mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Moving examples and docs to dedicated pr
This commit is contained in:
@@ -1,105 +0,0 @@
|
||||
# Example Blocks Using the SDK Provider Pattern
|
||||
|
||||
This directory contains example blocks demonstrating how to use the AutoGPT SDK with the provider builder pattern.
|
||||
|
||||
## Provider Builder Pattern
|
||||
|
||||
The provider builder pattern is the recommended way to configure providers for your blocks. It provides a clean, declarative way to set up authentication, costs, rate limits, and other provider-specific settings.
|
||||
|
||||
### Basic Provider Configuration
|
||||
|
||||
Create a `_config.py` file in your block directory:
|
||||
|
||||
```python
|
||||
from backend.sdk import BlockCostType, ProviderBuilder
|
||||
|
||||
# Configure your provider
|
||||
my_provider = (
|
||||
ProviderBuilder("my-service")
|
||||
.with_api_key("MY_SERVICE_API_KEY", "My Service API Key")
|
||||
.with_base_cost(1, BlockCostType.RUN)
|
||||
.build()
|
||||
)
|
||||
```
|
||||
|
||||
### Using the Provider in Your Block
|
||||
|
||||
Import the provider and use its `credentials_field()` method:
|
||||
|
||||
```python
|
||||
from backend.sdk import Block, BlockSchema, CredentialsMetaInput
|
||||
from ._config import my_provider
|
||||
|
||||
class MyBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = my_provider.credentials_field(
|
||||
description="Credentials for My Service"
|
||||
)
|
||||
```
|
||||
|
||||
## Examples in This Directory
|
||||
|
||||
### 1. Simple Example Block (`simple_example_block.py`)
|
||||
- Basic block without authentication
|
||||
- Shows minimal SDK usage
|
||||
|
||||
### 2. Example SDK Block (`example_sdk_block.py`)
|
||||
- Uses provider builder pattern for API key authentication
|
||||
- Demonstrates credential handling
|
||||
|
||||
### 3. Webhook Example Block (`example_webhook_sdk_block.py`)
|
||||
- Shows webhook configuration with provider pattern
|
||||
- Includes webhook manager setup
|
||||
|
||||
### 4. Advanced Provider Example (`advanced_provider_example.py`)
|
||||
- Multiple authentication types (API Key and OAuth)
|
||||
- Custom API client integration
|
||||
- Rate limiting configuration
|
||||
- Advanced provider features
|
||||
|
||||
## Benefits of the Provider Pattern
|
||||
|
||||
1. **Centralized Configuration**: All provider settings in one place
|
||||
2. **Type Safety**: Full type hints and IDE support
|
||||
3. **Reusability**: Share provider config across multiple blocks
|
||||
4. **Consistency**: Standardized pattern across the codebase
|
||||
5. **Easy Testing**: Mock providers for unit tests
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. Always create a `_config.py` file for your provider configurations
|
||||
2. Use descriptive names for your providers
|
||||
3. Include proper descriptions in `credentials_field()`
|
||||
4. Set appropriate base costs for your blocks
|
||||
5. Configure rate limits if your provider has them
|
||||
6. Use OAuth when available for better security
|
||||
|
||||
## Testing
|
||||
|
||||
When testing blocks with providers:
|
||||
|
||||
```python
|
||||
from backend.sdk import APIKeyCredentials, SecretStr
|
||||
|
||||
# Create test credentials
|
||||
test_creds = APIKeyCredentials(
|
||||
id="test-creds",
|
||||
provider="my-service",
|
||||
api_key=SecretStr("test-api-key"),
|
||||
title="Test API Key",
|
||||
)
|
||||
|
||||
# Use in your tests
|
||||
block = MyBlock()
|
||||
async for output_name, output_value in block.run(
|
||||
MyBlock.Input(
|
||||
credentials={
|
||||
"provider": "my-service",
|
||||
"id": "test-creds",
|
||||
"type": "api_key",
|
||||
}
|
||||
),
|
||||
credentials=test_creds,
|
||||
):
|
||||
# Process outputs
|
||||
```
|
||||
@@ -1,83 +0,0 @@
|
||||
"""
|
||||
Shared configuration for example blocks using the new SDK pattern.
|
||||
"""
|
||||
|
||||
from backend.sdk import APIKeyCredentials, BlockCostType, ProviderBuilder, SecretStr
|
||||
|
||||
# Configure the example service provider
|
||||
example_service = (
|
||||
ProviderBuilder("example-service")
|
||||
.with_api_key("EXAMPLE_SERVICE_API_KEY", "Example Service API Key")
|
||||
.with_base_cost(1, BlockCostType.RUN)
|
||||
.build()
|
||||
)
|
||||
|
||||
# Test credentials for example service
|
||||
EXAMPLE_SERVICE_TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="01234567-89ab-cdef-0123-456789abcdef",
|
||||
provider="example-service",
|
||||
api_key=SecretStr("mock-example-api-key"),
|
||||
title="Mock Example Service API key",
|
||||
expires_at=None,
|
||||
)
|
||||
|
||||
EXAMPLE_SERVICE_TEST_CREDENTIALS_INPUT = {
|
||||
"provider": EXAMPLE_SERVICE_TEST_CREDENTIALS.provider,
|
||||
"id": EXAMPLE_SERVICE_TEST_CREDENTIALS.id,
|
||||
"type": EXAMPLE_SERVICE_TEST_CREDENTIALS.type,
|
||||
"title": EXAMPLE_SERVICE_TEST_CREDENTIALS.title,
|
||||
}
|
||||
|
||||
# Configure the example webhook provider
|
||||
example_webhook = (
|
||||
ProviderBuilder(name="examplewebhook")
|
||||
.with_api_key(
|
||||
env_var_name="EXAMPLE_WEBHOOK_API_KEY", title="Example Webhook API Key"
|
||||
)
|
||||
.with_base_cost(
|
||||
amount=0, cost_type=BlockCostType.RUN
|
||||
) # Webhooks typically don't have run costs
|
||||
.build()
|
||||
)
|
||||
|
||||
# Advanced provider configuration
|
||||
advanced_service = (
|
||||
ProviderBuilder(name="advanced-service")
|
||||
.with_api_key(env_var_name="ADVANCED_API_KEY", title="Advanced Service API Key")
|
||||
.with_base_cost(amount=2, cost_type=BlockCostType.RUN)
|
||||
.build()
|
||||
)
|
||||
|
||||
|
||||
# Example of a provider with custom API client
|
||||
class CustomAPIProvider:
|
||||
"""Example custom API client for demonstration."""
|
||||
|
||||
def __init__(self, credentials):
|
||||
self.credentials = credentials
|
||||
|
||||
async def request(self, method: str, endpoint: str, **kwargs):
|
||||
# Example of how to use Requests module:
|
||||
# from backend.sdk import Requests
|
||||
# response = await Requests().post(
|
||||
# url="https://api.example.com" + endpoint,
|
||||
# headers={
|
||||
# "Content-Type": "application/json",
|
||||
# "x-api-key": self.credentials.api_key.get_secret_value()
|
||||
# },
|
||||
# json=kwargs.get("data", {})
|
||||
# )
|
||||
# return response.json()
|
||||
|
||||
# Simulated API request for example
|
||||
return {"status": "ok", "data": kwargs.get("data", {})}
|
||||
|
||||
|
||||
# Configure provider with custom API client
|
||||
custom_api = (
|
||||
ProviderBuilder(name="custom-api")
|
||||
.with_api_key(env_var_name="CUSTOM_API_KEY", title="Custom API Key")
|
||||
.with_api_client(factory=lambda creds: CustomAPIProvider(creds))
|
||||
.with_base_cost(amount=3, cost_type=BlockCostType.RUN)
|
||||
.build()
|
||||
)
|
||||
@@ -1,150 +0,0 @@
|
||||
"""
|
||||
Advanced Provider Example using the SDK
|
||||
|
||||
This demonstrates more advanced provider configurations including:
|
||||
1. API Key authentication
|
||||
2. Custom API client integration
|
||||
3. Error handling patterns
|
||||
4. Multiple provider configurations
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from backend.sdk import (
|
||||
APIKeyCredentials,
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchema,
|
||||
CredentialsMetaInput,
|
||||
SchemaField,
|
||||
)
|
||||
|
||||
from ._config import advanced_service, custom_api
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AdvancedProviderBlock(Block):
|
||||
"""
|
||||
Advanced example block demonstrating API key authentication
|
||||
and provider configuration features.
|
||||
"""
|
||||
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = advanced_service.credentials_field(
|
||||
description="Credentials for Advanced Service",
|
||||
)
|
||||
operation: str = SchemaField(
|
||||
description="Operation to perform",
|
||||
default="read",
|
||||
)
|
||||
data: str = SchemaField(
|
||||
description="Data to process",
|
||||
default="",
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Operation result")
|
||||
auth_type: str = SchemaField(description="Authentication type used")
|
||||
success: bool = SchemaField(description="Whether operation succeeded")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="d0086843-4c6c-4b9a-a490-d0e7b4cb317e",
|
||||
description="Advanced provider example with multiple auth types",
|
||||
categories={BlockCategory.DEVELOPER_TOOLS},
|
||||
input_schema=AdvancedProviderBlock.Input,
|
||||
output_schema=AdvancedProviderBlock.Output,
|
||||
)
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: APIKeyCredentials,
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
logger.debug(
|
||||
"Starting AdvancedProviderBlock run with operation: %s",
|
||||
input_data.operation,
|
||||
)
|
||||
# Use API key authentication
|
||||
_ = (
|
||||
credentials.api_key.get_secret_value()
|
||||
) # Would be used in real implementation
|
||||
logger.debug("Successfully authenticated with API key")
|
||||
|
||||
result = f"Performed {input_data.operation} with API key auth"
|
||||
logger.debug("Operation completed successfully")
|
||||
|
||||
yield "result", result
|
||||
yield "auth_type", "api_key"
|
||||
yield "success", True
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in AdvancedProviderBlock: %s", str(e))
|
||||
yield "result", f"Error: {str(e)}"
|
||||
yield "auth_type", "error"
|
||||
yield "success", False
|
||||
|
||||
|
||||
class CustomAPIBlock(Block):
|
||||
"""
|
||||
Example block using a provider with custom API client.
|
||||
"""
|
||||
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = custom_api.credentials_field(
|
||||
description="Credentials for Custom API",
|
||||
)
|
||||
endpoint: str = SchemaField(
|
||||
description="API endpoint to call",
|
||||
default="/data",
|
||||
)
|
||||
payload: str = SchemaField(
|
||||
description="Payload to send",
|
||||
default="{}",
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
response: str = SchemaField(description="API response")
|
||||
status: str = SchemaField(description="Response status")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="979ccdfd-db5a-4179-ad57-aeb277999d79",
|
||||
description="Example using custom API client provider",
|
||||
categories={BlockCategory.DEVELOPER_TOOLS},
|
||||
input_schema=CustomAPIBlock.Input,
|
||||
output_schema=CustomAPIBlock.Output,
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
logger.debug(
|
||||
"Starting CustomAPIBlock run with endpoint: %s", input_data.endpoint
|
||||
)
|
||||
# Get API client from provider
|
||||
api_client = custom_api.get_api(credentials)
|
||||
logger.debug("Successfully obtained API client")
|
||||
|
||||
# Make API request
|
||||
logger.debug("Making API request with payload: %s", input_data.payload)
|
||||
response = await api_client.request(
|
||||
method="POST",
|
||||
endpoint=input_data.endpoint,
|
||||
data=input_data.payload,
|
||||
)
|
||||
logger.debug("Received API response: %s", response)
|
||||
|
||||
yield "response", str(response)
|
||||
yield "status", response.get("status", "unknown")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in CustomAPIBlock: %s", str(e))
|
||||
yield "response", f"Error: {str(e)}"
|
||||
yield "status", "error"
|
||||
@@ -1,166 +0,0 @@
|
||||
"""
|
||||
Example Block demonstrating the cost decorator
|
||||
|
||||
This shows how to define custom costs for a block using the @cost decorator.
|
||||
"""
|
||||
|
||||
from backend.sdk import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockCost,
|
||||
BlockCostType,
|
||||
BlockOutput,
|
||||
BlockSchema,
|
||||
CredentialsMetaInput,
|
||||
SchemaField,
|
||||
cost,
|
||||
)
|
||||
|
||||
from ._config import example_service
|
||||
|
||||
|
||||
# Example 1: Simple block with fixed cost
|
||||
@cost(BlockCost(cost_type=BlockCostType.RUN, cost_amount=5))
|
||||
class FixedCostBlock(Block):
|
||||
"""Block with a fixed cost per run."""
|
||||
|
||||
class Input(BlockSchema):
|
||||
data: str = SchemaField(description="Input data")
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Processed data")
|
||||
cost: int = SchemaField(description="Cost in credits")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="96f31d13-d741-46a1-97d4-f7e1c3beb9b5",
|
||||
description="Example block with fixed cost of 5 credits per run",
|
||||
categories={BlockCategory.DEVELOPER_TOOLS},
|
||||
input_schema=self.Input,
|
||||
output_schema=self.Output,
|
||||
)
|
||||
|
||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
yield "result", f"Processed: {input_data.data}"
|
||||
yield "cost", 5
|
||||
|
||||
|
||||
# Example 2: Block with tiered costs based on input
|
||||
@cost(
|
||||
BlockCost(
|
||||
cost_type=BlockCostType.RUN,
|
||||
cost_amount=1,
|
||||
cost_filter={"tier": "basic"},
|
||||
),
|
||||
BlockCost(
|
||||
cost_type=BlockCostType.RUN,
|
||||
cost_amount=5,
|
||||
cost_filter={"tier": "standard"},
|
||||
),
|
||||
BlockCost(
|
||||
cost_type=BlockCostType.RUN,
|
||||
cost_amount=10,
|
||||
cost_filter={"tier": "premium"},
|
||||
),
|
||||
)
|
||||
class TieredCostBlock(Block):
|
||||
"""Block with different costs based on selected tier."""
|
||||
|
||||
class Input(BlockSchema):
|
||||
data: str = SchemaField(description="Input data")
|
||||
tier: str = SchemaField(
|
||||
description="Service tier (basic, standard, or premium)",
|
||||
default="basic",
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Processed data")
|
||||
tier_used: str = SchemaField(description="Service tier used")
|
||||
cost: int = SchemaField(description="Cost in credits")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="fb30be87-f8f7-4701-86b0-6e8cc8046750",
|
||||
description="Example block with tiered pricing",
|
||||
categories={BlockCategory.DEVELOPER_TOOLS},
|
||||
input_schema=self.Input,
|
||||
output_schema=self.Output,
|
||||
)
|
||||
|
||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
# Simulate different processing based on tier
|
||||
tier_costs = {"basic": 1, "standard": 5, "premium": 10}
|
||||
cost = tier_costs.get(input_data.tier, 1)
|
||||
|
||||
yield "result", f"Processed with {input_data.tier} tier: {input_data.data}"
|
||||
yield "tier_used", input_data.tier
|
||||
yield "cost", cost
|
||||
|
||||
|
||||
# Example 3: Block that uses provider base cost (no @cost decorator)
|
||||
class ProviderCostBlock(Block):
|
||||
"""Block that inherits cost from its provider configuration."""
|
||||
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = example_service.credentials_field(
|
||||
description="Example service credentials",
|
||||
)
|
||||
data: str = SchemaField(description="Input data")
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Processed data")
|
||||
provider_used: str = SchemaField(description="Provider name")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="68668919-91c7-4374-aa27-b130c456319b",
|
||||
description="Example block using provider base cost (1 credit per run)",
|
||||
categories={BlockCategory.DEVELOPER_TOOLS},
|
||||
input_schema=self.Input,
|
||||
output_schema=self.Output,
|
||||
)
|
||||
|
||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
# This block will use the base cost from example_service (1 credit)
|
||||
yield "result", f"Processed by provider: {input_data.data}"
|
||||
yield "provider_used", "example-service"
|
||||
|
||||
|
||||
# Example 4: Block with data-based cost
|
||||
@cost(
|
||||
BlockCost(
|
||||
cost_type=BlockCostType.BYTE,
|
||||
cost_amount=1, # 1 credit per byte
|
||||
)
|
||||
)
|
||||
class DataBasedCostBlock(Block):
|
||||
"""Block that charges based on data size."""
|
||||
|
||||
class Input(BlockSchema):
|
||||
data: str = SchemaField(description="Input data")
|
||||
process_intensive: bool = SchemaField(
|
||||
description="Use intensive processing",
|
||||
default=False,
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Processed data")
|
||||
data_size: int = SchemaField(description="Data size in bytes")
|
||||
estimated_cost: str = SchemaField(description="Estimated cost")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="cd928dc6-75f0-4548-9004-7fae8f4cc677",
|
||||
description="Example block that charges 1 credit per byte",
|
||||
categories={BlockCategory.DEVELOPER_TOOLS},
|
||||
input_schema=self.Input,
|
||||
output_schema=self.Output,
|
||||
)
|
||||
|
||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
data_size = len(input_data.data.encode("utf-8"))
|
||||
estimated_cost = data_size * 1.0 # 1 credit per byte
|
||||
|
||||
yield "result", f"Processed {data_size} bytes"
|
||||
yield "data_size", data_size
|
||||
yield "estimated_cost", f"{estimated_cost:.3f} credits"
|
||||
@@ -1,97 +0,0 @@
|
||||
"""
|
||||
Example Block using the new SDK
|
||||
|
||||
This demonstrates:
|
||||
1. Import from backend.sdk module
|
||||
2. Auto-registration decorators
|
||||
3. No external configuration needed
|
||||
"""
|
||||
|
||||
from backend.sdk import (
|
||||
APIKeyCredentials,
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchema,
|
||||
CredentialsMetaInput,
|
||||
SchemaField,
|
||||
)
|
||||
|
||||
from ._config import (
|
||||
EXAMPLE_SERVICE_TEST_CREDENTIALS,
|
||||
EXAMPLE_SERVICE_TEST_CREDENTIALS_INPUT,
|
||||
example_service,
|
||||
)
|
||||
|
||||
|
||||
# Example of a simple service
|
||||
class ExampleSDKBlock(Block):
|
||||
"""
|
||||
Example block demonstrating the new SDK system.
|
||||
|
||||
With the new SDK:
|
||||
- All imports come from 'backend.sdk'
|
||||
- Costs are registered via @cost_config decorator
|
||||
- Default credentials via @default_credentials decorator
|
||||
- Provider name via @provider decorator
|
||||
- No need to modify any files outside the blocks folder!
|
||||
"""
|
||||
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = example_service.credentials_field(
|
||||
description="Credentials for Example Service API",
|
||||
)
|
||||
text: str = SchemaField(description="Text to process", default="Hello, World!")
|
||||
max_length: int = SchemaField(
|
||||
description="Maximum length of output", default=100
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Processed text result")
|
||||
length: int = SchemaField(description="Length of the result")
|
||||
api_key_used: bool = SchemaField(description="Whether API key was used")
|
||||
error: str = SchemaField(description="Error message if any")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="83815e8c-1273-418e-a8c2-4c454e042060",
|
||||
description="Example block showing SDK capabilities with auto-registration",
|
||||
categories={BlockCategory.TEXT, BlockCategory.BASIC},
|
||||
input_schema=ExampleSDKBlock.Input,
|
||||
output_schema=ExampleSDKBlock.Output,
|
||||
test_input={
|
||||
"credentials": EXAMPLE_SERVICE_TEST_CREDENTIALS_INPUT,
|
||||
"text": "Test input",
|
||||
"max_length": 50,
|
||||
},
|
||||
test_output=[
|
||||
("result", "PROCESSED: Test input"),
|
||||
("length", 21), # Length of "PROCESSED: Test input"
|
||||
("api_key_used", True),
|
||||
],
|
||||
test_credentials=EXAMPLE_SERVICE_TEST_CREDENTIALS,
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
# Get API key from credentials
|
||||
api_key = credentials.api_key.get_secret_value()
|
||||
|
||||
# Simulate API processing
|
||||
processed_text = f"PROCESSED: {input_data.text}"
|
||||
|
||||
# Truncate if needed
|
||||
if len(processed_text) > input_data.max_length:
|
||||
processed_text = processed_text[: input_data.max_length]
|
||||
|
||||
yield "result", processed_text
|
||||
yield "length", len(processed_text)
|
||||
yield "api_key_used", bool(api_key)
|
||||
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
yield "result", ""
|
||||
yield "length", 0
|
||||
yield "api_key_used", False
|
||||
@@ -1,149 +0,0 @@
|
||||
"""
|
||||
Example Webhook Block using the new SDK
|
||||
|
||||
This demonstrates webhook auto-registration without modifying
|
||||
files outside the blocks folder.
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from backend.sdk import (
|
||||
BaseModel,
|
||||
BaseWebhooksManager,
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchema,
|
||||
BlockType,
|
||||
BlockWebhookConfig,
|
||||
CredentialsMetaInput,
|
||||
Field,
|
||||
ProviderName,
|
||||
SchemaField,
|
||||
)
|
||||
|
||||
from ._config import example_webhook
|
||||
|
||||
|
||||
# Define event filter model
|
||||
class ExampleEventFilter(BaseModel):
|
||||
created: bool = Field(default=True, description="Listen for created events")
|
||||
updated: bool = Field(default=True, description="Listen for updated events")
|
||||
deleted: bool = Field(default=False, description="Listen for deleted events")
|
||||
|
||||
|
||||
# First, define a simple webhook manager for our example service
|
||||
class ExampleWebhookManager(BaseWebhooksManager):
|
||||
"""Example webhook manager for demonstration."""
|
||||
|
||||
PROVIDER_NAME = ProviderName.GITHUB # Reuse GitHub for example
|
||||
|
||||
class WebhookType(str, Enum):
|
||||
EXAMPLE = "example"
|
||||
|
||||
@classmethod
|
||||
async def validate_payload(cls, webhook, request) -> tuple[dict, str]:
|
||||
"""Validate incoming webhook payload."""
|
||||
payload = await request.json()
|
||||
event_type = request.headers.get("X-Example-Event", "unknown")
|
||||
return payload, event_type
|
||||
|
||||
async def _register_webhook(
|
||||
self,
|
||||
credentials,
|
||||
webhook_type: str,
|
||||
resource: str,
|
||||
events: list[str],
|
||||
ingress_url: str,
|
||||
secret: str,
|
||||
) -> tuple[str, dict]:
|
||||
"""Register webhook with external service."""
|
||||
# In real implementation, this would call the external API
|
||||
return "example-webhook-id", {"registered": True}
|
||||
|
||||
async def _deregister_webhook(self, webhook, credentials) -> None:
|
||||
"""Deregister webhook from external service."""
|
||||
# In real implementation, this would call the external API
|
||||
pass
|
||||
|
||||
|
||||
# Now create the webhook block
|
||||
class ExampleWebhookSDKBlock(Block):
|
||||
"""
|
||||
Example webhook block demonstrating webhook capabilities.
|
||||
"""
|
||||
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = example_webhook.credentials_field(
|
||||
description="Credentials for webhook service",
|
||||
)
|
||||
webhook_url: str = SchemaField(
|
||||
description="URL to receive webhooks (auto-generated)",
|
||||
default="",
|
||||
hidden=True,
|
||||
)
|
||||
event_filter: ExampleEventFilter = SchemaField(
|
||||
description="Filter for specific events", default=ExampleEventFilter()
|
||||
)
|
||||
payload: dict = SchemaField(
|
||||
description="Webhook payload data", default={}, hidden=True
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
event_type: str = SchemaField(description="Type of webhook event")
|
||||
event_data: dict = SchemaField(description="Event payload data")
|
||||
timestamp: str = SchemaField(description="Event timestamp")
|
||||
error: str = SchemaField(description="Error message if any")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="7e50eb33-f854-4b73-99a1-50cad2819ae0",
|
||||
description="Example webhook block with auto-registration",
|
||||
categories={BlockCategory.INPUT},
|
||||
input_schema=ExampleWebhookSDKBlock.Input,
|
||||
output_schema=ExampleWebhookSDKBlock.Output,
|
||||
block_type=BlockType.WEBHOOK,
|
||||
webhook_config=BlockWebhookConfig(
|
||||
provider=ProviderName.GITHUB, # Using GitHub for example
|
||||
webhook_type="example",
|
||||
event_filter_input="event_filter",
|
||||
resource_format="{event}",
|
||||
),
|
||||
)
|
||||
|
||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
try:
|
||||
# Extract webhook payload
|
||||
payload = input_data.payload
|
||||
|
||||
# Get event type and timestamp
|
||||
event_type = payload.get("action", "unknown")
|
||||
timestamp = payload.get("timestamp", "")
|
||||
|
||||
# Filter events based on event filter settings
|
||||
event_filter = input_data.event_filter
|
||||
should_process = False
|
||||
|
||||
if event_type == "created" and event_filter.created:
|
||||
should_process = True
|
||||
elif event_type == "updated" and event_filter.updated:
|
||||
should_process = True
|
||||
elif event_type == "deleted" and event_filter.deleted:
|
||||
should_process = True
|
||||
|
||||
if not should_process:
|
||||
yield "event_type", "filtered"
|
||||
yield "event_data", {}
|
||||
yield "timestamp", timestamp
|
||||
yield "error", ""
|
||||
return
|
||||
|
||||
yield "event_type", event_type
|
||||
yield "event_data", payload
|
||||
yield "timestamp", timestamp
|
||||
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
yield "event_type", "error"
|
||||
yield "event_data", {}
|
||||
yield "timestamp", ""
|
||||
@@ -1,48 +0,0 @@
|
||||
"""
|
||||
Simple Example Block using the new SDK
|
||||
|
||||
This demonstrates the new SDK import pattern.
|
||||
Before SDK: Multiple complex imports from various modules
|
||||
After SDK: Single import statement
|
||||
"""
|
||||
|
||||
# === OLD WAY (Before SDK) ===
|
||||
# from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
# from backend.data.model import SchemaField, CredentialsField, CredentialsMetaInput
|
||||
# from backend.integrations.providers import ProviderName
|
||||
# from backend.data.cost import BlockCost, BlockCostType
|
||||
# from typing import List, Optional, Dict
|
||||
# from pydantic import SecretStr
|
||||
|
||||
# === NEW WAY (With SDK) ===
|
||||
from backend.sdk import Block, BlockCategory, BlockOutput, BlockSchema, SchemaField
|
||||
|
||||
|
||||
class SimpleExampleBlock(Block):
|
||||
"""
|
||||
A simple example block showing the power of the SDK.
|
||||
|
||||
Key benefits:
|
||||
1. Single import: from backend.sdk import *
|
||||
2. Clean, simple block structure
|
||||
"""
|
||||
|
||||
class Input(BlockSchema):
|
||||
text: str = SchemaField(description="Input text")
|
||||
count: int = SchemaField(description="Number of repetitions", default=1)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Output result")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="4a6db1ae-e9d5-4a57-b8b6-9186174a6dd3",
|
||||
description="Simple example block using SDK",
|
||||
categories={BlockCategory.TEXT},
|
||||
input_schema=SimpleExampleBlock.Input,
|
||||
output_schema=SimpleExampleBlock.Output,
|
||||
)
|
||||
|
||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
result = input_data.text * input_data.count
|
||||
yield "result", result
|
||||
@@ -1,821 +0,0 @@
|
||||
# AutoGPT Platform SDK
|
||||
|
||||
The AutoGPT Platform SDK simplifies block development by providing a unified interface for creating blocks with authentication, webhooks, cost tracking, and more.
|
||||
|
||||
## Table of Contents
|
||||
|
||||
- [Quick Start](#quick-start)
|
||||
- [Basic Block Structure](#basic-block-structure)
|
||||
- [Provider Configuration](#provider-configuration)
|
||||
- [API Key Authentication](#api-key-authentication)
|
||||
- [OAuth Authentication](#oauth-authentication)
|
||||
- [Multiple Authentication Methods](#multiple-authentication-methods)
|
||||
- [Cost Management](#cost-management)
|
||||
- [Default Provider Costs](#default-provider-costs)
|
||||
- [Block-Specific Costs](#block-specific-costs)
|
||||
- [Cost Types](#cost-types)
|
||||
- [Tiered and Conditional Costs](#tiered-and-conditional-costs)
|
||||
- [Webhooks](#webhooks)
|
||||
- [Auto-Managed Webhooks](#auto-managed-webhooks)
|
||||
- [Manual Webhooks](#manual-webhooks)
|
||||
- [Advanced Features](#advanced-features)
|
||||
- [Custom API Clients](#custom-api-clients)
|
||||
- [Test Credentials](#test-credentials)
|
||||
- [Best Practices](#best-practices)
|
||||
- [Complete Examples](#complete-examples)
|
||||
|
||||
## Quick Start
|
||||
|
||||
The SDK uses a single import pattern instead of multiple complex imports:
|
||||
|
||||
```python
|
||||
from backend.sdk import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchema,
|
||||
SchemaField,
|
||||
)
|
||||
|
||||
# Additional standard library imports as needed
|
||||
from typing import Optional, Literal # For type hints
|
||||
from enum import Enum # For enumerations
|
||||
```
|
||||
|
||||
## Basic Block Structure
|
||||
|
||||
Every block inherits from the `Block` base class and defines input/output schemas:
|
||||
|
||||
```python
|
||||
from backend.sdk import Block, BlockCategory, BlockOutput, BlockSchema, SchemaField
|
||||
|
||||
class MyBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
text: str = SchemaField(description="Input text to process")
|
||||
count: int = SchemaField(description="Number of times to repeat", default=1)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Processed result")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="unique-uuid-here", # Generate with uuid.uuid4()
|
||||
description="My block description",
|
||||
categories={BlockCategory.TEXT},
|
||||
input_schema=MyBlock.Input,
|
||||
output_schema=MyBlock.Output,
|
||||
)
|
||||
|
||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
result = input_data.text * input_data.count
|
||||
yield "result", result
|
||||
```
|
||||
|
||||
## Provider Configuration
|
||||
|
||||
Providers manage authentication, costs, and API client configuration. Create a `_config.py` file in your block directory:
|
||||
|
||||
### API Key Authentication
|
||||
|
||||
```python
|
||||
from backend.sdk import BlockCostType, ProviderBuilder
|
||||
|
||||
# API key from environment variable
|
||||
my_service = (
|
||||
ProviderBuilder("my-service")
|
||||
.with_api_key("MY_SERVICE_API_KEY", "My Service API Key")
|
||||
.with_base_cost(1, BlockCostType.RUN)
|
||||
.build()
|
||||
)
|
||||
```
|
||||
|
||||
Use in your block:
|
||||
|
||||
```python
|
||||
from backend.sdk import APIKeyCredentials, Block, BlockSchema, CredentialsMetaInput, SchemaField
|
||||
from ._config import my_service
|
||||
|
||||
class MyServiceBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = my_service.credentials_field(
|
||||
description="API credentials for My Service"
|
||||
)
|
||||
query: str = SchemaField(description="Query to process")
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: APIKeyCredentials,
|
||||
**kwargs
|
||||
) -> BlockOutput:
|
||||
api_key = credentials.api_key.get_secret_value()
|
||||
# Use api_key to make API calls
|
||||
yield "result", f"Processed {input_data.query}"
|
||||
```
|
||||
|
||||
### OAuth Authentication
|
||||
|
||||
OAuth requires an OAuth handler class:
|
||||
|
||||
```python
|
||||
from backend.sdk import BaseOAuthHandler, OAuth2Credentials, ProviderName, SecretStr
|
||||
from typing import Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
class MyServiceOAuthHandler(BaseOAuthHandler):
|
||||
PROVIDER_NAME = ProviderName("my-service")
|
||||
|
||||
def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
|
||||
self.client_id = client_id
|
||||
self.client_secret = client_secret
|
||||
self.redirect_uri = redirect_uri
|
||||
self.auth_base_url = "https://api.myservice.com/oauth/authorize"
|
||||
self.token_url = "https://api.myservice.com/oauth/token"
|
||||
|
||||
def get_login_url(self, scopes: list[str], state: str, code_challenge: Optional[str]) -> str:
|
||||
# Build and return OAuth login URL
|
||||
params = {
|
||||
"client_id": self.client_id,
|
||||
"redirect_uri": self.redirect_uri,
|
||||
"response_type": "code",
|
||||
"scope": " ".join(scopes),
|
||||
"state": state,
|
||||
}
|
||||
return f"{self.auth_base_url}?{urlencode(params)}"
|
||||
|
||||
async def exchange_code_for_tokens(
|
||||
self, code: str, scopes: list[str], code_verifier: Optional[str]
|
||||
) -> OAuth2Credentials:
|
||||
# Exchange authorization code for tokens
|
||||
data = {
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"client_id": self.client_id,
|
||||
"client_secret": self.client_secret,
|
||||
"redirect_uri": self.redirect_uri,
|
||||
}
|
||||
# Make request and return OAuth2Credentials
|
||||
response = await Requests().post(self.token_url, json=data)
|
||||
tokens = response.json()
|
||||
|
||||
return OAuth2Credentials(
|
||||
provider="my-service",
|
||||
access_token=SecretStr(tokens["access_token"]),
|
||||
refresh_token=SecretStr(tokens.get("refresh_token")),
|
||||
expires_at=tokens.get("expires_at"),
|
||||
scopes=scopes,
|
||||
title="My Service OAuth"
|
||||
)
|
||||
```
|
||||
|
||||
Configure the provider with OAuth:
|
||||
|
||||
```python
|
||||
# In _config.py
|
||||
import os
|
||||
from backend.sdk import BlockCostType, ProviderBuilder
|
||||
from ._oauth import MyServiceOAuthHandler
|
||||
|
||||
# Check if OAuth is configured
|
||||
client_id = os.getenv("MY_SERVICE_CLIENT_ID")
|
||||
client_secret = os.getenv("MY_SERVICE_CLIENT_SECRET")
|
||||
OAUTH_IS_CONFIGURED = bool(client_id and client_secret)
|
||||
|
||||
# Build provider
|
||||
builder = ProviderBuilder("my-service").with_base_cost(1, BlockCostType.RUN)
|
||||
|
||||
if OAUTH_IS_CONFIGURED:
|
||||
builder = builder.with_oauth(
|
||||
MyServiceOAuthHandler,
|
||||
scopes=["read", "write"]
|
||||
)
|
||||
|
||||
my_service = builder.build()
|
||||
```
|
||||
|
||||
### Multiple Authentication Methods
|
||||
|
||||
Providers built with ProviderBuilder can support multiple authentication methods:
|
||||
|
||||
```python
|
||||
my_service = (
|
||||
ProviderBuilder("my-service")
|
||||
.with_api_key("MY_SERVICE_API_KEY", "My Service API Key")
|
||||
.with_oauth(MyServiceOAuthHandler, scopes=["read", "write"])
|
||||
.with_base_cost(1, BlockCostType.RUN)
|
||||
.build()
|
||||
)
|
||||
```
|
||||
|
||||
## Cost Management
|
||||
|
||||
### Default Provider Costs
|
||||
|
||||
Set a base cost that applies to all blocks using the provider:
|
||||
|
||||
```python
|
||||
my_service = (
|
||||
ProviderBuilder("my-service")
|
||||
.with_api_key("MY_SERVICE_API_KEY", "API Key")
|
||||
.with_base_cost(1, BlockCostType.RUN) # 1 credit per run
|
||||
.build()
|
||||
)
|
||||
```
|
||||
|
||||
### Block-Specific Costs
|
||||
|
||||
Override provider costs using the `@cost` decorator:
|
||||
|
||||
```python
|
||||
from backend.sdk import cost, BlockCost, BlockCostType
|
||||
|
||||
@cost(BlockCost(cost_type=BlockCostType.RUN, cost_amount=5))
|
||||
class ExpensiveBlock(Block):
|
||||
# This block costs 5 credits per run, overriding provider default
|
||||
...
|
||||
```
|
||||
|
||||
### Cost Types
|
||||
|
||||
The SDK supports different cost calculation methods:
|
||||
|
||||
```python
|
||||
# Fixed cost per run
|
||||
@cost(BlockCost(cost_type=BlockCostType.RUN, cost_amount=10))
|
||||
|
||||
# Cost based on data size (per byte)
|
||||
@cost(BlockCost(cost_type=BlockCostType.BYTE, cost_amount=0.001))
|
||||
|
||||
# Cost based on execution time (per second)
|
||||
@cost(BlockCost(cost_type=BlockCostType.SECOND, cost_amount=0.1))
|
||||
```
|
||||
|
||||
### Tiered and Conditional Costs
|
||||
|
||||
Define multiple costs with filters for tiered pricing:
|
||||
|
||||
```python
|
||||
from backend.sdk import cost, BlockCost, BlockCostType
|
||||
from enum import Enum
|
||||
|
||||
class ServiceTier(str, Enum):
|
||||
BASIC = "basic"
|
||||
PREMIUM = "premium"
|
||||
ENTERPRISE = "enterprise"
|
||||
|
||||
@cost(
|
||||
BlockCost(
|
||||
cost_type=BlockCostType.RUN,
|
||||
cost_amount=1,
|
||||
cost_filter={"tier": "basic"}
|
||||
),
|
||||
BlockCost(
|
||||
cost_type=BlockCostType.RUN,
|
||||
cost_amount=5,
|
||||
cost_filter={"tier": "premium"}
|
||||
),
|
||||
BlockCost(
|
||||
cost_type=BlockCostType.RUN,
|
||||
cost_amount=20,
|
||||
cost_filter={"tier": "enterprise"}
|
||||
)
|
||||
)
|
||||
class TieredServiceBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
tier: ServiceTier = SchemaField(
|
||||
description="Service tier",
|
||||
default=ServiceTier.BASIC
|
||||
)
|
||||
# ... other fields
|
||||
```
|
||||
|
||||
## Webhooks
|
||||
|
||||
### Auto-Managed Webhooks
|
||||
|
||||
For services that require webhook registration/deregistration:
|
||||
|
||||
```python
|
||||
from backend.sdk import BaseWebhooksManager, Webhook, ProviderName
|
||||
from enum import Enum
|
||||
|
||||
class MyServiceWebhookManager(BaseWebhooksManager):
|
||||
PROVIDER_NAME = ProviderName("my-service")
|
||||
|
||||
class WebhookType(str, Enum):
|
||||
DATA_UPDATE = "data_update"
|
||||
STATUS_CHANGE = "status_change"
|
||||
|
||||
async def validate_payload(self, webhook: Webhook, request) -> tuple[dict, str]:
|
||||
"""Validate incoming webhook payload."""
|
||||
payload = await request.json()
|
||||
event_type = request.headers.get("X-Event-Type", "unknown")
|
||||
return payload, event_type
|
||||
|
||||
async def _register_webhook(
|
||||
self,
|
||||
credentials,
|
||||
webhook_type: str,
|
||||
resource: str,
|
||||
events: list[str],
|
||||
ingress_url: str,
|
||||
secret: str,
|
||||
) -> tuple[str, dict]:
|
||||
# Register webhook with external service
|
||||
# Return (webhook_id, metadata)
|
||||
api_key = credentials.api_key.get_secret_value()
|
||||
response = await Requests().post(
|
||||
"https://api.myservice.com/webhooks",
|
||||
headers={"Authorization": f"Bearer {api_key}"},
|
||||
json={
|
||||
"url": ingress_url,
|
||||
"events": events,
|
||||
"secret": secret,
|
||||
}
|
||||
)
|
||||
data = response.json()
|
||||
return data["id"], {"events": events}
|
||||
|
||||
async def _deregister_webhook(self, webhook: Webhook, credentials) -> None:
|
||||
# Deregister webhook from external service
|
||||
api_key = credentials.api_key.get_secret_value()
|
||||
await Requests().delete(
|
||||
f"https://api.myservice.com/webhooks/{webhook.provider_webhook_id}",
|
||||
headers={"Authorization": f"Bearer {api_key}"}
|
||||
)
|
||||
```
|
||||
|
||||
Configure the provider:
|
||||
|
||||
```python
|
||||
my_service = (
|
||||
ProviderBuilder("my-service")
|
||||
.with_api_key("MY_SERVICE_API_KEY", "API Key")
|
||||
.with_webhook_manager(MyServiceWebhookManager)
|
||||
.build()
|
||||
)
|
||||
```
|
||||
|
||||
Use in a webhook block:
|
||||
|
||||
```python
|
||||
from backend.sdk import Block, BlockType, BlockWebhookConfig, ProviderName
|
||||
|
||||
class MyWebhookBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = my_service.credentials_field(
|
||||
description="Credentials for webhook service"
|
||||
)
|
||||
webhook_url: str = SchemaField(
|
||||
description="URL to receive webhooks (auto-generated)",
|
||||
default="",
|
||||
hidden=True,
|
||||
)
|
||||
event_filter: dict = SchemaField(
|
||||
description="Filter for specific events",
|
||||
default={}
|
||||
)
|
||||
payload: dict = SchemaField(
|
||||
description="Webhook payload data",
|
||||
default={},
|
||||
hidden=True
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="unique-webhook-block-id",
|
||||
description="Receives webhooks from My Service",
|
||||
categories={BlockCategory.INPUT},
|
||||
input_schema=self.Input,
|
||||
output_schema=self.Output,
|
||||
block_type=BlockType.WEBHOOK,
|
||||
webhook_config=BlockWebhookConfig(
|
||||
provider=ProviderName("my-service"),
|
||||
webhook_type="data_update",
|
||||
event_filter_input="event_filter",
|
||||
resource_format="{resource_id}",
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
### Manual Webhooks
|
||||
|
||||
For simple webhooks that don't require registration:
|
||||
|
||||
```python
|
||||
from backend.sdk import BlockManualWebhookConfig, ManualWebhookManagerBase
|
||||
|
||||
class SimpleWebhookBlock(Block):
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="simple-webhook-id",
|
||||
description="Simple webhook receiver",
|
||||
categories={BlockCategory.INPUT},
|
||||
input_schema=self.Input,
|
||||
output_schema=self.Output,
|
||||
block_type=BlockType.WEBHOOK,
|
||||
webhook_config=BlockManualWebhookConfig(
|
||||
provider=ProviderName("generic_webhook"),
|
||||
webhook_type="plain",
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
## Advanced Features
|
||||
|
||||
### Custom API Clients
|
||||
|
||||
Provide a custom API client factory:
|
||||
|
||||
```python
|
||||
from backend.sdk import Requests
|
||||
|
||||
class MyAPIClient:
|
||||
def __init__(self, credentials):
|
||||
self.api_key = credentials.api_key.get_secret_value()
|
||||
self.base_url = "https://api.myservice.com"
|
||||
|
||||
async def request(self, method: str, endpoint: str, **kwargs):
|
||||
# Implement API request logic
|
||||
headers = kwargs.get("headers", {})
|
||||
headers["Authorization"] = f"Bearer {self.api_key}"
|
||||
kwargs["headers"] = headers
|
||||
|
||||
response = await Requests().request(
|
||||
method,
|
||||
f"{self.base_url}{endpoint}",
|
||||
**kwargs
|
||||
)
|
||||
return response.json()
|
||||
|
||||
my_service = (
|
||||
ProviderBuilder("my-service")
|
||||
.with_api_key("MY_SERVICE_API_KEY", "API Key")
|
||||
.with_api_client(lambda creds: MyAPIClient(creds))
|
||||
.build()
|
||||
)
|
||||
```
|
||||
|
||||
Use the API client in your block:
|
||||
|
||||
```python
|
||||
async def run(self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs):
|
||||
api_client = my_service.get_api(credentials)
|
||||
result = await api_client.request("GET", "/data")
|
||||
yield "result", result
|
||||
```
|
||||
|
||||
|
||||
### Test Credentials
|
||||
|
||||
Define test credentials for development and testing:
|
||||
|
||||
```python
|
||||
# In _config.py
|
||||
from backend.sdk import APIKeyCredentials, SecretStr
|
||||
|
||||
MY_SERVICE_TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="test-creds-id",
|
||||
provider="my-service",
|
||||
api_key=SecretStr("test-api-key"),
|
||||
title="Test API Key",
|
||||
expires_at=None,
|
||||
)
|
||||
|
||||
MY_SERVICE_TEST_CREDENTIALS_INPUT = {
|
||||
"provider": MY_SERVICE_TEST_CREDENTIALS.provider,
|
||||
"id": MY_SERVICE_TEST_CREDENTIALS.id,
|
||||
"type": MY_SERVICE_TEST_CREDENTIALS.type,
|
||||
"title": MY_SERVICE_TEST_CREDENTIALS.title,
|
||||
}
|
||||
```
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Always use `_config.py`**: Keep provider configuration separate from block logic
|
||||
2. **Generate unique UUIDs**: Use `uuid.uuid4()` for block IDs
|
||||
3. **Set appropriate costs**: Consider API pricing when setting block costs
|
||||
4. **Handle errors gracefully**: Always wrap API calls in try-except blocks
|
||||
5. **Document thoroughly**: Use clear descriptions for all fields
|
||||
6. **Test with mock credentials**: Create test credentials for unit tests
|
||||
7. **Follow naming conventions**: Use lowercase with underscores for provider names
|
||||
8. **Check environment variables**: Verify OAuth credentials before configuring
|
||||
|
||||
## Complete Examples
|
||||
|
||||
### Simple API Key Block
|
||||
|
||||
```python
|
||||
# _config.py
|
||||
from backend.sdk import BlockCostType, ProviderBuilder
|
||||
|
||||
weather_api = (
|
||||
ProviderBuilder("weather_api")
|
||||
.with_api_key("WEATHER_API_KEY", "Weather API Key")
|
||||
.with_base_cost(1, BlockCostType.RUN)
|
||||
.build()
|
||||
)
|
||||
|
||||
# weather_block.py
|
||||
from backend.sdk import *
|
||||
from ._config import weather_api
|
||||
|
||||
class WeatherBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = weather_api.credentials_field(
|
||||
description="Weather API credentials"
|
||||
)
|
||||
city: str = SchemaField(description="City name")
|
||||
|
||||
class Output(BlockSchema):
|
||||
temperature: float = SchemaField(description="Temperature in Celsius")
|
||||
description: str = SchemaField(description="Weather description")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="a1b2c3d4-5678-90ab-cdef-1234567890ab",
|
||||
description="Get current weather for a city",
|
||||
categories={BlockCategory.SEARCH},
|
||||
input_schema=WeatherBlock.Input,
|
||||
output_schema=WeatherBlock.Output,
|
||||
)
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: APIKeyCredentials,
|
||||
**kwargs
|
||||
) -> BlockOutput:
|
||||
api_key = credentials.api_key.get_secret_value()
|
||||
# Make API call with api_key
|
||||
response = await Requests().get(
|
||||
f"https://api.weather.com/v1/current",
|
||||
params={"city": input_data.city, "api_key": api_key}
|
||||
)
|
||||
data = response.json()
|
||||
yield "temperature", data["temp"]
|
||||
yield "description", data["desc"]
|
||||
```
|
||||
|
||||
### OAuth Block with Custom Costs
|
||||
|
||||
```python
|
||||
# _oauth.py
|
||||
from backend.sdk import BaseOAuthHandler, OAuth2Credentials, ProviderName, Requests, SecretStr
|
||||
from typing import Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
class SocialOAuthHandler(BaseOAuthHandler):
|
||||
PROVIDER_NAME = ProviderName("social-api")
|
||||
|
||||
def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
|
||||
self.client_id = client_id
|
||||
self.client_secret = client_secret
|
||||
self.redirect_uri = redirect_uri
|
||||
|
||||
def get_login_url(self, scopes: list[str], state: str, code_challenge: Optional[str]) -> str:
|
||||
params = {
|
||||
"client_id": self.client_id,
|
||||
"redirect_uri": self.redirect_uri,
|
||||
"response_type": "code",
|
||||
"scope": " ".join(scopes),
|
||||
"state": state,
|
||||
}
|
||||
return f"https://social-api.com/oauth/authorize?{urlencode(params)}"
|
||||
|
||||
async def exchange_code_for_tokens(
|
||||
self, code: str, scopes: list[str], code_verifier: Optional[str]
|
||||
) -> OAuth2Credentials:
|
||||
response = await Requests().post(
|
||||
"https://social-api.com/oauth/token",
|
||||
json={
|
||||
"grant_type": "authorization_code",
|
||||
"code": code,
|
||||
"client_id": self.client_id,
|
||||
"client_secret": self.client_secret,
|
||||
"redirect_uri": self.redirect_uri,
|
||||
}
|
||||
)
|
||||
tokens = response.json()
|
||||
|
||||
return OAuth2Credentials(
|
||||
provider="social-api",
|
||||
access_token=SecretStr(tokens["access_token"]),
|
||||
refresh_token=SecretStr(tokens.get("refresh_token")),
|
||||
expires_at=tokens.get("expires_at"),
|
||||
scopes=scopes,
|
||||
title="Social API OAuth"
|
||||
)
|
||||
|
||||
# _config.py
|
||||
import os
|
||||
from backend.sdk import BlockCostType, ProviderBuilder
|
||||
from ._oauth import SocialOAuthHandler
|
||||
|
||||
social_api = (
|
||||
ProviderBuilder("social-api")
|
||||
.with_oauth(SocialOAuthHandler, scopes=["read", "write"])
|
||||
.with_base_cost(2, BlockCostType.RUN)
|
||||
.build()
|
||||
)
|
||||
|
||||
# social_block.py
|
||||
from backend.sdk import *
|
||||
from typing import Literal
|
||||
from ._config import social_api
|
||||
|
||||
@cost(
|
||||
BlockCost(cost_type=BlockCostType.RUN, cost_amount=5, cost_filter={"action": "post"}),
|
||||
BlockCost(cost_type=BlockCostType.RUN, cost_amount=1, cost_filter={"action": "read"})
|
||||
)
|
||||
class SocialBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = social_api.credentials_field(
|
||||
description="Social API OAuth credentials"
|
||||
)
|
||||
action: Literal["read", "post"] = SchemaField(
|
||||
description="Action to perform",
|
||||
default="read"
|
||||
)
|
||||
content: str = SchemaField(description="Content for the action")
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: str = SchemaField(description="Action result")
|
||||
cost_applied: int = SchemaField(description="Credits charged")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="b2c3d4e5-6789-01ab-cdef-234567890abc",
|
||||
description="Interact with social media API",
|
||||
categories={BlockCategory.SOCIAL},
|
||||
input_schema=SocialBlock.Input,
|
||||
output_schema=SocialBlock.Output,
|
||||
)
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: OAuth2Credentials,
|
||||
**kwargs
|
||||
) -> BlockOutput:
|
||||
# Use OAuth2 credentials
|
||||
access_token = credentials.access_token.get_secret_value()
|
||||
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
|
||||
if input_data.action == "post":
|
||||
# Costs 5 credits
|
||||
response = await Requests().post(
|
||||
"https://social-api.com/v1/posts",
|
||||
headers=headers,
|
||||
json={"content": input_data.content}
|
||||
)
|
||||
yield "result", f"Posted: {input_data.content}"
|
||||
yield "cost_applied", 5
|
||||
else:
|
||||
# Costs 1 credit
|
||||
response = await Requests().get(
|
||||
"https://social-api.com/v1/timeline",
|
||||
headers=headers
|
||||
)
|
||||
yield "result", f"Read timeline: {len(response.json())} items"
|
||||
yield "cost_applied", 1
|
||||
```
|
||||
|
||||
### Webhook Block with Event Filtering
|
||||
|
||||
```python
|
||||
# _webhook.py
|
||||
from backend.sdk import BaseWebhooksManager, Webhook, ProviderName, Requests
|
||||
from enum import Enum
|
||||
|
||||
class DataServiceWebhookManager(BaseWebhooksManager):
|
||||
PROVIDER_NAME = ProviderName("data-service")
|
||||
|
||||
class WebhookType(str, Enum):
|
||||
DATA_CHANGE = "data_change"
|
||||
|
||||
async def validate_payload(self, webhook: Webhook, request) -> tuple[dict, str]:
|
||||
"""Validate incoming webhook payload."""
|
||||
payload = await request.json()
|
||||
event_type = request.headers.get("X-Event-Type", "unknown")
|
||||
|
||||
# Verify webhook signature if needed
|
||||
signature = request.headers.get("X-Signature")
|
||||
# ... signature validation logic
|
||||
|
||||
return payload, event_type
|
||||
|
||||
async def _register_webhook(
|
||||
self,
|
||||
credentials,
|
||||
webhook_type: str,
|
||||
resource: str,
|
||||
events: list[str],
|
||||
ingress_url: str,
|
||||
secret: str,
|
||||
) -> tuple[str, dict]:
|
||||
api_key = credentials.api_key.get_secret_value()
|
||||
response = await Requests().post(
|
||||
f"https://api.dataservice.com/webhooks",
|
||||
headers={"Authorization": f"Bearer {api_key}"},
|
||||
json={
|
||||
"url": ingress_url,
|
||||
"resource": resource,
|
||||
"events": events,
|
||||
"secret": secret,
|
||||
}
|
||||
)
|
||||
webhook_data = response.json()
|
||||
return webhook_data["id"], {"resource": resource, "events": events}
|
||||
|
||||
async def _deregister_webhook(self, webhook: Webhook, credentials) -> None:
|
||||
api_key = credentials.api_key.get_secret_value()
|
||||
await Requests().delete(
|
||||
f"https://api.dataservice.com/webhooks/{webhook.provider_webhook_id}",
|
||||
headers={"Authorization": f"Bearer {api_key}"}
|
||||
)
|
||||
|
||||
# _config.py
|
||||
from backend.sdk import BlockCostType, ProviderBuilder
|
||||
from ._webhook import DataServiceWebhookManager
|
||||
|
||||
data_service = (
|
||||
ProviderBuilder("data-service")
|
||||
.with_api_key("DATA_SERVICE_API_KEY", "Data Service API Key")
|
||||
.with_webhook_manager(DataServiceWebhookManager)
|
||||
.with_base_cost(0, BlockCostType.RUN) # Webhooks typically free
|
||||
.build()
|
||||
)
|
||||
|
||||
# data_webhook_block.py
|
||||
from backend.sdk import *
|
||||
from ._config import data_service
|
||||
|
||||
class DataWebhookBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput = data_service.credentials_field(
|
||||
description="Data Service credentials"
|
||||
)
|
||||
webhook_url: str = SchemaField(
|
||||
description="Webhook URL (auto-generated)",
|
||||
default="",
|
||||
hidden=True,
|
||||
)
|
||||
resource_id: str = SchemaField(
|
||||
description="Resource ID to monitor",
|
||||
default=""
|
||||
)
|
||||
event_types: list[str] = SchemaField(
|
||||
description="Event types to listen for",
|
||||
default=["created", "updated", "deleted"]
|
||||
)
|
||||
payload: dict = SchemaField(
|
||||
description="Webhook payload",
|
||||
default={},
|
||||
hidden=True
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
event_type: str = SchemaField(description="Type of event")
|
||||
resource_id: str = SchemaField(description="ID of affected resource")
|
||||
data: dict = SchemaField(description="Event data")
|
||||
timestamp: str = SchemaField(description="Event timestamp")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="c3d4e5f6-7890-12ab-cdef-345678901234",
|
||||
description="Receives data change events via webhook",
|
||||
categories={BlockCategory.INPUT},
|
||||
input_schema=DataWebhookBlock.Input,
|
||||
output_schema=DataWebhookBlock.Output,
|
||||
block_type=BlockType.WEBHOOK,
|
||||
webhook_config=BlockWebhookConfig(
|
||||
provider=ProviderName("data-service"),
|
||||
webhook_type="data_change",
|
||||
event_filter_input="event_types",
|
||||
resource_format="{resource_id}",
|
||||
),
|
||||
)
|
||||
|
||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
payload = input_data.payload
|
||||
|
||||
# Extract event details
|
||||
event_type = payload.get("event_type", "unknown")
|
||||
resource_id = payload.get("resource_id", input_data.resource_id)
|
||||
|
||||
# Filter events if needed
|
||||
if event_type not in input_data.event_types:
|
||||
return # Skip unwanted events
|
||||
|
||||
yield "event_type", event_type
|
||||
yield "resource_id", resource_id
|
||||
yield "data", payload.get("data", {})
|
||||
yield "timestamp", payload.get("timestamp", "")
|
||||
```
|
||||
|
||||
For more examples, see the `/autogpt_platform/backend/backend/blocks/examples/` directory.
|
||||
Reference in New Issue
Block a user