"""
|
|
Response models for Vercel AI SDK UI Stream Protocol.
|
|
|
|
This module implements the AI SDK UI Stream Protocol (v1) for streaming chat responses.
|
|
See: https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol
|
|
"""
|
|
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
from backend.util.json import dumps as json_dumps
|
|
|
|
|
|


class ResponseType(str, Enum):
    """Types of streaming responses following AI SDK protocol."""

    # Message lifecycle
    START = "start"
    FINISH = "finish"

    # Step lifecycle (one LLM API call within a message)
    START_STEP = "start-step"
    FINISH_STEP = "finish-step"

    # Text streaming
    TEXT_START = "text-start"
    TEXT_DELTA = "text-delta"
    TEXT_END = "text-end"

    # Tool interaction
    TOOL_INPUT_START = "tool-input-start"
    TOOL_INPUT_AVAILABLE = "tool-input-available"
    TOOL_OUTPUT_AVAILABLE = "tool-output-available"

    # Other
    ERROR = "error"
    USAGE = "usage"
    HEARTBEAT = "heartbeat"


class StreamBaseResponse(BaseModel):
    """Base response model for all streaming responses."""

    type: ResponseType

    def to_sse(self) -> str:
        """Convert to SSE format."""
        return f"data: {self.model_dump_json()}\n\n"
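

# Example of a serialized frame as produced by to_sse() (illustrative only;
# the id and delta values below are made up):
#   data: {"type": "text-delta", "id": "txt_1", "delta": "Hel"}\n\n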


# ========== Message Lifecycle ==========


class StreamStart(StreamBaseResponse):
    """Start of a new message."""

    type: ResponseType = ResponseType.START
    messageId: str = Field(..., description="Unique message ID")
    taskId: str | None = Field(
        default=None,
        description="Task ID for SSE reconnection. Clients can reconnect using GET /tasks/{taskId}/stream",
    )

    def to_sse(self) -> str:
        """Convert to SSE format, excluding non-protocol fields like taskId."""
        data: dict[str, Any] = {
            "type": self.type.value,
            "messageId": self.messageId,
        }
        return f"data: {json_dumps(data)}\n\n"


class StreamFinish(StreamBaseResponse):
    """End of message/stream."""

    type: ResponseType = ResponseType.FINISH


class StreamStartStep(StreamBaseResponse):
    """Start of a step (one LLM API call within a message).

    The AI SDK uses this to add a step-start boundary to message.parts,
    enabling visual separation between multiple LLM calls in a single message.
    """

    type: ResponseType = ResponseType.START_STEP


class StreamFinishStep(StreamBaseResponse):
    """End of a step (one LLM API call within a message).

    The AI SDK uses this to reset activeTextParts and activeReasoningParts,
    so the next LLM call in a tool-call continuation starts with clean state.
    """

    type: ResponseType = ResponseType.FINISH_STEP


# ========== Text Streaming ==========


class StreamTextStart(StreamBaseResponse):
    """Start of a text block."""

    type: ResponseType = ResponseType.TEXT_START
    id: str = Field(..., description="Text block ID")


class StreamTextDelta(StreamBaseResponse):
    """Streaming text content delta."""

    type: ResponseType = ResponseType.TEXT_DELTA
    id: str = Field(..., description="Text block ID")
    delta: str = Field(..., description="Text content delta")


class StreamTextEnd(StreamBaseResponse):
    """End of a text block."""

    type: ResponseType = ResponseType.TEXT_END
    id: str = Field(..., description="Text block ID")


# ========== Tool Interaction ==========


class StreamToolInputStart(StreamBaseResponse):
    """Tool call started notification."""

    type: ResponseType = ResponseType.TOOL_INPUT_START
    toolCallId: str = Field(..., description="Unique tool call ID")
    toolName: str = Field(..., description="Name of the tool being called")


class StreamToolInputAvailable(StreamBaseResponse):
    """Tool input is ready for execution."""

    type: ResponseType = ResponseType.TOOL_INPUT_AVAILABLE
    toolCallId: str = Field(..., description="Unique tool call ID")
    toolName: str = Field(..., description="Name of the tool being called")
    input: dict[str, Any] = Field(
        default_factory=dict, description="Tool input arguments"
    )


class StreamToolOutputAvailable(StreamBaseResponse):
    """Tool execution result."""

    type: ResponseType = ResponseType.TOOL_OUTPUT_AVAILABLE
    toolCallId: str = Field(..., description="Tool call ID this responds to")
    output: str | dict[str, Any] = Field(..., description="Tool execution output")
    # Keep these for internal backend use
    toolName: str | None = Field(
        default=None, description="Name of the tool that was executed"
    )
    success: bool = Field(
        default=True, description="Whether the tool execution succeeded"
    )

    def to_sse(self) -> str:
        """Convert to SSE format, excluding non-spec fields."""
        data = {
            "type": self.type.value,
            "toolCallId": self.toolCallId,
            "output": self.output,
        }
        return f"data: {json_dumps(data)}\n\n"
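

# A typical tool round-trip on the wire (illustrative ordering, per the AI SDK
# protocol docs linked in the module docstring):
#   tool-input-start -> tool-input-available -> tool-output-available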


# ========== Other ==========


class StreamUsage(StreamBaseResponse):
    """Token usage statistics."""

    type: ResponseType = ResponseType.USAGE
    promptTokens: int = Field(..., description="Number of prompt tokens")
    completionTokens: int = Field(..., description="Number of completion tokens")
    totalTokens: int = Field(..., description="Total number of tokens")


class StreamError(StreamBaseResponse):
    """Error response."""

    type: ResponseType = ResponseType.ERROR
    errorText: str = Field(..., description="Error message text")
    code: str | None = Field(default=None, description="Error code")
    details: dict[str, Any] | None = Field(
        default=None, description="Additional error details"
    )

    def to_sse(self) -> str:
        """Convert to SSE format, only emitting fields required by the AI SDK protocol.

        The AI SDK uses z.strictObject({type, errorText}), which rejects
        any extra fields like `code` or `details`.
        """
        data = {
            "type": self.type.value,
            "errorText": self.errorText,
        }
        return f"data: {json_dumps(data)}\n\n"


class StreamHeartbeat(StreamBaseResponse):
    """Heartbeat to keep the SSE connection alive during long-running operations.

    Uses the SSE comment format (": comment"), which clients ignore but which
    keeps the connection alive through proxies and load balancers.
    """

    type: ResponseType = ResponseType.HEARTBEAT
    toolCallId: str | None = Field(
        default=None, description="Tool call ID if heartbeat is for a specific tool"
    )

    def to_sse(self) -> str:
        """Convert to SSE comment format to keep the connection alive."""
        return ": heartbeat\n\n"
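

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module's public API): a minimal
# happy-path event sequence for one assistant message that streams a text
# block plus one tool round-trip, serialized to SSE. All IDs and the tool
# name below are made up for the example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    events: list[StreamBaseResponse] = [
        StreamStart(messageId="msg_1"),
        StreamStartStep(),
        StreamTextStart(id="txt_1"),
        StreamTextDelta(id="txt_1", delta="Hello"),
        StreamTextDelta(id="txt_1", delta=", world!"),
        StreamTextEnd(id="txt_1"),
        StreamToolInputStart(toolCallId="call_1", toolName="search"),
        StreamToolInputAvailable(
            toolCallId="call_1", toolName="search", input={"query": "weather"}
        ),
        StreamToolOutputAvailable(toolCallId="call_1", output="sunny"),
        StreamFinishStep(),
        StreamFinish(),
    ]
    for event in events:
        # Each frame is already newline-terminated ("\n\n"), so no extra end.
        print(event.to_sse(), end="")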