Add Telegram blocks

This commit is contained in:
Krzysztof Czerwinski
2026-01-29 19:09:07 +09:00
parent 7668c17d9c
commit 808dddfc26
8 changed files with 1081 additions and 0 deletions

View File

@@ -0,0 +1 @@
# Telegram Bot blocks for AutoGPT Platform

View File

@@ -0,0 +1,117 @@
"""
Telegram Bot API helper functions.
Provides utilities for making authenticated requests to the Telegram Bot API.
"""
import logging
from typing import Any, Optional
from backend.data.model import APIKeyCredentials
from backend.util.request import Requests
logger = logging.getLogger(__name__)
TELEGRAM_API_BASE = "https://api.telegram.org"
class TelegramAPIException(Exception):
"""Exception raised for Telegram API errors."""
def __init__(self, message: str, error_code: int = 0):
super().__init__(message)
self.error_code = error_code
def get_bot_api_url(bot_token: str, method: str) -> str:
"""Construct Telegram Bot API URL for a method."""
return f"{TELEGRAM_API_BASE}/bot{bot_token}/{method}"
def get_file_url(bot_token: str, file_path: str) -> str:
"""Construct Telegram file download URL."""
return f"{TELEGRAM_API_BASE}/file/bot{bot_token}/{file_path}"
async def call_telegram_api(
credentials: APIKeyCredentials,
method: str,
data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""
Make a request to the Telegram Bot API.
Args:
credentials: Bot token credentials
method: API method name (e.g., "sendMessage", "getFile")
data: Request parameters
Returns:
API response result
Raises:
TelegramAPIException: If the API returns an error
"""
token = credentials.api_key.get_secret_value()
url = get_bot_api_url(token, method)
response = await Requests().post(url, json=data or {})
result = response.json()
if not result.get("ok"):
error_code = result.get("error_code", 0)
description = result.get("description", "Unknown error")
raise TelegramAPIException(description, error_code)
return result.get("result", {})
async def get_file_info(
credentials: APIKeyCredentials, file_id: str
) -> dict[str, Any]:
"""
Get file information from Telegram.
Args:
credentials: Bot token credentials
file_id: Telegram file_id from message
Returns:
File info dict containing file_id, file_unique_id, file_size, file_path
"""
return await call_telegram_api(credentials, "getFile", {"file_id": file_id})
async def get_file_download_url(credentials: APIKeyCredentials, file_id: str) -> str:
"""
Get the download URL for a Telegram file.
Args:
credentials: Bot token credentials
file_id: Telegram file_id from message
Returns:
Full download URL
"""
token = credentials.api_key.get_secret_value()
result = await get_file_info(credentials, file_id)
file_path = result.get("file_path")
if not file_path:
raise TelegramAPIException("No file_path returned from getFile")
return get_file_url(token, file_path)
async def download_telegram_file(credentials: APIKeyCredentials, file_id: str) -> bytes:
"""
Download a file from Telegram servers.
Args:
credentials: Bot token credentials
file_id: Telegram file_id
Returns:
File content as bytes
"""
url = await get_file_download_url(credentials, file_id)
response = await Requests().get(url)
return response.content

View File

@@ -0,0 +1,47 @@
"""
Telegram Bot credentials handling.
Telegram bots use an API key (bot token) obtained from @BotFather.
"""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
)
from backend.integrations.providers import ProviderName
# Bot token credentials (API key style)
TelegramCredentials = APIKeyCredentials
TelegramCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.TELEGRAM], Literal["api_key"]
]
def TelegramCredentialsField() -> TelegramCredentialsInput:
"""Creates a Telegram bot token credentials field."""
return CredentialsField(
description="Telegram Bot API token from @BotFather. "
"Create a bot at https://t.me/BotFather to get your token."
)
# Test credentials for unit tests
TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef",
provider="telegram",
api_key=SecretStr("test_telegram_bot_token"),
title="Mock Telegram Bot Token",
expires_at=None,
)
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.type,
}

View File

@@ -0,0 +1,519 @@
"""
Telegram action blocks for sending messages, photos, and voice messages.
"""
import base64
import logging
from enum import Enum
from typing import Optional
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import APIKeyCredentials, SchemaField
from backend.util.file import store_media_file
from backend.util.type import MediaFileType
from ._api import call_telegram_api, download_telegram_file
from ._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
TelegramCredentialsField,
TelegramCredentialsInput,
)
logger = logging.getLogger(__name__)
class ParseMode(str, Enum):
"""Telegram message parse modes."""
NONE = ""
MARKDOWN = "Markdown"
MARKDOWNV2 = "MarkdownV2"
HTML = "HTML"
class SendTelegramMessageBlock(Block):
"""Send a text message to a Telegram chat."""
class Input(BlockSchemaInput):
credentials: TelegramCredentialsInput = TelegramCredentialsField()
chat_id: int = SchemaField(
description="The chat ID to send the message to. "
"Get this from the trigger block's chat_id output."
)
text: str = SchemaField(
description="The text message to send (max 4096 characters)"
)
parse_mode: ParseMode = SchemaField(
description="Message formatting mode (Markdown, HTML, or none)",
default=ParseMode.NONE,
advanced=True,
)
reply_to_message_id: Optional[int] = SchemaField(
description="Message ID to reply to",
default=None,
advanced=True,
)
disable_notification: bool = SchemaField(
description="Send message silently (no notification sound)",
default=False,
advanced=True,
)
class Output(BlockSchemaOutput):
message_id: int = SchemaField(description="The ID of the sent message")
status: str = SchemaField(description="Status of the operation")
def __init__(self):
super().__init__(
id="b2c3d4e5-f6a7-8901-bcde-f23456789012",
description="Send a text message to a Telegram chat.",
categories={BlockCategory.SOCIAL},
input_schema=SendTelegramMessageBlock.Input,
output_schema=SendTelegramMessageBlock.Output,
test_input={
"chat_id": 12345678,
"text": "Hello from AutoGPT!",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("message_id", 123),
("status", "Message sent"),
],
test_mock={
"_send_message": lambda *args, **kwargs: {
"message_id": 123,
}
},
)
async def _send_message(
self,
credentials: APIKeyCredentials,
chat_id: int,
text: str,
parse_mode: str,
reply_to_message_id: Optional[int],
disable_notification: bool,
) -> dict:
data: dict = {
"chat_id": chat_id,
"text": text,
}
if parse_mode:
data["parse_mode"] = parse_mode
if reply_to_message_id:
data["reply_to_message_id"] = reply_to_message_id
if disable_notification:
data["disable_notification"] = True
return await call_telegram_api(credentials, "sendMessage", data)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
result = await self._send_message(
credentials=credentials,
chat_id=input_data.chat_id,
text=input_data.text,
parse_mode=input_data.parse_mode.value,
reply_to_message_id=input_data.reply_to_message_id,
disable_notification=input_data.disable_notification,
)
yield "message_id", result.get("message_id", 0)
yield "status", "Message sent"
except Exception as e:
raise ValueError(f"Failed to send message: {e}")
class SendTelegramPhotoBlock(Block):
"""Send a photo to a Telegram chat."""
class Input(BlockSchemaInput):
credentials: TelegramCredentialsInput = TelegramCredentialsField()
chat_id: int = SchemaField(
description="The chat ID to send the photo to"
)
photo: MediaFileType = SchemaField(
description="Photo to send (URL, data URI, or workspace:// reference). "
"URLs are preferred as Telegram will fetch them directly."
)
caption: str = SchemaField(
description="Caption for the photo (max 1024 characters)",
default="",
advanced=True,
)
parse_mode: ParseMode = SchemaField(
description="Caption formatting mode",
default=ParseMode.NONE,
advanced=True,
)
reply_to_message_id: Optional[int] = SchemaField(
description="Message ID to reply to",
default=None,
advanced=True,
)
class Output(BlockSchemaOutput):
message_id: int = SchemaField(description="The ID of the sent message")
status: str = SchemaField(description="Status of the operation")
def __init__(self):
super().__init__(
id="c3d4e5f6-a7b8-9012-cdef-345678901234",
description="Send a photo to a Telegram chat.",
categories={BlockCategory.SOCIAL},
input_schema=SendTelegramPhotoBlock.Input,
output_schema=SendTelegramPhotoBlock.Output,
test_input={
"chat_id": 12345678,
"photo": "https://example.com/image.jpg",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("message_id", 123),
("status", "Photo sent"),
],
test_mock={
"_send_photo": lambda *args, **kwargs: {"message_id": 123}
},
)
async def _send_photo(
self,
credentials: APIKeyCredentials,
chat_id: int,
photo_data: str,
caption: str,
parse_mode: str,
reply_to_message_id: Optional[int],
) -> dict:
data: dict = {
"chat_id": chat_id,
"photo": photo_data,
}
if caption:
data["caption"] = caption
if parse_mode:
data["parse_mode"] = parse_mode
if reply_to_message_id:
data["reply_to_message_id"] = reply_to_message_id
return await call_telegram_api(credentials, "sendPhoto", data)
async def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
try:
photo_input = input_data.photo
# If it's a URL, pass it directly to Telegram (it will fetch it)
if photo_input.startswith(("http://", "https://")):
photo_data = photo_input
else:
# For data URIs or workspace:// references, use store_media_file
photo_data = await store_media_file(
file=photo_input,
execution_context=execution_context,
return_format="for_external_api",
)
result = await self._send_photo(
credentials=credentials,
chat_id=input_data.chat_id,
photo_data=photo_data,
caption=input_data.caption,
parse_mode=input_data.parse_mode.value,
reply_to_message_id=input_data.reply_to_message_id,
)
yield "message_id", result.get("message_id", 0)
yield "status", "Photo sent"
except Exception as e:
raise ValueError(f"Failed to send photo: {e}")
class SendTelegramVoiceBlock(Block):
"""Send a voice message to a Telegram chat."""
class Input(BlockSchemaInput):
credentials: TelegramCredentialsInput = TelegramCredentialsField()
chat_id: int = SchemaField(
description="The chat ID to send the voice message to"
)
voice: MediaFileType = SchemaField(
description="Voice message to send (OGG format with OPUS codec). "
"Can be URL, data URI, or workspace:// reference."
)
caption: str = SchemaField(
description="Caption for the voice message",
default="",
advanced=True,
)
duration: Optional[int] = SchemaField(
description="Duration in seconds",
default=None,
advanced=True,
)
reply_to_message_id: Optional[int] = SchemaField(
description="Message ID to reply to",
default=None,
advanced=True,
)
class Output(BlockSchemaOutput):
message_id: int = SchemaField(description="The ID of the sent message")
status: str = SchemaField(description="Status of the operation")
def __init__(self):
super().__init__(
id="d4e5f6a7-b8c9-0123-def0-456789012345",
description="Send a voice message to a Telegram chat. "
"Voice must be OGG format with OPUS codec.",
categories={BlockCategory.SOCIAL},
input_schema=SendTelegramVoiceBlock.Input,
output_schema=SendTelegramVoiceBlock.Output,
test_input={
"chat_id": 12345678,
"voice": "https://example.com/voice.ogg",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("message_id", 123),
("status", "Voice sent"),
],
test_mock={
"_send_voice": lambda *args, **kwargs: {"message_id": 123}
},
)
async def _send_voice(
self,
credentials: APIKeyCredentials,
chat_id: int,
voice_data: str,
caption: str,
duration: Optional[int],
reply_to_message_id: Optional[int],
) -> dict:
data: dict = {
"chat_id": chat_id,
"voice": voice_data,
}
if caption:
data["caption"] = caption
if duration:
data["duration"] = duration
if reply_to_message_id:
data["reply_to_message_id"] = reply_to_message_id
return await call_telegram_api(credentials, "sendVoice", data)
async def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
try:
voice_input = input_data.voice
# If it's a URL, pass it directly to Telegram
if voice_input.startswith(("http://", "https://")):
voice_data = voice_input
else:
voice_data = await store_media_file(
file=voice_input,
execution_context=execution_context,
return_format="for_external_api",
)
result = await self._send_voice(
credentials=credentials,
chat_id=input_data.chat_id,
voice_data=voice_data,
caption=input_data.caption,
duration=input_data.duration,
reply_to_message_id=input_data.reply_to_message_id,
)
yield "message_id", result.get("message_id", 0)
yield "status", "Voice sent"
except Exception as e:
raise ValueError(f"Failed to send voice: {e}")
class ReplyToTelegramMessageBlock(Block):
"""Reply to a specific Telegram message."""
class Input(BlockSchemaInput):
credentials: TelegramCredentialsInput = TelegramCredentialsField()
chat_id: int = SchemaField(
description="The chat ID where the message is"
)
reply_to_message_id: int = SchemaField(
description="The message ID to reply to"
)
text: str = SchemaField(description="The reply text")
parse_mode: ParseMode = SchemaField(
description="Message formatting mode",
default=ParseMode.NONE,
advanced=True,
)
class Output(BlockSchemaOutput):
message_id: int = SchemaField(description="The ID of the reply message")
status: str = SchemaField(description="Status of the operation")
def __init__(self):
super().__init__(
id="e5f6a7b8-c9d0-1234-ef01-567890123456",
description="Reply to a specific message in a Telegram chat.",
categories={BlockCategory.SOCIAL},
input_schema=ReplyToTelegramMessageBlock.Input,
output_schema=ReplyToTelegramMessageBlock.Output,
test_input={
"chat_id": 12345678,
"reply_to_message_id": 42,
"text": "This is a reply!",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("message_id", 123),
("status", "Reply sent"),
],
test_mock={
"_send_reply": lambda *args, **kwargs: {"message_id": 123}
},
)
async def _send_reply(
self,
credentials: APIKeyCredentials,
chat_id: int,
reply_to_message_id: int,
text: str,
parse_mode: str,
) -> dict:
data: dict = {
"chat_id": chat_id,
"text": text,
"reply_to_message_id": reply_to_message_id,
}
if parse_mode:
data["parse_mode"] = parse_mode
return await call_telegram_api(credentials, "sendMessage", data)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
result = await self._send_reply(
credentials=credentials,
chat_id=input_data.chat_id,
reply_to_message_id=input_data.reply_to_message_id,
text=input_data.text,
parse_mode=input_data.parse_mode.value,
)
yield "message_id", result.get("message_id", 0)
yield "status", "Reply sent"
except Exception as e:
raise ValueError(f"Failed to send reply: {e}")
class GetTelegramFileBlock(Block):
"""Download a file from Telegram by file_id."""
class Input(BlockSchemaInput):
credentials: TelegramCredentialsInput = TelegramCredentialsField()
file_id: str = SchemaField(
description="The Telegram file_id to download. "
"Get this from trigger outputs (photo_file_id, voice_file_id, etc.)"
)
class Output(BlockSchemaOutput):
file: MediaFileType = SchemaField(
description="The downloaded file (workspace:// reference or data URI)"
)
status: str = SchemaField(description="Status of the operation")
def __init__(self):
super().__init__(
id="f6a7b8c9-d0e1-2345-f012-678901234567",
description="Download a file from Telegram using its file_id. "
"Use this to process photos, voice messages, or documents received.",
categories={BlockCategory.SOCIAL},
input_schema=GetTelegramFileBlock.Input,
output_schema=GetTelegramFileBlock.Output,
test_input={
"file_id": "AgACAgIAAxkBAAI...",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("file", "data:application/octet-stream;base64,dGVzdA=="),
("status", "File downloaded"),
],
test_mock={
"_download_file": lambda *args, **kwargs: b"test"
},
)
async def _download_file(
self,
credentials: APIKeyCredentials,
file_id: str,
) -> bytes:
return await download_telegram_file(credentials, file_id)
async def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
try:
# Download the file from Telegram
file_content = await self._download_file(
credentials=credentials,
file_id=input_data.file_id,
)
# Convert to data URI
mime_type = "application/octet-stream"
data_uri = (
f"data:{mime_type};base64,"
f"{base64.b64encode(file_content).decode()}"
)
# Store and get appropriate output format
file_result = await store_media_file(
file=data_uri,
execution_context=execution_context,
return_format="for_block_output",
)
yield "file", file_result
yield "status", "File downloaded"
except Exception as e:
raise ValueError(f"Failed to download file: {e}")

View File

@@ -0,0 +1,230 @@
"""
Telegram trigger blocks for receiving messages via webhooks.
"""
import logging
from pydantic import BaseModel
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
BlockWebhookConfig,
)
from backend.data.model import SchemaField
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks.telegram import TelegramWebhookType
from ._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
TelegramCredentialsField,
TelegramCredentialsInput,
)
logger = logging.getLogger(__name__)
# Example payload for testing
EXAMPLE_MESSAGE_PAYLOAD = {
"update_id": 123456789,
"message": {
"message_id": 1,
"from": {
"id": 12345678,
"is_bot": False,
"first_name": "John",
"last_name": "Doe",
"username": "johndoe",
"language_code": "en",
},
"chat": {
"id": 12345678,
"first_name": "John",
"last_name": "Doe",
"username": "johndoe",
"type": "private",
},
"date": 1234567890,
"text": "Hello, bot!",
},
}
class TelegramTriggerBase:
"""Base class for Telegram trigger blocks."""
class Input(BlockSchemaInput):
credentials: TelegramCredentialsInput = TelegramCredentialsField()
payload: dict = SchemaField(hidden=True, default_factory=dict)
class TelegramMessageTriggerBlock(TelegramTriggerBase, Block):
"""
Triggers when a message is received by your Telegram bot.
Supports text, photos, voice messages, and audio files.
Connect the outputs to other blocks to process messages and send responses.
"""
class Input(TelegramTriggerBase.Input):
class EventsFilter(BaseModel):
"""Filter for message types to receive."""
text: bool = True
photo: bool = False
voice: bool = False
audio: bool = False
document: bool = False
video: bool = False
events: EventsFilter = SchemaField(
title="Message Types", description="Types of messages to receive"
)
class Output(BlockSchemaOutput):
payload: dict = SchemaField(
description="The complete webhook payload from Telegram"
)
chat_id: int = SchemaField(
description="The chat ID where the message was received. "
"Use this to send replies."
)
message_id: int = SchemaField(description="The unique message ID")
user_id: int = SchemaField(description="The user ID who sent the message")
username: str = SchemaField(
description="Username of the sender (may be empty)"
)
first_name: str = SchemaField(description="First name of the sender")
event: str = SchemaField(
description="The message type (text, photo, voice, audio, etc.)"
)
text: str = SchemaField(
description="Text content of the message (for text messages)"
)
photo_file_id: str = SchemaField(
description="File ID of the photo (for photo messages). "
"Use GetTelegramFileBlock to download."
)
voice_file_id: str = SchemaField(
description="File ID of the voice message (for voice messages). "
"Use GetTelegramFileBlock to download."
)
audio_file_id: str = SchemaField(
description="File ID of the audio file (for audio messages). "
"Use GetTelegramFileBlock to download."
)
caption: str = SchemaField(description="Caption for media messages")
def __init__(self):
super().__init__(
id="a1b2c3d4-e5f6-7890-abcd-ef1234567890",
description="Triggers when a message is received by your Telegram bot. "
"Supports text, photos, voice messages, and audio files.",
categories={BlockCategory.SOCIAL},
input_schema=TelegramMessageTriggerBlock.Input,
output_schema=TelegramMessageTriggerBlock.Output,
webhook_config=BlockWebhookConfig(
provider=ProviderName.TELEGRAM,
webhook_type=TelegramWebhookType.BOT,
resource_format="bot",
event_filter_input="events",
event_format="message.{event}",
),
test_input={
"events": {"text": True, "photo": True},
"credentials": TEST_CREDENTIALS_INPUT,
"payload": EXAMPLE_MESSAGE_PAYLOAD,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("payload", EXAMPLE_MESSAGE_PAYLOAD),
("chat_id", 12345678),
("message_id", 1),
("user_id", 12345678),
("username", "johndoe"),
("first_name", "John"),
("event", "text"),
("text", "Hello, bot!"),
("photo_file_id", ""),
("voice_file_id", ""),
("audio_file_id", ""),
("caption", ""),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
payload = input_data.payload
message = payload.get("message", {})
# Extract common fields
chat = message.get("chat", {})
sender = message.get("from", {})
yield "payload", payload
yield "chat_id", chat.get("id", 0)
yield "message_id", message.get("message_id", 0)
yield "user_id", sender.get("id", 0)
yield "username", sender.get("username", "")
yield "first_name", sender.get("first_name", "")
# Determine message type and extract content
if "text" in message:
yield "event", "text"
yield "text", message.get("text", "")
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", ""
yield "caption", ""
elif "photo" in message:
# Get the largest photo (last in array)
photos = message.get("photo", [])
file_id = photos[-1]["file_id"] if photos else ""
yield "event", "photo"
yield "text", ""
yield "photo_file_id", file_id
yield "voice_file_id", ""
yield "audio_file_id", ""
yield "caption", message.get("caption", "")
elif "voice" in message:
voice = message.get("voice", {})
yield "event", "voice"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", voice.get("file_id", "")
yield "audio_file_id", ""
yield "caption", message.get("caption", "")
elif "audio" in message:
audio = message.get("audio", {})
yield "event", "audio"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", audio.get("file_id", "")
yield "caption", message.get("caption", "")
elif "document" in message:
document = message.get("document", {})
yield "event", "document"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", document.get("file_id", "")
yield "caption", message.get("caption", "")
elif "video" in message:
video = message.get("video", {})
yield "event", "video"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", video.get("file_id", "")
yield "caption", message.get("caption", "")
else:
yield "event", "other"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", ""
yield "caption", ""

View File

@@ -45,6 +45,7 @@ class ProviderName(str, Enum):
SLANT3D = "slant3d"
SMARTLEAD = "smartlead"
SMTP = "smtp"
TELEGRAM = "telegram"
TWITTER = "twitter"
TODOIST = "todoist"
UNREAL_SPEECH = "unreal_speech"

View File

@@ -15,6 +15,7 @@ def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]
from .compass import CompassWebhookManager
from .github import GithubWebhooksManager
from .slant3d import Slant3DWebhooksManager
from .telegram import TelegramWebhooksManager
webhook_managers.update(
{
@@ -23,6 +24,7 @@ def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]
CompassWebhookManager,
GithubWebhooksManager,
Slant3DWebhooksManager,
TelegramWebhooksManager,
]
}
)

View File

@@ -0,0 +1,164 @@
"""
Telegram Bot API Webhooks Manager.
Handles webhook registration and validation for Telegram bots.
"""
import logging
from fastapi import HTTPException, Request
from strenum import StrEnum
from backend.data import integrations
from backend.data.model import APIKeyCredentials, Credentials
from backend.integrations.providers import ProviderName
from backend.util.request import Requests
from ._base import BaseWebhooksManager
logger = logging.getLogger(__name__)
class TelegramWebhookType(StrEnum):
BOT = "bot"
class TelegramWebhooksManager(BaseWebhooksManager[TelegramWebhookType]):
"""
Manages Telegram bot webhooks.
Telegram webhooks are registered via the setWebhook API method.
Incoming requests are validated using the secret_token header.
"""
PROVIDER_NAME = ProviderName.TELEGRAM
WebhookType = TelegramWebhookType
TELEGRAM_API_BASE = "https://api.telegram.org"
@classmethod
async def validate_payload(
cls,
webhook: integrations.Webhook,
request: Request,
credentials: Credentials | None,
) -> tuple[dict, str]:
"""
Validates incoming Telegram webhook request.
Telegram sends X-Telegram-Bot-Api-Secret-Token header when secret_token
was set in setWebhook call.
Returns:
tuple: (payload dict, event_type string)
"""
# Verify secret token header
secret_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if not secret_header or secret_header != webhook.secret:
raise HTTPException(
status_code=403,
detail="Invalid or missing X-Telegram-Bot-Api-Secret-Token",
)
payload = await request.json()
# Determine event type based on update content
# Telegram updates can contain: message, edited_message,
# channel_post, callback_query, etc.
if "message" in payload:
message = payload["message"]
if "text" in message:
event_type = "message.text"
elif "photo" in message:
event_type = "message.photo"
elif "voice" in message:
event_type = "message.voice"
elif "audio" in message:
event_type = "message.audio"
elif "document" in message:
event_type = "message.document"
elif "video" in message:
event_type = "message.video"
else:
event_type = "message.other"
elif "edited_message" in payload:
event_type = "edited_message"
elif "callback_query" in payload:
event_type = "callback_query"
else:
event_type = "unknown"
return payload, event_type
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: TelegramWebhookType,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> tuple[str, dict]:
"""
Register webhook with Telegram using setWebhook API.
Args:
credentials: Bot token credentials
webhook_type: Type of webhook (always BOT for Telegram)
resource: Resource identifier (unused for Telegram, bots are global)
events: Events to subscribe to
ingress_url: URL to receive webhook payloads
secret: Secret token for request validation
Returns:
tuple: (provider_webhook_id, config dict)
"""
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key (bot token) is required for Telegram webhooks")
token = credentials.api_key.get_secret_value()
url = f"{self.TELEGRAM_API_BASE}/bot{token}/setWebhook"
# Telegram setWebhook parameters
webhook_data = {
"url": ingress_url,
"secret_token": secret,
"allowed_updates": ["message", "edited_message"],
}
response = await Requests().post(url, json=webhook_data)
result = response.json()
if not result.get("ok"):
error_desc = result.get("description", "Unknown error")
raise ValueError(f"Failed to set Telegram webhook: {error_desc}")
# Telegram doesn't return a webhook ID, use empty string
config = {
"url": ingress_url,
"allowed_updates": webhook_data["allowed_updates"],
}
return "", config
async def _deregister_webhook(
self, webhook: integrations.Webhook, credentials: Credentials
) -> None:
"""
Deregister webhook by calling setWebhook with empty URL.
This removes the webhook from Telegram's servers.
"""
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key (bot token) is required for Telegram webhooks")
token = credentials.api_key.get_secret_value()
url = f"{self.TELEGRAM_API_BASE}/bot{token}/setWebhook"
# Setting empty URL removes the webhook
response = await Requests().post(url, json={"url": ""})
result = response.json()
if not result.get("ok"):
error_desc = result.get("description", "Unknown error")
logger.warning(f"Failed to deregister Telegram webhook: {error_desc}")