Compare commits

...

17 Commits

Author SHA1 Message Date
SwiftyOS
1b5f2c6f11 elevenlabs integration 2025-03-12 15:36:04 +01:00
SwiftyOS
8aba4a5d48 Dict -> dict 2025-03-12 12:59:16 +01:00
SwiftyOS
6ba9fd9cb4 add preference for pydantic objects over dicts 2025-03-12 12:58:52 +01:00
SwiftyOS
b16c2eed52 update comments 2025-03-12 12:58:26 +01:00
Swifty
5af718c9f5 Update autogpt_platform/backend/backend/integrations/webhooks/example.py
Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
2025-03-12 12:51:29 +01:00
SwiftyOS
a588cf1dc5 best practice improvements 2025-03-12 12:51:29 +01:00
SwiftyOS
98a1adc397 code quality improvements 2025-03-12 12:51:29 +01:00
SwiftyOS
19728ebc05 improved the example 2025-03-12 12:51:29 +01:00
SwiftyOS
3e117aac5d Added example of yielding multiple each item from the list as well as the list itself 2025-03-12 12:51:29 +01:00
SwiftyOS
8dabe6c70d fmt 2025-03-12 12:51:29 +01:00
SwiftyOS
a04919beca change credential handling 2025-03-12 12:51:29 +01:00
SwiftyOS
c6b22842a4 movec api key down the .env file 2025-03-12 12:51:29 +01:00
SwiftyOS
614f751a90 Added example provider to the frontend 2025-03-12 12:51:29 +01:00
SwiftyOS
c458bec9c7 Added example api key to settings 2025-03-12 12:51:29 +01:00
SwiftyOS
040bde3f49 fix import 2025-03-12 12:51:29 +01:00
SwiftyOS
71028d57d7 refactor example block into its own folder 2025-03-12 12:51:29 +01:00
SwiftyOS
dbf014f936 Example block 2025-03-12 12:51:29 +01:00
17 changed files with 1611 additions and 0 deletions

View File

@@ -173,6 +173,9 @@ EXA_API_KEY=
# E2B
E2B_API_KEY=
# Example API Key
EXAMPLE_API_KEY=
# Mem0
MEM0_API_KEY=

View File

@@ -0,0 +1,491 @@
"""
API module for ElevenLabs API integration.
This module provides a client for the ElevenLabs API, which offers text-to-speech
and speech-to-speech capabilities.
"""
import json
from io import BytesIO
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from backend.data.model import APIKeyCredentials
from backend.util.request import Requests
class ElevenLabsException(Exception):
"""Exception raised for ElevenLabs API errors."""
def __init__(self, message: str, status_code: int):
super().__init__(message)
self.status_code = status_code
class VoiceSettings(BaseModel):
"""Model for voice settings."""
stability: float
similarity_boost: float
class ElevenLabsClient:
"""Client for the ElevenLabs API."""
API_BASE_URL = "https://api.elevenlabs.io/v1"
def __init__(
self,
credentials: Optional[APIKeyCredentials] = None,
custom_requests: Optional[Requests] = None,
):
"""
Initialize the ElevenLabs API client.
Args:
credentials: API key credentials for ElevenLabs.
custom_requests: Custom request handler (for testing).
"""
if custom_requests:
self._requests = custom_requests
else:
headers: Dict[str, str] = {
"Content-Type": "application/json",
}
if credentials:
headers["xi-api-key"] = credentials.api_key.get_secret_value()
self._requests = Requests(
extra_headers=headers,
raise_for_status=False,
)
def _handle_response(self, response, raw_binary=False) -> Any:
"""
Handle API response and check for errors.
Args:
response: Response object from the request.
raw_binary: If True, return the raw binary content instead of parsing as JSON.
Returns:
Parsed response data.
Raises:
ElevenLabsException: If the API request fails.
"""
if not response.ok:
error_message = "Unknown error"
try:
error_data = response.json()
if isinstance(error_data, dict):
error_message = error_data.get(
"detail", error_data.get("message", "Unknown error")
)
except Exception:
error_message = response.text or f"Error: HTTP {response.status_code}"
raise ElevenLabsException(
f"ElevenLabs API error ({response.status_code}): {error_message}",
response.status_code,
)
if raw_binary:
return response.content
try:
return response.json()
except Exception:
return response.content
# === Voice Management Endpoints ===
def get_models(self) -> List[Dict[str, Any]]:
"""
Retrieve a list of all available TTS and STS models.
Returns:
List of model objects.
"""
response = self._requests.get(f"{self.API_BASE_URL}/models")
return self._handle_response(response)
def get_voices(self) -> Dict[str, Any]:
"""
Retrieve a list of all voices available to the user.
Returns:
Dictionary containing the list of voices.
"""
response = self._requests.get(f"{self.API_BASE_URL}/voices")
return self._handle_response(response)
def get_voice(self, voice_id: str) -> Dict[str, Any]:
"""
Retrieve metadata about a specific voice.
Args:
voice_id: ID of the voice to retrieve.
Returns:
Voice metadata.
"""
response = self._requests.get(f"{self.API_BASE_URL}/voices/{voice_id}")
return self._handle_response(response)
def add_voice(
self,
name: str,
files: List[BytesIO],
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
"""
Add a new voice by uploading audio samples.
Args:
name: Name for the new voice.
files: List of audio file objects.
description: Optional description for the voice.
labels: Optional labels as key-value pairs.
Returns:
Dictionary with the new voice_id.
"""
data = {"name": name}
if description:
data["description"] = description
if labels:
data["labels"] = json.dumps(labels)
files_data = []
for i, file_obj in enumerate(files):
files_data.append(("files", (f"sample_{i}.mp3", file_obj, "audio/mpeg")))
# Custom request for multipart/form-data
response = self._requests.post(
f"{self.API_BASE_URL}/voices/add",
headers={"xi-api-key": self._requests.extra_headers.get("xi-api-key", "") if self._requests.extra_headers else ""},
data=data,
files=files_data,
)
return self._handle_response(response)
def edit_voice(
self,
voice_id: str,
name: Optional[str] = None,
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
files: Optional[List[BytesIO]] = None,
) -> None:
"""
Edit an existing voice.
Args:
voice_id: ID of the voice to edit.
name: Optional new name for the voice.
description: Optional new description.
labels: Optional new labels.
files: Optional new audio samples.
"""
data = {}
if name:
data["name"] = name
if description:
data["description"] = description
if labels:
data["labels"] = json.dumps(labels)
files_data = []
if files:
for i, file_obj in enumerate(files):
files_data.append(
("files", (f"sample_{i}.mp3", file_obj, "audio/mpeg"))
)
# Custom request for multipart/form-data
response = self._requests.post(
f"{self.API_BASE_URL}/voices/{voice_id}/edit",
headers={"xi-api-key": self._requests.extra_headers.get("xi-api-key", "") if self._requests.extra_headers else ""},
data=data,
files=files_data,
)
self._handle_response(response)
def delete_voice(self, voice_id: str) -> None:
"""
Delete a voice.
Args:
voice_id: ID of the voice to delete.
"""
response = self._requests.delete(f"{self.API_BASE_URL}/voices/{voice_id}")
self._handle_response(response)
def get_voice_settings(self, voice_id: str) -> Dict[str, float]:
"""
Retrieve a voice's settings.
Args:
voice_id: ID of the voice.
Returns:
Voice settings (stability and similarity_boost).
"""
response = self._requests.get(f"{self.API_BASE_URL}/voices/{voice_id}/settings")
return self._handle_response(response)
def edit_voice_settings(
self, voice_id: str, stability: float, similarity_boost: float
) -> None:
"""
Edit a voice's settings.
Args:
voice_id: ID of the voice.
stability: Stability setting (0.0 to 1.0).
similarity_boost: Similarity boost setting (0.0 to 1.0).
"""
data = {"stability": stability, "similarity_boost": similarity_boost}
response = self._requests.post(
f"{self.API_BASE_URL}/voices/{voice_id}/settings/edit",
json=data,
)
self._handle_response(response)
# === Text-to-Speech Endpoints ===
def text_to_speech(
self,
voice_id: str,
text: str,
model_id: Optional[str] = None,
voice_settings: Optional[VoiceSettings] = None,
) -> bytes:
"""
Convert text to speech using a specific voice.
Args:
voice_id: ID of the voice to use.
text: Text to convert to speech.
model_id: Optional model ID to use (e.g., "eleven_multilingual_v2").
voice_settings: Optional voice settings to override defaults.
Returns:
Audio data as bytes.
"""
data = {"text": text }
if model_id:
data["model_id"] = model_id
if voice_settings:
data["voice_settings"] = voice_settings.model_dump()
response = self._requests.post(
f"{self.API_BASE_URL}/text-to-speech/{voice_id}",
json=data,
)
return self._handle_response(response, raw_binary=True)
def text_to_speech_stream(
self,
voice_id: str,
text: str,
model_id: Optional[str] = None,
voice_settings: Optional[VoiceSettings] = None,
) -> bytes:
"""
Stream text-to-speech in real-time.
Args:
voice_id: ID of the voice to use.
text: Text to convert to speech.
model_id: Optional model ID to use.
voice_settings: Optional voice settings.
Returns:
Complete audio data as bytes after stream finishes.
"""
data = {"text": text}
if model_id:
data["model_id"] = model_id
if voice_settings:
data["voice_settings"] = json.dumps(voice_settings.dict())
response = self._requests.post(
f"{self.API_BASE_URL}/text-to-speech/{voice_id}/stream",
json=data,
)
return self._handle_response(response, raw_binary=True)
# === Speech-to-Speech Endpoints ===
def speech_to_speech(
self,
voice_id: str,
audio: BytesIO,
voice_settings: Optional[VoiceSettings] = None,
) -> bytes:
"""
Transform audio from one voice to another.
Args:
voice_id: ID of the target voice.
audio: Input audio file.
voice_settings: Optional voice settings.
Returns:
Transformed audio data as bytes.
"""
data = {}
if voice_settings:
data["voice_settings"] = json.dumps(voice_settings.dict())
files = [("audio", ("input.mp3", audio, "audio/mpeg"))]
# Custom request for multipart/form-data
response = self._requests.post(
f"{self.API_BASE_URL}/speech-to-speech/{voice_id}",
headers={"xi-api-key": self._requests.extra_headers.get("xi-api-key", "") if self._requests.extra_headers else ""},
data=data,
files=files,
)
return self._handle_response(response, raw_binary=True)
def speech_to_speech_stream(
self,
voice_id: str,
audio: BytesIO,
voice_settings: Optional[VoiceSettings] = None,
) -> bytes:
"""
Stream speech-to-speech transformation in real-time.
Args:
voice_id: ID of the target voice.
audio: Input audio file.
voice_settings: Optional voice settings.
Returns:
Complete audio data as bytes after stream finishes.
"""
data = {}
if voice_settings:
data["voice_settings"] = json.dumps(voice_settings.dict())
files = [("audio", ("input.mp3", audio, "audio/mpeg"))]
# Custom request for multipart/form-data
response = self._requests.post(
f"{self.API_BASE_URL}/speech-to-speech/{voice_id}/stream",
headers={"xi-api-key": self._requests.extra_headers.get("xi-api-key", "") if self._requests.extra_headers else ""},
data=data,
files=files,
)
return self._handle_response(response, raw_binary=True)
# === History Endpoints ===
def get_history(
self,
page_size: Optional[int] = None,
start_after_history_item_id: Optional[str] = None,
) -> Dict[str, Any]:
"""
Retrieve a list of generation history items.
Args:
page_size: Optional number of items per page.
start_after_history_item_id: Optional pagination marker.
Returns:
History data.
"""
params = {}
if page_size:
params["page_size"] = page_size
if start_after_history_item_id:
params["start_after_history_item_id"] = start_after_history_item_id
response = self._requests.get(f"{self.API_BASE_URL}/history", params=params)
return self._handle_response(response)
def get_history_item(self, history_item_id: str) -> Dict[str, Any]:
"""
Retrieve metadata for a specific history item.
Args:
history_item_id: ID of the history item.
Returns:
History item metadata.
"""
response = self._requests.get(f"{self.API_BASE_URL}/history/{history_item_id}")
return self._handle_response(response)
def get_history_audio(self, history_item_id: str) -> bytes:
"""
Download audio from a history item.
Args:
history_item_id: ID of the history item.
Returns:
Audio data as bytes.
"""
response = self._requests.get(
f"{self.API_BASE_URL}/history/{history_item_id}/audio"
)
return self._handle_response(response, raw_binary=True)
def delete_history_item(self, history_item_id: str) -> None:
"""
Delete a history item.
Args:
history_item_id: ID of the history item to delete.
"""
response = self._requests.delete(
f"{self.API_BASE_URL}/history/{history_item_id}"
)
self._handle_response(response)
def download_history_items(self, history_item_ids: List[str]) -> bytes:
"""
Download multiple history items as a ZIP file.
Args:
history_item_ids: List of history item IDs to download.
Returns:
ZIP file data as bytes.
"""
data = {"history_item_ids": history_item_ids}
response = self._requests.post(
f"{self.API_BASE_URL}/history/download",
json=data,
)
return self._handle_response(response, raw_binary=True)
# === User Endpoints ===
def get_user_info(self) -> Dict[str, Any]:
"""
Retrieve user account information.
Returns:
User information.
"""
response = self._requests.get(f"{self.API_BASE_URL}/user")
return self._handle_response(response)
def get_subscription_info(self) -> Dict[str, Any]:
"""
Retrieve subscription details.
Returns:
Subscription information.
"""
response = self._requests.get(f"{self.API_BASE_URL}/user/subscription")
return self._handle_response(response)

View File

@@ -0,0 +1,36 @@
"""
Authentication module for ElevenLabs API integration.
This module provides credential types and test credentials for the ElevenLabs API integration.
It defines the structure for API key credentials used to authenticate with the ElevenLabs API
and provides mock credentials for testing purposes.
"""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, CredentialsMetaInput
from backend.integrations.providers import ProviderName
# Define the type of credentials input expected for ElevenLabs API
ElevenLabsCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.ELEVENLABS], Literal["api_key"]
]
# Mock credentials for testing ElevenLabs API integration
TEST_CREDENTIALS = APIKeyCredentials(
id="f8274359-45c8-48b7-b54d-a1c8c09ac2e8",
provider="elevenlabs",
api_key=SecretStr("mock-elevenlabs-api-key"),
title="Mock ElevenLabs API key",
expires_at=None,
)
# Dictionary representation of test credentials for input fields
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}

View File

@@ -0,0 +1,495 @@
"""
ElevenLabs integration for text-to-speech capabilities.
This module provides blocks for interacting with the ElevenLabs API,
which offers high-quality text-to-speech and speech-to-speech conversion.
"""
import base64
import logging
from typing import Dict, List, Optional
from pydantic import BaseModel
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import APIKeyCredentials, CredentialsField, SchemaField
from ._api import ElevenLabsClient, ElevenLabsException, VoiceSettings
from ._auth import TEST_CREDENTIALS, TEST_CREDENTIALS_INPUT, ElevenLabsCredentialsInput
logger = logging.getLogger(__name__)
class Voice(BaseModel):
"""Model representing an ElevenLabs voice."""
voice_id: str
name: str
category: Optional[str] = None
description: Optional[str] = None
preview_url: Optional[str] = None
class TextToSpeechBlock(Block):
"""Block for converting text to speech using ElevenLabs API."""
class Input(BlockSchema):
text: str = SchemaField(
description="The text to convert to speech.",
placeholder="Enter your text here...",
)
voice_id: str = SchemaField(
description="The ID of the voice to use.",
placeholder="21m00Tcm4TlvDq8ikWAM",
)
model_id: Optional[str] = SchemaField(
description="The ID of the model to use (e.g., eleven_multilingual_v2).",
placeholder="eleven_multilingual_v2",
default=None,
advanced=True,
)
stability: float = SchemaField(
description="Voice stability (0.0 to 1.0). Higher values make voice more consistent.",
placeholder="0.75",
default=0.75,
advanced=True,
)
similarity_boost: float = SchemaField(
description="Similarity boost (0.0 to 1.0). Higher values make voice sound more like the original.",
placeholder="0.85",
default=0.85,
advanced=True,
)
stream: bool = SchemaField(
description="Whether to stream the audio response.",
default=False,
advanced=True,
)
credentials: ElevenLabsCredentialsInput = CredentialsField(
description="ElevenLabs API credentials."
)
class Output(BlockSchema):
audio_data: str = SchemaField(
description="The generated audio data in Base64 format."
)
content_type: str = SchemaField(
description="The MIME type of the audio (e.g., audio/mpeg)."
)
text: str = SchemaField(description="The text that was converted to speech.")
voice_id: str = SchemaField(description="The ID of the voice used.")
error: str = SchemaField(
description="Error message if the text-to-speech conversion failed."
)
def __init__(self):
super().__init__(
id="d923f6a8-beb2-4a57-90e2-b9c2f7e30f91",
description="Convert text to speech using ElevenLabs' high-quality voices.",
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=TextToSpeechBlock.Input,
output_schema=TextToSpeechBlock.Output,
test_input={
"text": "Hello, this is a test of the ElevenLabs text-to-speech API.",
"voice_id": "21m00Tcm4TlvDq8ikWAM",
"model_id": "eleven_multilingual_v2",
"stability": 0.75,
"similarity_boost": 0.85,
"stream": False,
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
("audio_data", "base64_encoded_audio_data"),
("content_type", "audio/mpeg"),
("text", "Hello, this is a test of the ElevenLabs text-to-speech API."),
("voice_id", "21m00Tcm4TlvDq8ikWAM"),
],
test_mock={
"generate_speech": lambda *args, **kwargs: (
base64.b64encode(b"mock_audio_data").decode("utf-8"),
"audio/mpeg",
)
},
test_credentials=TEST_CREDENTIALS,
)
def generate_speech(
self,
client: ElevenLabsClient,
text: str,
voice_id: str,
model_id: Optional[str] = None,
voice_settings: Optional[VoiceSettings] = None,
stream: bool = False,
) -> tuple[str, str]:
"""
Generate speech from text using ElevenLabs API.
Args:
client: Initialized ElevenLabsClient.
text: Text to convert to speech.
voice_id: ID of the voice to use.
model_id: Optional model ID.
voice_settings: Optional voice settings.
stream: Whether to use streaming endpoint.
Returns:
Tuple of (base64_encoded_audio_data, content_type).
"""
try:
if stream:
audio_data = client.text_to_speech_stream(
voice_id=voice_id,
text=text,
model_id=model_id,
voice_settings=voice_settings,
)
else:
audio_data = client.text_to_speech(
voice_id=voice_id,
text=text,
model_id=model_id,
voice_settings=voice_settings,
)
# Encode the binary audio data to base64 for transmission
base64_audio = base64.b64encode(audio_data).decode("utf-8")
return base64_audio, "audio/mpeg"
except ElevenLabsException as e:
logger.error(f"ElevenLabs API error: {str(e)}")
raise e
except Exception as e:
logger.error(f"Unexpected error in speech generation: {str(e)}")
raise ElevenLabsException(f"Failed to generate speech: {str(e)}", 500)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Run the text-to-speech conversion.
Args:
input_data: Input data containing text and voice settings.
credentials: ElevenLabs API credentials.
Yields:
Audio data and metadata.
"""
try:
client = ElevenLabsClient(credentials=credentials)
# Create voice settings if provided
voice_settings = None
if hasattr(input_data, "stability") and hasattr(
input_data, "similarity_boost"
):
voice_settings = VoiceSettings(
stability=input_data.stability,
similarity_boost=input_data.similarity_boost,
)
# Generate speech
audio_data, content_type = self.generate_speech(
client=client,
text=input_data.text,
voice_id=input_data.voice_id,
model_id=input_data.model_id,
voice_settings=voice_settings,
stream=input_data.stream,
)
# Yield results
yield "audio_data", audio_data
yield "content_type", content_type
yield "text", input_data.text
yield "voice_id", input_data.voice_id
except ElevenLabsException as e:
yield "error", f"ElevenLabs API error: {str(e)}"
except Exception as e:
yield "error", f"Unexpected error: {str(e)}"
class ListVoicesBlock(Block):
"""Block for listing available voices from ElevenLabs API."""
class Input(BlockSchema):
credentials: ElevenLabsCredentialsInput = CredentialsField(
description="ElevenLabs API credentials."
)
class Output(BlockSchema):
voices: List[Voice] = SchemaField(description="List of available voices.")
voice_ids: List[str] = SchemaField(description="List of voice IDs only.")
error: str = SchemaField(
description="Error message if the operation failed."
)
def __init__(self):
super().__init__(
id="4eaa8b1e-c0bc-45d2-a566-5fd4a5ce738d",
description="List all available voices from your ElevenLabs account.",
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=ListVoicesBlock.Input,
output_schema=ListVoicesBlock.Output,
test_input={
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
(
"voices",
[
Voice(
voice_id="21m00Tcm4TlvDq8ikWAM",
name="Rachel",
category="premade",
),
Voice(
voice_id="AZnzlk1XvdvUeBnXmlld",
name="Domi",
category="premade",
),
],
),
("voice_ids", ["21m00Tcm4TlvDq8ikWAM", "AZnzlk1XvdvUeBnXmlld"]),
],
test_mock={
"get_voices_list": lambda *args, **kwargs: [
Voice(
voice_id="21m00Tcm4TlvDq8ikWAM",
name="Rachel",
category="premade",
),
Voice(
voice_id="AZnzlk1XvdvUeBnXmlld",
name="Domi",
category="premade",
),
]
},
test_credentials=TEST_CREDENTIALS,
)
def get_voices_list(self, client: ElevenLabsClient) -> List[Voice]:
"""
Get list of voices from ElevenLabs.
Args:
client: Initialized ElevenLabsClient.
Returns:
List of Voice objects.
"""
try:
response = client.get_voices()
voices = []
for voice_data in response.get("voices", []):
voice = Voice(
voice_id=voice_data.get("voice_id"),
name=voice_data.get("name"),
category=voice_data.get("category"),
description=voice_data.get("description"),
preview_url=voice_data.get("preview_url"),
)
voices.append(voice)
return voices
except ElevenLabsException as e:
logger.error(f"ElevenLabs API error when listing voices: {str(e)}")
raise e
except Exception as e:
logger.error(f"Unexpected error when listing voices: {str(e)}")
raise ElevenLabsException(f"Failed to list voices: {str(e)}", 500)
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Run the list voices operation.
Args:
input_data: Input data (mainly credentials).
credentials: ElevenLabs API credentials.
Yields:
List of voices and voice IDs.
"""
try:
client = ElevenLabsClient(credentials=credentials)
voices = self.get_voices_list(client)
yield "voices", voices
yield "voice_ids", [voice.voice_id for voice in voices]
except ElevenLabsException as e:
yield "error", f"ElevenLabs API error: {str(e)}"
except Exception as e:
yield "error", f"Unexpected error: {str(e)}"
class VoiceSettingsBlock(Block):
"""Block for managing voice settings in ElevenLabs."""
class Input(BlockSchema):
voice_id: str = SchemaField(
description="The ID of the voice to manage.",
placeholder="21m00Tcm4TlvDq8ikWAM",
)
action: str = SchemaField(
description="Action to perform on the voice settings.",
placeholder="get",
options=["get", "update"],
)
stability: Optional[float] = SchemaField(
description="Voice stability (0.0 to 1.0) for update action.",
placeholder="0.75",
default=None,
advanced=True,
)
similarity_boost: Optional[float] = SchemaField(
description="Similarity boost (0.0 to 1.0) for update action.",
placeholder="0.85",
default=None,
advanced=True,
)
credentials: ElevenLabsCredentialsInput = CredentialsField(
description="ElevenLabs API credentials."
)
class Output(BlockSchema):
stability: Optional[float] = SchemaField(
description="Current voice stability setting."
)
similarity_boost: Optional[float] = SchemaField(
description="Current voice similarity boost setting."
)
success: bool = SchemaField(description="Whether the operation was successful.")
message: str = SchemaField(description="Operation result message.")
error: str = SchemaField(
description="Error message if the operation failed."
)
def __init__(self):
super().__init__(
id="5f3c4b87-1a9d-47bc-9fb3-d7c4e63a10e9",
description="Get or update voice settings in ElevenLabs.",
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=VoiceSettingsBlock.Input,
output_schema=VoiceSettingsBlock.Output,
test_input={
"voice_id": "21m00Tcm4TlvDq8ikWAM",
"action": "get",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
("stability", 0.75),
("similarity_boost", 0.85),
("success", True),
("message", "Voice settings retrieved successfully."),
],
test_mock={
"get_voice_settings": lambda *args, **kwargs: {
"stability": 0.75,
"similarity_boost": 0.85,
}
},
test_credentials=TEST_CREDENTIALS,
)
def get_voice_settings(
self, client: ElevenLabsClient, voice_id: str
) -> Dict[str, float]:
"""
Get current settings for a voice.
Args:
client: Initialized ElevenLabsClient.
voice_id: ID of the voice.
Returns:
Dictionary with stability and similarity_boost values.
"""
try:
return client.get_voice_settings(voice_id)
except Exception as e:
logger.error(f"Error getting voice settings: {str(e)}")
raise
def update_voice_settings(
self,
client: ElevenLabsClient,
voice_id: str,
stability: float,
similarity_boost: float,
) -> None:
"""
Update settings for a voice.
Args:
client: Initialized ElevenLabsClient.
voice_id: ID of the voice.
stability: Stability setting (0.0 to 1.0).
similarity_boost: Similarity boost setting (0.0 to 1.0).
"""
try:
client.edit_voice_settings(voice_id, stability, similarity_boost)
except Exception as e:
logger.error(f"Error updating voice settings: {str(e)}")
raise
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
Run the voice settings operation.
Args:
input_data: Input data with voice ID and action.
credentials: ElevenLabs API credentials.
Yields:
Current settings or operation result.
"""
try:
client = ElevenLabsClient(credentials=credentials)
if input_data.action == "get":
settings = self.get_voice_settings(client, input_data.voice_id)
yield "stability", settings.get("stability")
yield "similarity_boost", settings.get("similarity_boost")
yield "success", True
yield "message", "Voice settings retrieved successfully."
elif input_data.action == "update":
if input_data.stability is None or input_data.similarity_boost is None:
yield "error", "Both stability and similarity_boost must be provided for update action."
yield "success", False
yield "message", "Update failed: missing parameters."
return
self.update_voice_settings(
client,
input_data.voice_id,
input_data.stability,
input_data.similarity_boost,
)
# Get updated settings
updated_settings = self.get_voice_settings(client, input_data.voice_id)
yield "stability", updated_settings.get("stability")
yield "similarity_boost", updated_settings.get("similarity_boost")
yield "success", True
yield "message", "Voice settings updated successfully."
else:
yield "error", f"Unknown action: {input_data.action}"
yield "success", False
yield "message", f"Failed: {input_data.action} is not a valid action."
except ElevenLabsException as e:
yield "error", f"ElevenLabs API error: {str(e)}"
yield "success", False
yield "message", f"Operation failed: {str(e)}"
except Exception as e:
yield "error", f"Unexpected error: {str(e)}"
yield "success", False
yield "message", f"Operation failed with unexpected error: {str(e)}"

View File

@@ -0,0 +1,137 @@
"""
API module for Example API integration.
This module provides a example of how to create a client for an API.
"""
# We also have a Json Wrapper library available in backend.util.json
from json import JSONDecodeError
from typing import Any, Optional
from pydantic import BaseModel
from backend.data.model import APIKeyCredentials
# This is a wrapper around the requests library that is used to make API requests.
from backend.util.request import Requests
class ExampleAPIException(Exception):
def __init__(self, message: str, status_code: int):
super().__init__(message)
self.status_code = status_code
class CreateResourceResponse(BaseModel):
message: str
is_funny: bool
class GetResourceResponse(BaseModel):
message: str
is_funny: bool
class ExampleClient:
"""Client for the Example API"""
API_BASE_URL = "https://api.example.com/v1"
def __init__(
self,
credentials: Optional[APIKeyCredentials] = None,
custom_requests: Optional[Requests] = None,
):
if custom_requests:
self._requests = custom_requests
else:
headers: dict[str, str] = {
"Content-Type": "application/json",
}
if credentials:
headers["Authorization"] = credentials.auth_header()
self._requests = Requests(
extra_headers=headers,
raise_for_status=False,
)
@staticmethod
def _handle_response(response) -> Any:
"""
Handles API response and checks for errors.
Args:
response: The response object from the request.
Returns:
The parsed JSON response data.
Raises:
ExampleAPIException: If the API request fails.
"""
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get("error", {}).get("message", "")
except JSONDecodeError:
error_message = response.text
raise ExampleAPIException(
f"Example API request failed ({response.status_code}): {error_message}",
response.status_code,
)
response_data = response.json()
if "errors" in response_data:
# This is an example error and needs to be
# replaced with how the real API returns errors
error_messages = [
error.get("message", "") for error in response_data["errors"]
]
raise ExampleAPIException(
f"Example API returned errors: {', '.join(error_messages)}",
response.status_code,
)
return response_data
def get_resource(self, resource_id: str) -> GetResourceResponse:
"""
Fetches a resource from the Example API.
Args:
resource_id: The ID of the resource to fetch.
Returns:
The resource data as a GetResourceResponse object.
Raises:
ExampleAPIException: If the API request fails.
"""
try:
response = self._requests.get(
f"{self.API_BASE_URL}/resources/{resource_id}"
)
return GetResourceResponse(**self._handle_response(response))
except Exception as e:
raise ExampleAPIException(f"Failed to get resource: {str(e)}", 500)
def create_resource(self, data: dict) -> CreateResourceResponse:
"""
Creates a new resource via the Example API.
Args:
data: The resource data to create.
Returns:
The created resource data as a CreateResourceResponse object.
Raises:
ExampleAPIException: If the API request fails.
"""
try:
response = self._requests.post(f"{self.API_BASE_URL}/resources", json=data)
return CreateResourceResponse(**self._handle_response(response))
except Exception as e:
raise ExampleAPIException(f"Failed to create resource: {str(e)}", 500)

View File

@@ -0,0 +1,37 @@
"""
Authentication module for Example API integration.
This module provides credential types and test credentials for the Example API integration.
It defines the structure for API key credentials used to authenticate with the Example API
and provides mock credentials for testing purposes.
"""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, CredentialsMetaInput
from backend.integrations.providers import ProviderName
# Define the type of credentials input expected for Example API
ExampleCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.EXAMPLE_PROVIDER], Literal["api_key"]
]
# Mock credentials for testing Example API integration
TEST_CREDENTIALS = APIKeyCredentials(
id="9191c4f0-498f-4235-a79c-59c0e37454d4",
provider="example-provider",
api_key=SecretStr("mock-example-api-key"),
title="Mock Example API key",
expires_at=None,
)
# Dictionary representation of test credentials for input fields
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}

View File

@@ -0,0 +1,154 @@
import logging
from pydantic import BaseModel
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import APIKeyCredentials, CredentialsField, SchemaField
from ._api import ExampleClient
from ._auth import TEST_CREDENTIALS, TEST_CREDENTIALS_INPUT, ExampleCredentialsInput
logger = logging.getLogger(__name__)
class GreetingMessage(BaseModel):
message: str
is_funny: bool
class ExampleBlock(Block):
class Input(BlockSchema):
name: str = SchemaField(
description="The name of the example block", placeholder="Enter a name"
)
greetings: list[str] = SchemaField(
description="The greetings to display", default=["Hello", "Hi", "Hey"]
)
is_funny: bool = SchemaField(
description="Whether the block is funny",
placeholder="True",
default=True,
# Advanced fields are moved to the "Advanced" dropdown in the UI
advanced=True,
)
greeting_context: str = SchemaField(
description="The context of the greeting",
placeholder="Enter a context",
default="The user is looking for an inspirational greeting",
# Hidden fields are not shown in the UI at all
hidden=True,
)
# Only if the block needs credentials
credentials: ExampleCredentialsInput = CredentialsField(
description="The credentials for the example block"
)
class Output(BlockSchema):
response: GreetingMessage = SchemaField(
description="The response object generated by the example block."
)
all_responses: list[GreetingMessage] = SchemaField(
description="All the responses from the example block."
)
greeting_count: int = SchemaField(
description="The number of greetings in the input."
)
error: str = SchemaField(description="The error from the example block")
def __init__(self):
super().__init__(
# The unique identifier for the block, this value will be persisted in the DB.
# It should be unique and constant across the application run.
# Use the UUID format for the ID.
id="380694d5-3b2e-4130-bced-b43752b70de9",
# The description of the block, explaining what the block does.
description="The example block",
# The set of categories that the block belongs to.
# Each category is an instance of BlockCategory Enum.
categories={BlockCategory.BASIC},
# The schema, defined as a Pydantic model, for the input data.
input_schema=ExampleBlock.Input,
# The schema, defined as a Pydantic model, for the output data.
output_schema=ExampleBlock.Output,
# The list or single sample input data for the block, for testing.
# This is an instance of the Input schema with sample values.
test_input={
"name": "Craig",
"greetings": ["Hello", "Hi", "Hey"],
"is_funny": True,
"credentials": TEST_CREDENTIALS_INPUT,
},
# The list or single expected output if the test_input is run.
# Each output is a tuple of (output_name, output_data).
test_output=[
("response", GreetingMessage(message="Hello, world!", is_funny=True)),
(
"response",
GreetingMessage(message="Hello, world!", is_funny=True),
), # We mock the function
(
"response",
GreetingMessage(message="Hello, world!", is_funny=True),
), # We mock the function
(
"all_responses",
[
GreetingMessage(message="Hello, world!", is_funny=True),
GreetingMessage(message="Hello, world!", is_funny=True),
GreetingMessage(message="Hello, world!", is_funny=True),
],
),
("greeting_count", 3),
],
# Function names on the block implementation to mock on test run.
# Each mock is a dictionary with function names as keys and mock implementations as values.
test_mock={
"my_function_that_can_be_mocked": lambda *args, **kwargs: GreetingMessage(
message="Hello, world!", is_funny=True
)
},
# The credentials required for testing the block.
# This is an instance of APIKeyCredentials with sample values.
test_credentials=TEST_CREDENTIALS,
)
def my_function_that_can_be_mocked(
self, name: str, credentials: APIKeyCredentials
) -> GreetingMessage:
logger.info("my_function_that_can_be_mocked called with input: %s", name)
# Use the ExampleClient from _api.py to make an API call
client = ExampleClient(credentials=credentials)
# Create a sample resource using the client
resource_data = {"name": name, "type": "greeting"}
# If your API response object matches the return type of the function,
# there is no need to convert the object. In this case we have a different
# object type for the response and the return type of the function.
return GreetingMessage(**client.create_resource(resource_data).model_dump())
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
"""
The run function implements the block's core logic. It processes the input_data
and yields the block's output.
In addition to credentials, the following parameters can be specified:
graph_id: The ID of the graph containing this block.
node_id: The ID of this block's node in the graph.
graph_exec_id: The ID of the current graph execution.
node_exec_id: The ID of the current node execution.
user_id: The ID of the user executing the block.
"""
rtn_all_responses: list[GreetingMessage] = []
# Here we deomonstrate best practice for blocks that need to yield multiple items.
# We yield each item from the list to allow for operations on each element.
# We also yield the complete list for situations when the full list is needed.
for greeting in input_data.greetings:
message = self.my_function_that_can_be_mocked(greeting, credentials)
rtn_all_responses.append(message)
yield "response", message
yield "all_responses", rtn_all_responses
yield "greeting_count", len(input_data.greetings)

View File

@@ -0,0 +1,65 @@
import logging
from backend.data.block import (
Block,
BlockCategory,
BlockManualWebhookConfig,
BlockOutput,
BlockSchema,
)
from backend.data.model import SchemaField
from backend.integrations.webhooks.example import ExampleWebhookEventType
logger = logging.getLogger(__name__)
class ExampleTriggerBlock(Block):
"""
A trigger block that is activated by an external webhook event.
Unlike standard blocks that are manually executed, trigger blocks are automatically
activated when a webhook event is received from the specified provider.
"""
class Input(BlockSchema):
# The payload field is hidden because it's automatically populated by the webhook
# system rather than being manually entered by the user
payload: dict = SchemaField(hidden=True)
class Output(BlockSchema):
event_data: dict = SchemaField(
description="The contents of the example webhook event."
)
def __init__(self):
super().__init__(
id="7c5933ce-d60c-42dd-9c4e-db82496474a3",
description="This block will output the contents of an example webhook event.",
categories={BlockCategory.BASIC},
input_schema=ExampleTriggerBlock.Input,
output_schema=ExampleTriggerBlock.Output,
# The webhook_config is a key difference from standard blocks
# It defines which external service can trigger this block and what type of events it responds to
webhook_config=BlockManualWebhookConfig(
provider="example_provider", # The external service that will send webhook events
webhook_type=ExampleWebhookEventType.EXAMPLE_EVENT, # The specific event type this block responds to
),
# Test input for trigger blocks should mimic the payload structure that would be received from the webhook
test_input=[
{
"payload": {
"event_type": "example",
"data": "Sample webhook data",
}
}
],
test_output=[
("event_data", {"event_type": "example", "data": "Sample webhook data"})
],
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
# For trigger blocks, the run method is called automatically when a webhook event is received
# The payload from the webhook is passed in as input_data.payload
logger.info("Example trigger block run with payload: %s", input_data.payload)
yield "event_data", input_data.payload

View File

@@ -2,6 +2,7 @@ from typing import Type
from backend.blocks.ai_music_generator import AIMusicGeneratorBlock
from backend.blocks.ai_shortform_video_block import AIShortformVideoCreatorBlock
from backend.blocks.example.example import ExampleBlock
from backend.blocks.ideogram import IdeogramModelBlock
from backend.blocks.jina.embeddings import JinaEmbeddingBlock
from backend.blocks.jina.search import ExtractWebsiteContentBlock, SearchTheWebBlock
@@ -23,6 +24,7 @@ from backend.data.cost import BlockCost, BlockCostType
from backend.integrations.credentials_store import (
anthropic_credentials,
did_credentials,
example_credentials,
groq_credentials,
ideogram_credentials,
jina_credentials,
@@ -267,4 +269,16 @@ BLOCK_COSTS: dict[Type[Block], list[BlockCost]] = {
)
],
SmartDecisionMakerBlock: LLM_COST,
ExampleBlock: [
BlockCost(
cost_amount=1,
cost_filter={
"credentials": {
"id": example_credentials.id,
"provider": example_credentials.provider,
"type": example_credentials.type,
}
},
)
],
}

View File

@@ -169,7 +169,24 @@ zerobounce_credentials = APIKeyCredentials(
expires_at=None,
)
example_credentials = APIKeyCredentials(
id="a2b7f68f-aa6a-4995-99ec-b45b40d33498",
provider="example-provider",
api_key=SecretStr(settings.secrets.example_api_key),
title="Use Credits for Example",
expires_at=None,
)
elevenlabs_credentials = APIKeyCredentials(
id="a2b7f68f-aa6a-4995-99ec-b45b40d33498",
provider="elevenlabs",
api_key=SecretStr(settings.secrets.elevenlabs_api_key),
title="Use Credits for ElevenLabs",
expires_at=None,
)
DEFAULT_CREDENTIALS = [
example_credentials,
ollama_credentials,
revid_credentials,
ideogram_credentials,
@@ -225,6 +242,8 @@ class IntegrationCredentialsStore:
all_credentials.append(ollama_credentials)
# These will only be added if the API key is set
if settings.secrets.example_api_key:
all_credentials.append(example_credentials)
if settings.secrets.revid_api_key:
all_credentials.append(revid_credentials)
if settings.secrets.ideogram_api_key:

View File

@@ -10,6 +10,7 @@ class ProviderName(str, Enum):
D_ID = "d_id"
E2B = "e2b"
EXA = "exa"
EXAMPLE_PROVIDER = "example-provider"
FAL = "fal"
GITHUB = "github"
GOOGLE = "google"
@@ -39,4 +40,5 @@ class ProviderName(str, Enum):
TODOIST = "todoist"
UNREAL_SPEECH = "unreal_speech"
ZEROBOUNCE = "zerobounce"
ELEVENLABS = "elevenlabs"
# --8<-- [end:ProviderName]

View File

@@ -1,6 +1,7 @@
from typing import TYPE_CHECKING
from .compass import CompassWebhookManager
from .example import ExampleWebhookManager
from .github import GithubWebhooksManager
from .slant3d import Slant3DWebhooksManager
@@ -15,6 +16,7 @@ WEBHOOK_MANAGERS_BY_NAME: dict["ProviderName", type["BaseWebhooksManager"]] = {
CompassWebhookManager,
GithubWebhooksManager,
Slant3DWebhooksManager,
ExampleWebhookManager,
]
}
# --8<-- [end:WEBHOOK_MANAGERS_BY_NAME]

View File

@@ -0,0 +1,147 @@
import logging
import requests
from fastapi import Request
from strenum import StrEnum
from backend.data import integrations
from backend.data.model import APIKeyCredentials, Credentials
from backend.integrations.providers import ProviderName
from ._manual_base import ManualWebhookManagerBase
logger = logging.getLogger(__name__)
class ExampleWebhookEventType(StrEnum):
EXAMPLE_EVENT = "example_event"
ANOTHER_EXAMPLE_EVENT = "another_example_event"
# ExampleWebhookManager is a class that manages webhooks for a hypothetical provider.
# It extends ManualWebhookManagerBase, which provides base functionality for manual webhook management.
class ExampleWebhookManager(ManualWebhookManagerBase):
# Define the provider name for this webhook manager.
PROVIDER_NAME = ProviderName.EXAMPLE_PROVIDER
# Define the types of webhooks this manager can handle.
WebhookEventType = ExampleWebhookEventType
BASE_URL = "https://api.example.com"
@classmethod
async def validate_payload(
cls, webhook: integrations.Webhook, request: Request
) -> tuple[dict, str]:
"""
Validate the incoming webhook payload.
Args:
webhook (integrations.Webhook): The webhook object.
request (Request): The incoming request object.
Returns:
tuple: A tuple containing the payload as a dictionary and the event type as a string.
"""
# Extract the JSON payload from the request.
payload = await request.json()
# Set the event type based on the webhook type in the payload.
event_type = payload.get("webhook_type", ExampleWebhookEventType.EXAMPLE_EVENT)
# For the payload its better to return a pydantic model
# rather than a weakly typed dict here
return payload, event_type
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: str,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> tuple[str, dict]:
"""
Register a new webhook with the provider.
Args:
credentials (Credentials): The credentials required for authentication.
webhook_type (str): The type of webhook to register.
resource (str): The resource associated with the webhook.
events (list[str]): The list of events to subscribe to.
ingress_url (str): The URL where the webhook will send data.
secret (str): A secret for securing the webhook.
Returns:
tuple: A tuple containing the provider's webhook ID, if any, and the webhook configuration as a dictionary.
"""
# Ensure the credentials are of the correct type.
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key is required to register a webhook")
# Prepare the headers for the request, including the API key.
headers = {
"api-key": credentials.api_key.get_secret_value(),
"Content-Type": "application/json",
}
# Prepare the payload for the request. Note that the events list is not used.
# This is just a fake example
payload = {"endPoint": ingress_url}
# Send a POST request to register the webhook.
response = requests.post(
f"{self.BASE_URL}/example/webhookSubscribe", headers=headers, json=payload
)
# Check if the response indicates a failure.
if not response.ok:
error = response.json().get("error", "Unknown error")
raise RuntimeError(f"Failed to register webhook: {error}")
# Prepare the webhook configuration to return.
webhook_config = {
"endpoint": ingress_url,
"provider": self.PROVIDER_NAME,
"events": ["example_event"],
"type": webhook_type,
}
return "", webhook_config
async def _deregister_webhook(
self, webhook: integrations.Webhook, credentials: Credentials
) -> None:
"""
Deregister a webhook with the provider.
Args:
webhook (integrations.Webhook): The webhook object to deregister.
credentials (Credentials): The credentials associated with the webhook.
Raises:
ValueError: If the webhook doesn't belong to the credentials or if deregistration fails.
"""
if webhook.credentials_id != credentials.id:
raise ValueError(
f"Webhook #{webhook.id} does not belong to credentials {credentials.id}"
)
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key is required to deregister a webhook")
headers = {
"api-key": credentials.api_key.get_secret_value(),
"Content-Type": "application/json",
}
# Construct the delete URL based on the webhook information
delete_url = f"{self.BASE_URL}/example/webhooks/{webhook.provider_webhook_id}"
response = requests.delete(delete_url, headers=headers)
if response.status_code not in [204, 404]:
# 204 means successful deletion, 404 means the webhook was already deleted
error = response.json().get("error", "Unknown error")
raise ValueError(f"Failed to delete webhook: {error}")
# If we reach here, the webhook was successfully deleted or didn't exist

View File

@@ -404,8 +404,11 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
smartlead_api_key: str = Field(default="", description="SmartLead API Key")
zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key")
example_api_key: str = Field(default="", description="Example API Key")
# Add more secret fields as needed
elevenlabs_api_key: str = Field(default="", description="ElevenLabs API Key")
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",

View File

@@ -90,6 +90,8 @@ export const providerIcons: Record<
smartlead: fallbackIcon,
todoist: fallbackIcon,
zerobounce: fallbackIcon,
example: fallbackIcon,
elevenlabs: fallbackIcon,
};
// --8<-- [end:ProviderIconsEmbed]

View File

@@ -52,6 +52,8 @@ const providerDisplayNames: Record<CredentialsProviderName, string> = {
todoist: "Todoist",
unreal_speech: "Unreal Speech",
zerobounce: "ZeroBounce",
example: "Example",
elevenlabs: "ElevenLabs",
} as const;
// --8<-- [end:CredentialsProviderNames]

View File

@@ -148,6 +148,8 @@ export const PROVIDER_NAMES = {
UNREAL_SPEECH: "unreal_speech",
TODOIST: "todoist",
ZEROBOUNCE: "zerobounce",
EXAMPLE: "example",
ELEVENLABS: "elevenlabs",
} as const;
// --8<-- [end:BlockIOCredentialsSubSchema]