refactor(platform): simplify copilot block, drop asyncio.timeout

- Remove asyncio.timeout wrapper: the SDK's internal stream must not
  be cancelled mid-flight (corrupts anyio memory stream, see
  sdk/service.py L998-1001). Matches normal copilot behavior.
- Remove timeout input field (no longer needed).
- Extract _check_recursion, _reset_recursion, _get_or_create_session
  as static helpers to reduce indentation and improve readability.
- Use ge=1 on max_recursion_depth SchemaField for schema-level
  validation instead of runtime check.
- Drop explicit TimeoutError catch (no longer raised).
This commit is contained in:
Zamil Majdy
2026-03-16 23:27:57 +07:00
parent 348e9f8e27
commit 8f73813565
2 changed files with 83 additions and 103 deletions

View File

@@ -1,4 +1,3 @@
import asyncio
import contextvars
import json
from typing import TYPE_CHECKING
@@ -13,17 +12,15 @@ from backend.blocks._base import (
from backend.data.model import SchemaField
if TYPE_CHECKING:
from backend.copilot.model import ChatSession
from backend.executor.utils import ExecutionContext
# Task-scoped recursion depth counter.
# Incremented each time execute_copilot runs, decremented on exit.
# Because contextvars are scoped to the current asyncio task, concurrent
# graph executions each get their own independent counter.
# Task-scoped recursion depth counter & chain-wide limit.
# contextvars are scoped to the current asyncio task, so concurrent
# graph executions each get independent counters.
_copilot_recursion_depth: contextvars.ContextVar[int] = contextvars.ContextVar(
"_copilot_recursion_depth", default=0
)
# Chain-wide effective recursion limit. Once the first caller sets a limit,
# nested calls cannot raise it — they can only lower it.
_copilot_recursion_limit: contextvars.ContextVar[int | None] = contextvars.ContextVar(
"_copilot_recursion_limit", default=None
)
@@ -68,22 +65,13 @@ class AutogptCopilotBlock(Block):
advanced=True,
)
timeout: int = SchemaField(
description=(
"Maximum execution time in seconds. Copilot tasks can involve "
"multiple tool calls and may take a while. Set appropriately "
"for your task complexity."
),
default=300,
advanced=True,
)
max_recursion_depth: int = SchemaField(
description=(
"Maximum nesting depth when the copilot calls this block "
"recursively (sub-agent pattern). Prevents infinite loops."
),
default=3,
ge=1,
advanced=True,
)
@@ -135,7 +123,6 @@ class AutogptCopilotBlock(Block):
"prompt": "List my agents",
"system_context": "",
"session_id": "",
"timeout": 300,
"max_recursion_depth": 3,
},
test_output=[
@@ -170,22 +157,57 @@ class AutogptCopilotBlock(Block):
},
)
@staticmethod
def _check_recursion(max_recursion_depth: int) -> tuple:
"""Check and increment recursion depth. Returns tokens to reset later."""
current_depth = _copilot_recursion_depth.get()
inherited_limit = _copilot_recursion_limit.get()
effective_limit = (
max_recursion_depth
if inherited_limit is None
else min(inherited_limit, max_recursion_depth)
)
if current_depth >= effective_limit:
raise RuntimeError(
f"Copilot recursion depth limit reached ({effective_limit}). "
"The copilot has called itself too many times."
)
return (
_copilot_recursion_depth.set(current_depth + 1),
_copilot_recursion_limit.set(effective_limit),
)
@staticmethod
def _reset_recursion(tokens: tuple) -> None:
_copilot_recursion_depth.reset(tokens[0])
_copilot_recursion_limit.reset(tokens[1])
@staticmethod
async def _get_or_create_session(
session_id: str, user_id: str
) -> "ChatSession": # noqa: F821
from backend.copilot.model import create_chat_session, get_chat_session
if session_id:
session = await get_chat_session(session_id, user_id)
if not session:
raise ValueError(
f"Copilot session {session_id} not found. "
"Use an empty session_id to start a new session."
)
return session
return await create_chat_session(user_id)
async def execute_copilot(
self,
prompt: str,
system_context: str,
session_id: str,
timeout: int,
max_recursion_depth: int,
user_id: str,
) -> tuple[str, list[dict], str, str, dict]:
"""Invoke the copilot and collect all stream results.
Returns:
Tuple of (response, tool_calls, conversation_history_json,
session_id, token_usage)
"""
from backend.copilot.model import create_chat_session, get_chat_session
"""Invoke the copilot and collect all stream results."""
from backend.copilot.model import get_chat_session
from backend.copilot.response_model import (
StreamError,
StreamTextDelta,
@@ -195,41 +217,14 @@ class AutogptCopilotBlock(Block):
)
from backend.copilot.sdk.service import stream_chat_completion_sdk
# -- Recursion guard --
current_depth = _copilot_recursion_depth.get()
inherited_limit = _copilot_recursion_limit.get()
effective_limit = (
max_recursion_depth
if inherited_limit is None
else min(inherited_limit, max_recursion_depth)
)
if current_depth >= effective_limit:
raise RuntimeError(
f"Copilot recursion depth limit reached ({effective_limit}). "
"The copilot has called itself too many times."
)
depth_token = _copilot_recursion_depth.set(current_depth + 1)
limit_token = _copilot_recursion_limit.set(effective_limit)
tokens = self._check_recursion(max_recursion_depth)
try:
# -- Session management --
if session_id:
session = await get_chat_session(session_id, user_id)
if not session:
raise ValueError(
f"Copilot session {session_id} not found. "
"Use an empty session_id to start a new session."
)
else:
session = await create_chat_session(user_id)
session = await self._get_or_create_session(session_id, user_id)
# -- Build effective prompt --
effective_prompt = prompt
if system_context:
effective_prompt = f"[System Context: {system_context}]\n\n{prompt}"
# -- Stream consumption --
response_parts: list[str] = []
tool_calls: list[dict] = []
tool_calls_by_id: dict[str, dict] = {}
@@ -239,38 +234,38 @@ class AutogptCopilotBlock(Block):
"totalTokens": 0,
}
async with asyncio.timeout(timeout):
async for event in stream_chat_completion_sdk(
session_id=session.session_id,
message=effective_prompt,
is_user_message=True,
user_id=user_id,
session=session,
):
if isinstance(event, StreamTextDelta):
response_parts.append(event.delta)
elif isinstance(event, StreamToolInputAvailable):
entry = {
"toolCallId": event.toolCallId,
"toolName": event.toolName,
"input": event.input,
"output": None,
"success": None,
}
tool_calls.append(entry)
tool_calls_by_id[event.toolCallId] = entry
elif isinstance(event, StreamToolOutputAvailable):
if tc := tool_calls_by_id.get(event.toolCallId):
tc["output"] = event.output
tc["success"] = event.success
elif isinstance(event, StreamUsage):
total_usage["promptTokens"] += event.promptTokens
total_usage["completionTokens"] += event.completionTokens
total_usage["totalTokens"] += event.totalTokens
elif isinstance(event, StreamError):
raise RuntimeError(f"Copilot error: {event.errorText}")
# No asyncio.timeout — the SDK's internal stream must not be
# cancelled mid-flight (see sdk/service.py L998-1001).
async for event in stream_chat_completion_sdk(
session_id=session.session_id,
message=effective_prompt,
is_user_message=True,
user_id=user_id,
session=session,
):
if isinstance(event, StreamTextDelta):
response_parts.append(event.delta)
elif isinstance(event, StreamToolInputAvailable):
entry = {
"toolCallId": event.toolCallId,
"toolName": event.toolName,
"input": event.input,
"output": None,
"success": None,
}
tool_calls.append(entry)
tool_calls_by_id[event.toolCallId] = entry
elif isinstance(event, StreamToolOutputAvailable):
if tc := tool_calls_by_id.get(event.toolCallId):
tc["output"] = event.output
tc["success"] = event.success
elif isinstance(event, StreamUsage):
total_usage["promptTokens"] += event.promptTokens
total_usage["completionTokens"] += event.completionTokens
total_usage["totalTokens"] += event.totalTokens
elif isinstance(event, StreamError):
raise RuntimeError(f"Copilot error: {event.errorText}")
# -- Build conversation history from session --
updated_session = await get_chat_session(session.session_id, user_id)
history_json = "[]"
if updated_session and updated_session.messages:
@@ -279,18 +274,15 @@ class AutogptCopilotBlock(Block):
default=str,
)
response_text = "".join(response_parts)
return (
response_text,
"".join(response_parts),
tool_calls,
history_json,
session.session_id,
total_usage,
)
finally:
_copilot_recursion_depth.reset(depth_token)
_copilot_recursion_limit.reset(limit_token)
self._reset_recursion(tokens)
async def run(
self,
@@ -303,14 +295,6 @@ class AutogptCopilotBlock(Block):
yield "error", "Prompt cannot be empty."
return
if input_data.timeout <= 0:
yield "error", "Timeout must be greater than 0 seconds."
return
if input_data.max_recursion_depth < 1:
yield "error", "max_recursion_depth must be at least 1."
return
if not execution_context.user_id:
yield "error", "Cannot run copilot without an authenticated user."
return
@@ -320,7 +304,6 @@ class AutogptCopilotBlock(Block):
prompt=input_data.prompt,
system_context=input_data.system_context,
session_id=input_data.session_id,
timeout=input_data.timeout,
max_recursion_depth=input_data.max_recursion_depth,
user_id=execution_context.user_id,
)
@@ -330,7 +313,5 @@ class AutogptCopilotBlock(Block):
yield "conversation_history", history
yield "session_id", sid
yield "token_usage", usage
except TimeoutError:
yield "error", (f"Copilot execution timed out after {input_data.timeout}s.")
except Exception as e:
yield "error", str(e)

View File

@@ -55,7 +55,6 @@ This block invokes the platform's copilot system directly via `stream_chat_compl
| prompt | The task or instruction for the copilot to execute. The copilot has access to platform tools like agent management, workspace files, web fetch, block execution, and more. | str | Yes |
| system_context | Optional additional context prepended to the prompt. Use this to constrain copilot behavior, provide domain context, or set output format requirements. | str | No |
| session_id | Session ID to continue an existing copilot conversation. Leave empty to start a new session. Use the session_id output from a previous run to continue. | str | No |
| timeout | Maximum execution time in seconds. Copilot tasks can involve multiple tool calls and may take a while. Set appropriately for your task complexity. | int | No |
| max_recursion_depth | Maximum nesting depth when the copilot calls this block recursively (sub-agent pattern). Prevents infinite loops. | int | No |
### Outputs