Compare commits

..

4 Commits

Author SHA1 Message Date
Ubbe
efe2f18719 Merge branch 'dev' into fix/spinner-height-copilot 2026-02-18 20:44:42 +08:00
Lluis Agusti
23eda10cb1 chore: format 2026-02-18 20:34:48 +08:00
Lluis Agusti
7bfc5140b0 fix: make spinner centred when loading 2026-02-18 20:29:27 +08:00
Otto
15bcdae4e8 fix(backend/copilot): Clean up GCSWorkspaceStorage per worker (#12153)
The copilot executor runs each worker in its own thread with a dedicated
event loop (`asyncio.new_event_loop()`). `aiohttp.ClientSession` is
bound to the event loop where it was created — using it from a different
loop causes `asyncio.timeout()` to fail with:

```
RuntimeError: Timeout context manager should be used inside a task
```

This was the root cause of transcript upload failures tracked in
SECRT-2009 and [Sentry
#7272473694](https://significant-gravitas.sentry.io/issues/7272473694/).

### Fix

**One `GCSWorkspaceStorage` instance per event loop** instead of a
single shared global.

- `get_workspace_storage()` now returns a per-loop GCS instance (keyed
by `id(asyncio.get_running_loop())`). Local storage remains shared since
it has no async I/O.
- `shutdown_workspace_storage()` closes the instance for the **current**
loop only, so `session.close()` always runs on the loop that created the
session.
- `CoPilotProcessor.cleanup()` shuts down workspace storage on the
worker's own loop, then stops the loop.
- Manager cleanup submits `cleanup_worker` to each thread pool worker
before shutting down the executor — replacing the old approach of
creating a temporary event loop that couldn't close cross-loop sessions.

### Changes

| File | Change |
|------|--------|
| `util/workspace_storage.py` | `GCSWorkspaceStorage` back to simple
single-session class; `get_workspace_storage()` returns per-loop GCS
instance; `shutdown_workspace_storage()` scoped to current loop |
| `copilot/executor/processor.py` | Added `CoPilotProcessor.cleanup()`
and `cleanup_worker()` |
| `copilot/executor/manager.py` | Calls `cleanup_worker` on each thread
pool worker during shutdown |

Fixes SECRT-2009

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
2026-02-18 11:17:39 +00:00
16 changed files with 112 additions and 2439 deletions

View File

@@ -1,159 +0,0 @@
"""
Telegram Bot API helper functions.
Provides utilities for making authenticated requests to the Telegram Bot API.
"""
import logging
from io import BytesIO
from typing import Any, Optional
from backend.data.model import APIKeyCredentials
from backend.util.request import Requests
logger = logging.getLogger(__name__)
TELEGRAM_API_BASE = "https://api.telegram.org"
class TelegramAPIException(ValueError):
"""Exception raised for Telegram API errors."""
def __init__(self, message: str, error_code: int = 0):
super().__init__(message)
self.error_code = error_code
def get_bot_api_url(bot_token: str, method: str) -> str:
"""Construct Telegram Bot API URL for a method."""
return f"{TELEGRAM_API_BASE}/bot{bot_token}/{method}"
def get_file_url(bot_token: str, file_path: str) -> str:
"""Construct Telegram file download URL."""
return f"{TELEGRAM_API_BASE}/file/bot{bot_token}/{file_path}"
async def call_telegram_api(
credentials: APIKeyCredentials,
method: str,
data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""
Make a request to the Telegram Bot API.
Args:
credentials: Bot token credentials
method: API method name (e.g., "sendMessage", "getFile")
data: Request parameters
Returns:
API response result
Raises:
TelegramAPIException: If the API returns an error
"""
token = credentials.api_key.get_secret_value()
url = get_bot_api_url(token, method)
response = await Requests().post(url, json=data or {})
result = response.json()
if not result.get("ok"):
error_code = result.get("error_code", 0)
description = result.get("description", "Unknown error")
raise TelegramAPIException(description, error_code)
return result.get("result", {})
async def call_telegram_api_with_file(
credentials: APIKeyCredentials,
method: str,
file_field: str,
file_data: bytes,
filename: str,
content_type: str,
data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""
Make a multipart/form-data request to the Telegram Bot API with a file upload.
Args:
credentials: Bot token credentials
method: API method name (e.g., "sendPhoto", "sendVoice")
file_field: Form field name for the file (e.g., "photo", "voice")
file_data: Raw file bytes
filename: Filename for the upload
content_type: MIME type of the file
data: Additional form parameters
Returns:
API response result
Raises:
TelegramAPIException: If the API returns an error
"""
token = credentials.api_key.get_secret_value()
url = get_bot_api_url(token, method)
files = [(file_field, (filename, BytesIO(file_data), content_type))]
response = await Requests().post(url, files=files, data=data or {})
result = response.json()
if not result.get("ok"):
error_code = result.get("error_code", 0)
description = result.get("description", "Unknown error")
raise TelegramAPIException(description, error_code)
return result.get("result", {})
async def get_file_info(credentials: APIKeyCredentials, file_id: str) -> dict[str, Any]:
"""
Get file information from Telegram.
Args:
credentials: Bot token credentials
file_id: Telegram file_id from message
Returns:
File info dict containing file_id, file_unique_id, file_size, file_path
"""
return await call_telegram_api(credentials, "getFile", {"file_id": file_id})
async def get_file_download_url(credentials: APIKeyCredentials, file_id: str) -> str:
"""
Get the download URL for a Telegram file.
Args:
credentials: Bot token credentials
file_id: Telegram file_id from message
Returns:
Full download URL
"""
token = credentials.api_key.get_secret_value()
result = await get_file_info(credentials, file_id)
file_path = result.get("file_path")
if not file_path:
raise TelegramAPIException("No file_path returned from getFile")
return get_file_url(token, file_path)
async def download_telegram_file(credentials: APIKeyCredentials, file_id: str) -> bytes:
"""
Download a file from Telegram servers.
Args:
credentials: Bot token credentials
file_id: Telegram file_id
Returns:
File content as bytes
"""
url = await get_file_download_url(credentials, file_id)
response = await Requests().get(url)
return response.content

View File

@@ -1,43 +0,0 @@
"""
Telegram Bot credentials handling.
Telegram bots use an API key (bot token) obtained from @BotFather.
"""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, CredentialsField, CredentialsMetaInput
from backend.integrations.providers import ProviderName
# Bot token credentials (API key style)
TelegramCredentials = APIKeyCredentials
TelegramCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.TELEGRAM], Literal["api_key"]
]
def TelegramCredentialsField() -> TelegramCredentialsInput:
"""Creates a Telegram bot token credentials field."""
return CredentialsField(
description="Telegram Bot API token from @BotFather. "
"Create a bot at https://t.me/BotFather to get your token."
)
# Test credentials for unit tests
TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef",
provider="telegram",
api_key=SecretStr("test_telegram_bot_token"),
title="Mock Telegram Bot Token",
expires_at=None,
)
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,352 +0,0 @@
"""
Telegram trigger blocks for receiving messages via webhooks.
"""
import logging
from pydantic import BaseModel
from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
BlockWebhookConfig,
)
from backend.data.model import SchemaField
from backend.integrations.providers import ProviderName
from backend.integrations.webhooks.telegram import TelegramWebhookType
from ._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
TelegramCredentialsField,
TelegramCredentialsInput,
)
logger = logging.getLogger(__name__)
# Example payload for testing
EXAMPLE_MESSAGE_PAYLOAD = {
"update_id": 123456789,
"message": {
"message_id": 1,
"from": {
"id": 12345678,
"is_bot": False,
"first_name": "John",
"last_name": "Doe",
"username": "johndoe",
"language_code": "en",
},
"chat": {
"id": 12345678,
"first_name": "John",
"last_name": "Doe",
"username": "johndoe",
"type": "private",
},
"date": 1234567890,
"text": "Hello, bot!",
},
}
class TelegramTriggerBase:
"""Base class for Telegram trigger blocks."""
class Input(BlockSchemaInput):
credentials: TelegramCredentialsInput = TelegramCredentialsField()
payload: dict = SchemaField(hidden=True, default_factory=dict)
class TelegramMessageTriggerBlock(TelegramTriggerBase, Block):
"""
Triggers when a message is received by your Telegram bot.
Supports text, photos, voice messages, and audio files.
Connect the outputs to other blocks to process messages and send responses.
"""
class Input(TelegramTriggerBase.Input):
class EventsFilter(BaseModel):
"""Filter for message types to receive."""
text: bool = True
photo: bool = False
voice: bool = False
audio: bool = False
document: bool = False
video: bool = False
events: EventsFilter = SchemaField(
title="Message Types", description="Types of messages to receive"
)
class Output(BlockSchemaOutput):
payload: dict = SchemaField(
description="The complete webhook payload from Telegram"
)
chat_id: int = SchemaField(
description="The chat ID where the message was received. "
"Use this to send replies."
)
message_id: int = SchemaField(description="The unique message ID")
user_id: int = SchemaField(description="The user ID who sent the message")
username: str = SchemaField(description="Username of the sender (may be empty)")
first_name: str = SchemaField(description="First name of the sender")
event: str = SchemaField(
description="The message type (text, photo, voice, audio, etc.)"
)
text: str = SchemaField(
description="Text content of the message (for text messages)"
)
photo_file_id: str = SchemaField(
description="File ID of the photo (for photo messages). "
"Use GetTelegramFileBlock to download."
)
voice_file_id: str = SchemaField(
description="File ID of the voice message (for voice messages). "
"Use GetTelegramFileBlock to download."
)
audio_file_id: str = SchemaField(
description="File ID of the audio file (for audio messages). "
"Use GetTelegramFileBlock to download."
)
file_id: str = SchemaField(
description="File ID for document/video messages. "
"Use GetTelegramFileBlock to download."
)
file_name: str = SchemaField(
description="Original filename (for document/audio messages)"
)
caption: str = SchemaField(description="Caption for media messages")
def __init__(self):
super().__init__(
id="4435e4e0-df6e-4301-8f35-ad70b12fc9ec",
description="Triggers when a message is received by your Telegram bot. "
"Supports text, photos, voice messages, and audio files.",
categories={BlockCategory.SOCIAL},
input_schema=TelegramMessageTriggerBlock.Input,
output_schema=TelegramMessageTriggerBlock.Output,
webhook_config=BlockWebhookConfig(
provider=ProviderName.TELEGRAM,
webhook_type=TelegramWebhookType.BOT,
resource_format="bot",
event_filter_input="events",
event_format="message.{event}",
),
test_input={
"events": {"text": True, "photo": True},
"credentials": TEST_CREDENTIALS_INPUT,
"payload": EXAMPLE_MESSAGE_PAYLOAD,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("payload", EXAMPLE_MESSAGE_PAYLOAD),
("chat_id", 12345678),
("message_id", 1),
("user_id", 12345678),
("username", "johndoe"),
("first_name", "John"),
("event", "text"),
("text", "Hello, bot!"),
("photo_file_id", ""),
("voice_file_id", ""),
("audio_file_id", ""),
("file_id", ""),
("file_name", ""),
("caption", ""),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
payload = input_data.payload
message = payload.get("message", {})
# Extract common fields
chat = message.get("chat", {})
sender = message.get("from", {})
yield "payload", payload
yield "chat_id", chat.get("id", 0)
yield "message_id", message.get("message_id", 0)
yield "user_id", sender.get("id", 0)
yield "username", sender.get("username", "")
yield "first_name", sender.get("first_name", "")
# Determine message type and extract content
if "text" in message:
yield "event", "text"
yield "text", message.get("text", "")
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", ""
yield "file_id", ""
yield "file_name", ""
yield "caption", ""
elif "photo" in message:
# Get the largest photo (last in array)
photos = message.get("photo", [])
photo_fid = photos[-1].get("file_id", "") if photos else ""
yield "event", "photo"
yield "text", ""
yield "photo_file_id", photo_fid
yield "voice_file_id", ""
yield "audio_file_id", ""
yield "file_id", ""
yield "file_name", ""
yield "caption", message.get("caption", "")
elif "voice" in message:
voice = message.get("voice", {})
yield "event", "voice"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", voice.get("file_id", "")
yield "audio_file_id", ""
yield "file_id", ""
yield "file_name", ""
yield "caption", message.get("caption", "")
elif "audio" in message:
audio = message.get("audio", {})
yield "event", "audio"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", audio.get("file_id", "")
yield "file_id", ""
yield "file_name", audio.get("file_name", "")
yield "caption", message.get("caption", "")
elif "document" in message:
document = message.get("document", {})
yield "event", "document"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", ""
yield "file_id", document.get("file_id", "")
yield "file_name", document.get("file_name", "")
yield "caption", message.get("caption", "")
elif "video" in message:
video = message.get("video", {})
yield "event", "video"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", ""
yield "file_id", video.get("file_id", "")
yield "file_name", video.get("file_name", "")
yield "caption", message.get("caption", "")
else:
yield "event", "other"
yield "text", ""
yield "photo_file_id", ""
yield "voice_file_id", ""
yield "audio_file_id", ""
yield "file_id", ""
yield "file_name", ""
yield "caption", ""
# Example payload for reaction trigger testing
EXAMPLE_REACTION_PAYLOAD = {
"update_id": 123456790,
"message_reaction": {
"chat": {
"id": 12345678,
"first_name": "John",
"last_name": "Doe",
"username": "johndoe",
"type": "private",
},
"message_id": 42,
"user": {
"id": 12345678,
"is_bot": False,
"first_name": "John",
"username": "johndoe",
},
"date": 1234567890,
"new_reaction": [{"type": "emoji", "emoji": "👍"}],
"old_reaction": [],
},
}
class TelegramMessageReactionTriggerBlock(TelegramTriggerBase, Block):
"""
Triggers when a reaction to a message is changed.
Works automatically in private chats. In group chats, the bot must be
an administrator to receive reaction updates.
"""
class Input(TelegramTriggerBase.Input):
pass
class Output(BlockSchemaOutput):
payload: dict = SchemaField(
description="The complete webhook payload from Telegram"
)
chat_id: int = SchemaField(
description="The chat ID where the reaction occurred"
)
message_id: int = SchemaField(description="The message ID that was reacted to")
user_id: int = SchemaField(description="The user ID who changed the reaction")
username: str = SchemaField(description="Username of the user (may be empty)")
new_reactions: list = SchemaField(
description="List of new reactions on the message"
)
old_reactions: list = SchemaField(
description="List of previous reactions on the message"
)
def __init__(self):
super().__init__(
id="82525328-9368-4966-8f0c-cd78e80181fd",
description="Triggers when a reaction to a message is changed. "
"Works in private chats automatically. "
"In groups, the bot must be an administrator.",
categories={BlockCategory.SOCIAL},
input_schema=TelegramMessageReactionTriggerBlock.Input,
output_schema=TelegramMessageReactionTriggerBlock.Output,
webhook_config=BlockWebhookConfig(
provider=ProviderName.TELEGRAM,
webhook_type=TelegramWebhookType.BOT,
resource_format="bot",
event_filter_input="",
event_format="message_reaction",
),
test_input={
"credentials": TEST_CREDENTIALS_INPUT,
"payload": EXAMPLE_REACTION_PAYLOAD,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("payload", EXAMPLE_REACTION_PAYLOAD),
("chat_id", 12345678),
("message_id", 42),
("user_id", 12345678),
("username", "johndoe"),
("new_reactions", [{"type": "emoji", "emoji": "👍"}]),
("old_reactions", []),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
payload = input_data.payload
reaction = payload.get("message_reaction", {})
chat = reaction.get("chat", {})
user = reaction.get("user", {})
yield "payload", payload
yield "chat_id", chat.get("id", 0)
yield "message_id", reaction.get("message_id", 0)
yield "user_id", user.get("id", 0)
yield "username", user.get("username", "")
yield "new_reactions", reaction.get("new_reaction", [])
yield "old_reactions", reaction.get("old_reaction", [])

View File

@@ -164,21 +164,23 @@ class CoPilotExecutor(AppProcess):
self._cancel_thread, self.cancel_client, "[cleanup][cancel]"
)
# Shutdown executor
# Clean up worker threads (closes per-loop workspace storage sessions)
if self._executor:
from .processor import cleanup_worker
logger.info(f"[cleanup {pid}] Cleaning up workers...")
futures = []
for _ in range(self._executor._max_workers):
futures.append(self._executor.submit(cleanup_worker))
for f in futures:
try:
f.result(timeout=10)
except Exception as e:
logger.warning(f"[cleanup {pid}] Worker cleanup error: {e}")
logger.info(f"[cleanup {pid}] Shutting down executor...")
self._executor.shutdown(wait=False)
# Close async resources (workspace storage aiohttp session, etc.)
try:
from backend.util.workspace_storage import shutdown_workspace_storage
loop = asyncio.new_event_loop()
loop.run_until_complete(shutdown_workspace_storage())
loop.close()
except Exception as e:
logger.warning(f"[cleanup {pid}] Error closing workspace storage: {e}")
# Release any remaining locks
for task_id, lock in list(self._task_locks.items()):
try:

View File

@@ -60,6 +60,18 @@ def init_worker():
_tls.processor.on_executor_start()
def cleanup_worker():
"""Clean up the processor for the current worker thread.
Should be called before the worker thread's event loop is destroyed so
that event-loop-bound resources (e.g. ``aiohttp.ClientSession``) are
closed on the correct loop.
"""
processor: CoPilotProcessor | None = getattr(_tls, "processor", None)
if processor is not None:
processor.cleanup()
# ============ Processor Class ============ #
@@ -98,6 +110,28 @@ class CoPilotProcessor:
logger.info(f"[CoPilotExecutor] Worker {self.tid} started")
def cleanup(self):
"""Clean up event-loop-bound resources before the loop is destroyed.
Shuts down the workspace storage instance that belongs to this
worker's event loop, ensuring ``aiohttp.ClientSession.close()``
runs on the same loop that created the session.
"""
from backend.util.workspace_storage import shutdown_workspace_storage
try:
future = asyncio.run_coroutine_threadsafe(
shutdown_workspace_storage(), self.execution_loop
)
future.result(timeout=5)
except Exception as e:
logger.warning(f"[CoPilotExecutor] Worker {self.tid} cleanup error: {e}")
# Stop the event loop
self.execution_loop.call_soon_threadsafe(self.execution_loop.stop)
self.execution_thread.join(timeout=5)
logger.info(f"[CoPilotExecutor] Worker {self.tid} cleaned up")
@error_logged(swallow=False)
def execute(
self,

View File

@@ -47,7 +47,6 @@ class ProviderName(str, Enum):
SLANT3D = "slant3d"
SMARTLEAD = "smartlead"
SMTP = "smtp"
TELEGRAM = "telegram"
TWITTER = "twitter"
TODOIST = "todoist"
UNREAL_SPEECH = "unreal_speech"

View File

@@ -15,7 +15,6 @@ def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]
from .compass import CompassWebhookManager
from .github import GithubWebhooksManager
from .slant3d import Slant3DWebhooksManager
from .telegram import TelegramWebhooksManager
webhook_managers.update(
{
@@ -24,7 +23,6 @@ def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]
CompassWebhookManager,
GithubWebhooksManager,
Slant3DWebhooksManager,
TelegramWebhooksManager,
]
}
)

View File

@@ -1,167 +0,0 @@
"""
Telegram Bot API Webhooks Manager.
Handles webhook registration and validation for Telegram bots.
"""
import hmac
import logging
from fastapi import HTTPException, Request
from strenum import StrEnum
from backend.data import integrations
from backend.data.model import APIKeyCredentials, Credentials
from backend.integrations.providers import ProviderName
from backend.util.request import Requests
from ._base import BaseWebhooksManager
logger = logging.getLogger(__name__)
class TelegramWebhookType(StrEnum):
BOT = "bot"
class TelegramWebhooksManager(BaseWebhooksManager):
"""
Manages Telegram bot webhooks.
Telegram webhooks are registered via the setWebhook API method.
Incoming requests are validated using the secret_token header.
"""
PROVIDER_NAME = ProviderName.TELEGRAM
WebhookType = TelegramWebhookType
TELEGRAM_API_BASE = "https://api.telegram.org"
@classmethod
async def validate_payload(
cls,
webhook: integrations.Webhook,
request: Request,
credentials: Credentials | None,
) -> tuple[dict, str]:
"""
Validates incoming Telegram webhook request.
Telegram sends X-Telegram-Bot-Api-Secret-Token header when secret_token
was set in setWebhook call.
Returns:
tuple: (payload dict, event_type string)
"""
# Verify secret token header
secret_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if not secret_header or not hmac.compare_digest(secret_header, webhook.secret):
raise HTTPException(
status_code=403,
detail="Invalid or missing X-Telegram-Bot-Api-Secret-Token",
)
payload = await request.json()
# Determine event type based on update content
# Telegram updates can contain: message, edited_message,
# channel_post, callback_query, etc.
if "message" in payload:
message = payload["message"]
if "text" in message:
event_type = "message.text"
elif "photo" in message:
event_type = "message.photo"
elif "voice" in message:
event_type = "message.voice"
elif "audio" in message:
event_type = "message.audio"
elif "document" in message:
event_type = "message.document"
elif "video" in message:
event_type = "message.video"
else:
event_type = "message.other"
elif "edited_message" in payload:
event_type = "edited_message"
elif "message_reaction" in payload:
event_type = "message_reaction"
elif "callback_query" in payload:
event_type = "callback_query"
else:
event_type = "unknown"
return payload, event_type
async def _register_webhook(
self,
credentials: Credentials,
webhook_type: TelegramWebhookType,
resource: str,
events: list[str],
ingress_url: str,
secret: str,
) -> tuple[str, dict]:
"""
Register webhook with Telegram using setWebhook API.
Args:
credentials: Bot token credentials
webhook_type: Type of webhook (always BOT for Telegram)
resource: Resource identifier (unused for Telegram, bots are global)
events: Events to subscribe to
ingress_url: URL to receive webhook payloads
secret: Secret token for request validation
Returns:
tuple: (provider_webhook_id, config dict)
"""
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key (bot token) is required for Telegram webhooks")
token = credentials.api_key.get_secret_value()
url = f"{self.TELEGRAM_API_BASE}/bot{token}/setWebhook"
# Telegram setWebhook parameters
webhook_data = {
"url": ingress_url,
"secret_token": secret,
"allowed_updates": ["message", "edited_message", "message_reaction"],
}
response = await Requests().post(url, json=webhook_data)
result = response.json()
if not result.get("ok"):
error_desc = result.get("description", "Unknown error")
raise ValueError(f"Failed to set Telegram webhook: {error_desc}")
# Telegram doesn't return a webhook ID, use empty string
config = {
"url": ingress_url,
"allowed_updates": webhook_data["allowed_updates"],
}
return "", config
async def _deregister_webhook(
self, webhook: integrations.Webhook, credentials: Credentials
) -> None:
"""
Deregister webhook by calling setWebhook with empty URL.
This removes the webhook from Telegram's servers.
"""
if not isinstance(credentials, APIKeyCredentials):
raise ValueError("API key (bot token) is required for Telegram webhooks")
token = credentials.api_key.get_secret_value()
url = f"{self.TELEGRAM_API_BASE}/bot{token}/setWebhook"
# Setting empty URL removes the webhook
response = await Requests().post(url, json={"url": ""})
result = response.json()
if not result.get("ok"):
error_desc = result.get("description", "Unknown error")
logger.warning(f"Failed to deregister Telegram webhook: {error_desc}")

View File

@@ -93,7 +93,14 @@ class WorkspaceStorageBackend(ABC):
class GCSWorkspaceStorage(WorkspaceStorageBackend):
"""Google Cloud Storage implementation for workspace storage."""
"""Google Cloud Storage implementation for workspace storage.
Each instance owns a single ``aiohttp.ClientSession`` and GCS async
client. Because ``ClientSession`` is bound to the event loop on which it
was created, callers that run on separate loops (e.g. copilot executor
worker threads) **must** obtain their own ``GCSWorkspaceStorage`` instance
via :func:`get_workspace_storage` which is event-loop-aware.
"""
def __init__(self, bucket_name: str):
self.bucket_name = bucket_name
@@ -337,60 +344,73 @@ class LocalWorkspaceStorage(WorkspaceStorageBackend):
raise ValueError(f"Invalid storage path format: {storage_path}")
# Global storage backend instance
_workspace_storage: Optional[WorkspaceStorageBackend] = None
# ---------------------------------------------------------------------------
# Storage instance management
# ---------------------------------------------------------------------------
# ``aiohttp.ClientSession`` is bound to the event loop where it is created.
# The copilot executor runs each worker in its own thread with a dedicated
# event loop, so a single global ``GCSWorkspaceStorage`` instance would break.
#
# For **local storage** a single shared instance is fine (no async I/O).
# For **GCS storage** we keep one instance *per event loop* so every loop
# gets its own ``ClientSession``.
# ---------------------------------------------------------------------------
_local_storage: Optional[LocalWorkspaceStorage] = None
_gcs_storages: dict[int, GCSWorkspaceStorage] = {}
_storage_lock = asyncio.Lock()
async def get_workspace_storage() -> WorkspaceStorageBackend:
"""Return a workspace storage backend for the **current** event loop.
* Local storage → single shared instance (no event-loop affinity).
* GCS storage → one instance per event loop to avoid cross-loop
``aiohttp`` errors.
"""
Get the workspace storage backend instance.
global _local_storage
Uses GCS if media_gcs_bucket_name is configured, otherwise uses local storage.
"""
global _workspace_storage
config = Config()
if _workspace_storage is None:
async with _storage_lock:
if _workspace_storage is None:
config = Config()
# --- Local storage (shared) ---
if not config.media_gcs_bucket_name:
if _local_storage is None:
storage_dir = (
config.workspace_storage_dir if config.workspace_storage_dir else None
)
logger.info(f"Using local workspace storage: {storage_dir or 'default'}")
_local_storage = LocalWorkspaceStorage(storage_dir)
return _local_storage
if config.media_gcs_bucket_name:
logger.info(
f"Using GCS workspace storage: {config.media_gcs_bucket_name}"
)
_workspace_storage = GCSWorkspaceStorage(
config.media_gcs_bucket_name
)
else:
storage_dir = (
config.workspace_storage_dir
if config.workspace_storage_dir
else None
)
logger.info(
f"Using local workspace storage: {storage_dir or 'default'}"
)
_workspace_storage = LocalWorkspaceStorage(storage_dir)
return _workspace_storage
# --- GCS storage (per event loop) ---
loop_id = id(asyncio.get_running_loop())
if loop_id not in _gcs_storages:
logger.info(
f"Creating GCS workspace storage for loop {loop_id}: "
f"{config.media_gcs_bucket_name}"
)
_gcs_storages[loop_id] = GCSWorkspaceStorage(config.media_gcs_bucket_name)
return _gcs_storages[loop_id]
async def shutdown_workspace_storage() -> None:
"""
Properly shutdown the global workspace storage backend.
"""Shut down workspace storage for the **current** event loop.
Closes aiohttp sessions and other resources for GCS backend.
Should be called during application shutdown.
Closes the ``aiohttp`` session owned by the current loop's GCS instance.
Each worker thread should call this on its own loop before the loop is
destroyed. The REST API lifespan hook calls it for the main server loop.
"""
global _workspace_storage
global _local_storage
if _workspace_storage is not None:
async with _storage_lock:
if _workspace_storage is not None:
if isinstance(_workspace_storage, GCSWorkspaceStorage):
await _workspace_storage.close()
_workspace_storage = None
loop_id = id(asyncio.get_running_loop())
storage = _gcs_storages.pop(loop_id, None)
if storage is not None:
await storage.close()
# Clear local storage only when the last GCS instance is gone
# (i.e. full shutdown, not just a single worker stopping).
if not _gcs_storages:
_local_storage = None
def compute_file_checksum(content: bytes) -> str:

Binary file not shown.

Before

Width:  |  Height:  |  Size: 192 KiB

View File

@@ -169,7 +169,10 @@ export const ChatMessagesContainer = ({
<ConversationContent className="flex flex-1 flex-col gap-6 px-3 py-6">
{headerSlot}
{isLoading && messages.length === 0 && (
<div className="flex min-h-full flex-1 items-center justify-center">
<div
className="flex flex-1 items-center justify-center"
style={{ minHeight: "calc(100vh - 12rem)" }}
>
<LoadingSpinner className="text-neutral-600" />
</div>
)}

View File

@@ -305,12 +305,10 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| [Create Reddit Post](block-integrations/misc.md#create-reddit-post) | Create a new post on a subreddit |
| [Delete Reddit Comment](block-integrations/misc.md#delete-reddit-comment) | Delete a Reddit comment that you own |
| [Delete Reddit Post](block-integrations/misc.md#delete-reddit-post) | Delete a Reddit post that you own |
| [Delete Telegram Message](block-integrations/telegram/blocks.md#delete-telegram-message) | Delete a message from a Telegram chat |
| [Discord Channel Info](block-integrations/discord/bot_blocks.md#discord-channel-info) | Resolves Discord channel names to IDs and vice versa |
| [Discord Get Current User](block-integrations/discord/oauth_blocks.md#discord-get-current-user) | Gets information about the currently authenticated Discord user using OAuth2 credentials |
| [Discord User Info](block-integrations/discord/bot_blocks.md#discord-user-info) | Gets information about a Discord user by their ID |
| [Edit Reddit Post](block-integrations/misc.md#edit-reddit-post) | Edit the body text of an existing Reddit post that you own |
| [Edit Telegram Message](block-integrations/telegram/blocks.md#edit-telegram-message) | Edit the text of an existing message sent by the bot |
| [Get Linkedin Profile](block-integrations/enrichlayer/linkedin.md#get-linkedin-profile) | Fetch LinkedIn profile data using Enrichlayer |
| [Get Linkedin Profile Picture](block-integrations/enrichlayer/linkedin.md#get-linkedin-profile-picture) | Get LinkedIn profile pictures using Enrichlayer |
| [Get Reddit Comment](block-integrations/misc.md#get-reddit-comment) | Get details about a specific Reddit comment by its ID |
@@ -323,7 +321,6 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| [Get Subreddit Flairs](block-integrations/misc.md#get-subreddit-flairs) | Get available link flair options for a subreddit |
| [Get Subreddit Info](block-integrations/misc.md#get-subreddit-info) | Get information about a subreddit including subscriber count, description, and rules |
| [Get Subreddit Rules](block-integrations/misc.md#get-subreddit-rules) | Get the rules for a subreddit to ensure compliance before posting |
| [Get Telegram File](block-integrations/telegram/blocks.md#get-telegram-file) | Download a file from Telegram using its file_id |
| [Get User Posts](block-integrations/misc.md#get-user-posts) | Fetch posts by a specific Reddit user |
| [Linkedin Person Lookup](block-integrations/enrichlayer/linkedin.md#linkedin-person-lookup) | Look up LinkedIn profiles by person information using Enrichlayer |
| [Linkedin Role Lookup](block-integrations/enrichlayer/linkedin.md#linkedin-role-lookup) | Look up LinkedIn profiles by role in a company using Enrichlayer |
@@ -346,21 +343,12 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| [Reddit Get My Posts](block-integrations/misc.md#reddit-get-my-posts) | Fetch posts created by the authenticated Reddit user (you) |
| [Reply To Discord Message](block-integrations/discord/bot_blocks.md#reply-to-discord-message) | Replies to a specific Discord message |
| [Reply To Reddit Comment](block-integrations/misc.md#reply-to-reddit-comment) | Reply to a specific Reddit comment |
| [Reply To Telegram Message](block-integrations/telegram/blocks.md#reply-to-telegram-message) | Reply to a specific message in a Telegram chat |
| [Search Reddit](block-integrations/misc.md#search-reddit) | Search Reddit for posts matching a query |
| [Send Discord DM](block-integrations/discord/bot_blocks.md#send-discord-dm) | Sends a direct message to a Discord user using their user ID |
| [Send Discord Embed](block-integrations/discord/bot_blocks.md#send-discord-embed) | Sends a rich embed message to a Discord channel |
| [Send Discord File](block-integrations/discord/bot_blocks.md#send-discord-file) | Sends a file attachment to a Discord channel |
| [Send Discord Message](block-integrations/discord/bot_blocks.md#send-discord-message) | Sends a message to a Discord channel using a bot token |
| [Send Reddit Message](block-integrations/misc.md#send-reddit-message) | Send a private message (DM) to a Reddit user |
| [Send Telegram Audio](block-integrations/telegram/blocks.md#send-telegram-audio) | Send an audio file to a Telegram chat |
| [Send Telegram Document](block-integrations/telegram/blocks.md#send-telegram-document) | Send a document (any file type) to a Telegram chat |
| [Send Telegram Message](block-integrations/telegram/blocks.md#send-telegram-message) | Send a text message to a Telegram chat |
| [Send Telegram Photo](block-integrations/telegram/blocks.md#send-telegram-photo) | Send a photo to a Telegram chat |
| [Send Telegram Video](block-integrations/telegram/blocks.md#send-telegram-video) | Send a video to a Telegram chat |
| [Send Telegram Voice](block-integrations/telegram/blocks.md#send-telegram-voice) | Send a voice message to a Telegram chat |
| [Telegram Message Reaction Trigger](block-integrations/telegram/triggers.md#telegram-message-reaction-trigger) | Triggers when a reaction to a message is changed |
| [Telegram Message Trigger](block-integrations/telegram/triggers.md#telegram-message-trigger) | Triggers when a message is received by your Telegram bot |
| [Transcribe Youtube Video](block-integrations/misc.md#transcribe-youtube-video) | Transcribes a YouTube video using a proxy |
| [Twitter Add List Member](block-integrations/twitter/list_members.md#twitter-add-list-member) | This block adds a specified user to a Twitter List owned by the authenticated user |
| [Twitter Bookmark Tweet](block-integrations/twitter/bookmark.md#twitter-bookmark-tweet) | This block bookmarks a tweet on Twitter |

View File

@@ -103,8 +103,6 @@
* [Stagehand Blocks](block-integrations/stagehand/blocks.md)
* [System Library Operations](block-integrations/system/library_operations.md)
* [System Store Operations](block-integrations/system/store_operations.md)
* [Telegram Blocks](block-integrations/telegram/blocks.md)
* [Telegram Triggers](block-integrations/telegram/triggers.md)
* [Text](block-integrations/text.md)
* [Todoist Comments](block-integrations/todoist/comments.md)
* [Todoist Labels](block-integrations/todoist/labels.md)

View File

@@ -1,348 +0,0 @@
# Telegram Blocks
<!-- MANUAL: file_description -->
_Add a description of this category of blocks._
<!-- END MANUAL -->
## Delete Telegram Message
### What it is
Delete a message from a Telegram chat. Bots can delete their own messages and incoming messages in private chats at any time.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID containing the message | int | Yes |
| message_id | The ID of the message to delete | int | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Edit Telegram Message
### What it is
Edit the text of an existing message sent by the bot.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID containing the message | int | Yes |
| message_id | The ID of the message to edit | int | Yes |
| text | New text for the message (max 4096 characters) | str | Yes |
| parse_mode | Message formatting mode | "none" \| "Markdown" \| "MarkdownV2" \| "HTML" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The ID of the edited message | int |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Get Telegram File
### What it is
Download a file from Telegram using its file_id. Use this to process photos, voice messages, or documents received.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| file_id | The Telegram file_id to download. Get this from trigger outputs (photo_file_id, voice_file_id, etc.) | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| file | The downloaded file (workspace:// reference or data URI) | str (file) |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Reply To Telegram Message
### What it is
Reply to a specific message in a Telegram chat.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID where the message is | int | Yes |
| reply_to_message_id | The message ID to reply to | int | Yes |
| text | The reply text | str | Yes |
| parse_mode | Message formatting mode | "none" \| "Markdown" \| "MarkdownV2" \| "HTML" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The ID of the reply message | int |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Send Telegram Audio
### What it is
Send an audio file to a Telegram chat. The file is displayed in the music player. For voice messages, use the Send Voice block instead.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID to send the audio to | int | Yes |
| audio | Audio file to send (MP3 or M4A format). Can be URL, data URI, or workspace:// reference. | str (file) | Yes |
| caption | Caption for the audio file | str | No |
| title | Track title | str | No |
| performer | Track performer/artist | str | No |
| duration | Duration in seconds | int | No |
| reply_to_message_id | Message ID to reply to | int | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The ID of the sent message | int |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Send Telegram Document
### What it is
Send a document (any file type) to a Telegram chat.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID to send the document to | int | Yes |
| document | Document to send (any file type). Can be URL, data URI, or workspace:// reference. | str (file) | Yes |
| filename | Filename shown to the recipient. If empty, the original filename is used (may be a random ID for uploaded files). | str | No |
| caption | Caption for the document | str | No |
| parse_mode | Caption formatting mode | "none" \| "Markdown" \| "MarkdownV2" \| "HTML" | No |
| reply_to_message_id | Message ID to reply to | int | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The ID of the sent message | int |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Send Telegram Message
### What it is
Send a text message to a Telegram chat.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID to send the message to. Get this from the trigger block's chat_id output. | int | Yes |
| text | The text message to send (max 4096 characters) | str | Yes |
| parse_mode | Message formatting mode (Markdown, HTML, or none) | "none" \| "Markdown" \| "MarkdownV2" \| "HTML" | No |
| reply_to_message_id | Message ID to reply to | int | No |
| disable_notification | Send message silently (no notification sound) | bool | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The ID of the sent message | int |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Send Telegram Photo
### What it is
Send a photo to a Telegram chat.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID to send the photo to | int | Yes |
| photo | Photo to send (URL, data URI, or workspace:// reference). URLs are preferred as Telegram will fetch them directly. | str (file) | Yes |
| caption | Caption for the photo (max 1024 characters) | str | No |
| parse_mode | Caption formatting mode | "none" \| "Markdown" \| "MarkdownV2" \| "HTML" | No |
| reply_to_message_id | Message ID to reply to | int | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The ID of the sent message | int |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Send Telegram Video
### What it is
Send a video to a Telegram chat.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID to send the video to | int | Yes |
| video | Video to send (MP4 format). Can be URL, data URI, or workspace:// reference. | str (file) | Yes |
| caption | Caption for the video | str | No |
| parse_mode | Caption formatting mode | "none" \| "Markdown" \| "MarkdownV2" \| "HTML" | No |
| duration | Duration in seconds | int | No |
| reply_to_message_id | Message ID to reply to | int | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The ID of the sent message | int |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Send Telegram Voice
### What it is
Send a voice message to a Telegram chat. Voice must be OGG format with OPUS codec.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| chat_id | The chat ID to send the voice message to | int | Yes |
| voice | Voice message to send (OGG format with OPUS codec). Can be URL, data URI, or workspace:// reference. | str (file) | Yes |
| caption | Caption for the voice message | str | No |
| duration | Duration in seconds | int | No |
| reply_to_message_id | Message ID to reply to | int | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The ID of the sent message | int |
| status | Status of the operation | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---

View File

@@ -1,77 +0,0 @@
# Telegram Triggers
<!-- MANUAL: file_description -->
_Add a description of this category of blocks._
<!-- END MANUAL -->
## Telegram Message Reaction Trigger
### What it is
Triggers when a reaction to a message is changed. Works in private chats automatically. In groups, the bot must be an administrator.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| payload | The complete webhook payload from Telegram | Dict[str, Any] |
| chat_id | The chat ID where the reaction occurred | int |
| message_id | The message ID that was reacted to | int |
| user_id | The user ID who changed the reaction | int |
| username | Username of the user (may be empty) | str |
| new_reactions | List of new reactions on the message | List[Any] |
| old_reactions | List of previous reactions on the message | List[Any] |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---
## Telegram Message Trigger
### What it is
Triggers when a message is received by your Telegram bot. Supports text, photos, voice messages, and audio files.
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| events | Types of messages to receive | Message Types | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| payload | The complete webhook payload from Telegram | Dict[str, Any] |
| chat_id | The chat ID where the message was received. Use this to send replies. | int |
| message_id | The unique message ID | int |
| user_id | The user ID who sent the message | int |
| username | Username of the sender (may be empty) | str |
| first_name | First name of the sender | str |
| event | The message type (text, photo, voice, audio, etc.) | str |
| text | Text content of the message (for text messages) | str |
| photo_file_id | File ID of the photo (for photo messages). Use GetTelegramFileBlock to download. | str |
| voice_file_id | File ID of the voice message (for voice messages). Use GetTelegramFileBlock to download. | str |
| audio_file_id | File ID of the audio file (for audio messages). Use GetTelegramFileBlock to download. | str |
| file_id | File ID for document/video messages. Use GetTelegramFileBlock to download. | str |
| file_name | Original filename (for document/audio messages) | str |
| caption | Caption for media messages | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---