mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
## Changes 🏗️
Adds Redis-based SSE reconnection support for long-running CoPilot
operations (like Agent Generator), enabling clients to reconnect and
resume receiving updates after disconnection.
### What this does:
- **Stream Registry** - Redis-backed task tracking with message
persistence via Redis Streams
- **SSE Reconnection** - Clients can reconnect to active tasks using
`task_id` and `last_message_id`
- **Duplicate Message Fix** - Filters out in-progress assistant messages
from session response when active stream exists
- **Completion Consumer** - Handles background task completion
notifications via Redis Streams
### Architecture:
```
1. User sends message → Backend creates task in Redis
2. SSE chunks written to Redis Stream for persistence
3. Client receives chunks via SSE subscription
4. If client disconnects → Task continues in background
5. Client reconnects → GET /sessions/{id} returns active_stream info
6. Client subscribes to /tasks/{task_id}/stream with last_message_id
7. Missed messages replayed from Redis Stream
```
### Key endpoints:
- `GET /sessions/{session_id}` - Returns `active_stream` info if task is
running
- `GET /tasks/{task_id}/stream?last_message_id=X` - SSE endpoint for
reconnection
- `GET /tasks/{task_id}` - Get task status
- `POST /operations/{op_id}/complete` - Webhook for external service
completion
### Duplicate message fix:
When `GET /sessions/{id}` detects an active stream:
1. Filters out the in-progress assistant message from response
2. Returns `last_message_id="0-0"` so client replays stream from
beginning
3. Client receives complete response only through SSE (single source of
truth)
### Frontend changes:
- Task persistence in localStorage for cross-tab reconnection
- Stream event dispatcher handles reconnection flow
- Deduplication logic prevents duplicate messages
### Testing:
- Manual testing of reconnection scenarios
- Verified duplicate message fix works correctly
## Related
- Resolves SSE timeout issues for Agent Generator
- Fixes duplicate message bug on reconnection
90 lines
2.3 KiB
TypeScript
90 lines
2.3 KiB
TypeScript
import type { ToolArguments, ToolResult } from "@/types/chat";
|
|
import type { StreamChunk, VercelStreamChunk } from "./chat-types";
|
|
|
|
const LEGACY_STREAM_TYPES = new Set<StreamChunk["type"]>([
|
|
"text_chunk",
|
|
"text_ended",
|
|
"tool_call",
|
|
"tool_call_start",
|
|
"tool_response",
|
|
"login_needed",
|
|
"need_login",
|
|
"credentials_needed",
|
|
"error",
|
|
"usage",
|
|
"stream_end",
|
|
]);
|
|
|
|
export function isLegacyStreamChunk(
|
|
chunk: StreamChunk | VercelStreamChunk,
|
|
): chunk is StreamChunk {
|
|
return LEGACY_STREAM_TYPES.has(chunk.type as StreamChunk["type"]);
|
|
}
|
|
|
|
export function normalizeStreamChunk(
|
|
chunk: StreamChunk | VercelStreamChunk,
|
|
): StreamChunk | null {
|
|
if (isLegacyStreamChunk(chunk)) return chunk;
|
|
|
|
switch (chunk.type) {
|
|
case "text-delta":
|
|
// Vercel AI SDK sends "delta" for text content
|
|
return { type: "text_chunk", content: chunk.delta };
|
|
case "text-end":
|
|
return { type: "text_ended" };
|
|
case "tool-input-available":
|
|
return {
|
|
type: "tool_call_start",
|
|
tool_id: chunk.toolCallId,
|
|
tool_name: chunk.toolName,
|
|
arguments: chunk.input as ToolArguments,
|
|
};
|
|
case "tool-output-available":
|
|
return {
|
|
type: "tool_response",
|
|
tool_id: chunk.toolCallId,
|
|
tool_name: chunk.toolName,
|
|
result: chunk.output as ToolResult,
|
|
success: chunk.success ?? true,
|
|
};
|
|
case "usage":
|
|
return {
|
|
type: "usage",
|
|
promptTokens: chunk.promptTokens,
|
|
completionTokens: chunk.completionTokens,
|
|
totalTokens: chunk.totalTokens,
|
|
};
|
|
case "error":
|
|
return {
|
|
type: "error",
|
|
message: chunk.errorText,
|
|
code: chunk.code,
|
|
details: chunk.details,
|
|
};
|
|
case "finish":
|
|
return { type: "stream_end" };
|
|
case "start":
|
|
// Start event with optional taskId for reconnection
|
|
return chunk.taskId
|
|
? { type: "stream_start", taskId: chunk.taskId }
|
|
: null;
|
|
case "text-start":
|
|
return null;
|
|
case "tool-input-start":
|
|
return {
|
|
type: "tool_call_start",
|
|
tool_id: chunk.toolCallId,
|
|
tool_name: chunk.toolName,
|
|
arguments: {},
|
|
};
|
|
}
|
|
}
|
|
|
|
export const MAX_RETRIES = 3;
|
|
export const INITIAL_RETRY_DELAY = 1000;
|
|
|
|
export function parseSSELine(line: string): string | null {
|
|
if (line.startsWith("data: ")) return line.slice(6);
|
|
return null;
|
|
}
|