mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
fix(copilot): remove stream timeout, add error propagation to frontend (#12175)
## Summary
Fixes critical reliability issues where long-running copilot sessions
were forcibly terminated and failures showed no error messages to users.
## Issues Fixed
1. **Silent failures**: Tasks failed but frontend showed "stopped" with
zero explanation
2. **Premature timeout**: Sessions auto-expired after 5 minutes even
when actively running
## Changes
### Error propagation to frontend
- Add `error_message` parameter to `mark_task_completed()`
- When `status="failed"`, publish `StreamError` before `StreamFinish` so
frontend displays reason
- Update all failure callers with specific error messages:
- Session not found: `"Session {id} not found"`
- Tool setup failed: `"Failed to setup tool {name}: {error}"`
- Task cancelled: `"Task was cancelled"`
### Remove stream timeout
- Delete `stream_timeout` config (was 300s/5min)
- Remove auto-expiry logic in `get_active_task_for_session()`
- Sessions now run indefinitely — user controls stopping via UI
## Why
**Auto-expiry was broken:**
- Used `created_at` (task start) not last activity
- SDK sessions with multiple LLM calls + subagent Tasks easily run
20-30+ minutes
- A task publishing chunks every second still got killed at 5min mark
- Hard timeout is inappropriate for long-running AI agents
**Error propagation was missing:**
- `mark_task_completed(status="failed")` only sent `StreamFinish`
- No `StreamError` event = frontend had no message to show user
- Backend logs showed errors but user saw nothing
## Test Plan
- [x] Formatter, linter, type-check pass
- [ ] Start a copilot session with Task tool (spawns subagent)
- [ ] Verify session runs beyond 5 minutes without auto-expiry
- [ ] Cancel a running session → frontend shows "Task was cancelled"
error
- [ ] Trigger a tool setup failure → frontend shows error message
- [ ] Session continues running until user clicks stop or task completes
## Files Changed
- `backend/copilot/config.py` — removed `stream_timeout`
- `backend/copilot/stream_registry.py` — removed auto-expiry, added
error propagation
- `backend/copilot/service.py` — error messages for 2 failure paths
- `backend/copilot/executor/processor.py` — error message for
cancellation
This commit is contained in:
@@ -27,7 +27,6 @@ class ChatConfig(BaseSettings):
|
||||
session_ttl: int = Field(default=43200, description="Session TTL in seconds")
|
||||
|
||||
# Streaming Configuration
|
||||
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
|
||||
max_retries: int = Field(
|
||||
default=3,
|
||||
description="Max retries for fallback path (SDK handles retries internally)",
|
||||
|
||||
@@ -266,7 +266,11 @@ class CoPilotProcessor:
|
||||
|
||||
except asyncio.CancelledError:
|
||||
log.info("Task cancelled")
|
||||
await stream_registry.mark_task_completed(entry.task_id, status="failed")
|
||||
await stream_registry.mark_task_completed(
|
||||
entry.task_id,
|
||||
status="failed",
|
||||
error_message="Task was cancelled",
|
||||
)
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -1563,7 +1563,11 @@ async def _yield_tool_call(
|
||||
await _mark_operation_completed(tool_call_id)
|
||||
# Mark stream registry task as failed if it was created
|
||||
try:
|
||||
await stream_registry.mark_task_completed(task_id, status="failed")
|
||||
await stream_registry.mark_task_completed(
|
||||
task_id,
|
||||
status="failed",
|
||||
error_message=f"Failed to setup tool {tool_name}: {e}",
|
||||
)
|
||||
except Exception as mark_err:
|
||||
logger.warning(f"Failed to mark task {task_id} as failed: {mark_err}")
|
||||
logger.error(
|
||||
@@ -1731,7 +1735,11 @@ async def _execute_long_running_tool_with_streaming(
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
if not session:
|
||||
logger.error(f"Session {session_id} not found for background tool")
|
||||
await stream_registry.mark_task_completed(task_id, status="failed")
|
||||
await stream_registry.mark_task_completed(
|
||||
task_id,
|
||||
status="failed",
|
||||
error_message=f"Session {session_id} not found",
|
||||
)
|
||||
return
|
||||
|
||||
# Pass operation_id and task_id to the tool for async processing
|
||||
|
||||
@@ -644,6 +644,8 @@ async def _stream_listener(
|
||||
async def mark_task_completed(
|
||||
task_id: str,
|
||||
status: Literal["completed", "failed"] = "completed",
|
||||
*,
|
||||
error_message: str | None = None,
|
||||
) -> bool:
|
||||
"""Mark a task as completed and publish finish event.
|
||||
|
||||
@@ -654,6 +656,10 @@ async def mark_task_completed(
|
||||
Args:
|
||||
task_id: Task ID to mark as completed
|
||||
status: Final status ("completed" or "failed")
|
||||
error_message: If provided and status="failed", publish a StreamError
|
||||
before StreamFinish so connected clients see why the task ended.
|
||||
If not provided, no StreamError is published (caller should publish
|
||||
manually if needed to avoid duplicates).
|
||||
|
||||
Returns:
|
||||
True if task was newly marked completed, False if already completed/failed
|
||||
@@ -669,6 +675,17 @@ async def mark_task_completed(
|
||||
logger.debug(f"Task {task_id} already completed/failed, skipping")
|
||||
return False
|
||||
|
||||
# Publish error event before finish so connected clients know WHY the
|
||||
# task ended. Only publish if caller provided an explicit error message
|
||||
# to avoid duplicates with code paths that manually publish StreamError.
|
||||
# This is best-effort — if it fails, the StreamFinish still ensures
|
||||
# listeners clean up.
|
||||
if status == "failed" and error_message:
|
||||
try:
|
||||
await publish_chunk(task_id, StreamError(errorText=error_message))
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to publish error event for task {task_id}: {e}")
|
||||
|
||||
# THEN publish finish event (best-effort - listeners can detect via status polling)
|
||||
try:
|
||||
await publish_chunk(task_id, StreamFinish())
|
||||
@@ -821,27 +838,6 @@ async def get_active_task_for_session(
|
||||
if task_user_id and user_id != task_user_id:
|
||||
continue
|
||||
|
||||
# Auto-expire stale tasks that exceeded stream_timeout
|
||||
created_at_str = meta.get("created_at", "")
|
||||
if created_at_str:
|
||||
try:
|
||||
created_at = datetime.fromisoformat(created_at_str)
|
||||
age_seconds = (
|
||||
datetime.now(timezone.utc) - created_at
|
||||
).total_seconds()
|
||||
if age_seconds > config.stream_timeout:
|
||||
logger.warning(
|
||||
f"[TASK_LOOKUP] Auto-expiring stale task {task_id[:8]}... "
|
||||
f"(age={age_seconds:.0f}s > timeout={config.stream_timeout}s)"
|
||||
)
|
||||
await mark_task_completed(task_id, "failed")
|
||||
continue
|
||||
except (ValueError, TypeError) as exc:
|
||||
logger.warning(
|
||||
f"[TASK_LOOKUP] Failed to parse created_at "
|
||||
f"for task {task_id[:8]}...: {exc}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user