mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-06 04:45:10 -05:00
Compare commits
7 Commits
docs/works
...
feat/copit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5efb80d47b | ||
|
|
b49d8e2cba | ||
|
|
452544530d | ||
|
|
32ee7e6cf8 | ||
|
|
670663c406 | ||
|
|
0dbe4cf51e | ||
|
|
29ee85c86f |
@@ -27,12 +27,20 @@ class ChatConfig(BaseSettings):
|
||||
session_ttl: int = Field(default=43200, description="Session TTL in seconds")
|
||||
|
||||
# Streaming Configuration
|
||||
# Note: When using Claude Agent SDK, context management is handled automatically
|
||||
# via the SDK's built-in compaction. This is mainly used for the fallback path.
|
||||
max_context_messages: int = Field(
|
||||
default=50, ge=1, le=200, description="Maximum context messages"
|
||||
default=100,
|
||||
ge=1,
|
||||
le=500,
|
||||
description="Max context messages (SDK handles compaction automatically)",
|
||||
)
|
||||
|
||||
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
|
||||
max_retries: int = Field(default=3, description="Maximum number of retries")
|
||||
max_retries: int = Field(
|
||||
default=3,
|
||||
description="Max retries for fallback path (SDK handles retries internally)",
|
||||
)
|
||||
max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
|
||||
max_agent_schedules: int = Field(
|
||||
default=30, description="Maximum number of agent schedules"
|
||||
@@ -93,6 +101,12 @@ class ChatConfig(BaseSettings):
|
||||
description="Name of the prompt in Langfuse to fetch",
|
||||
)
|
||||
|
||||
# Claude Agent SDK Configuration
|
||||
use_claude_agent_sdk: bool = Field(
|
||||
default=True,
|
||||
description="Use Claude Agent SDK for chat completions",
|
||||
)
|
||||
|
||||
@field_validator("api_key", mode="before")
|
||||
@classmethod
|
||||
def get_api_key(cls, v):
|
||||
@@ -132,6 +146,17 @@ class ChatConfig(BaseSettings):
|
||||
v = os.getenv("CHAT_INTERNAL_API_KEY")
|
||||
return v
|
||||
|
||||
@field_validator("use_claude_agent_sdk", mode="before")
|
||||
@classmethod
|
||||
def get_use_claude_agent_sdk(cls, v):
|
||||
"""Get use_claude_agent_sdk from environment if not provided."""
|
||||
# Check environment variable - default to True if not set
|
||||
env_val = os.getenv("CHAT_USE_CLAUDE_AGENT_SDK", "").lower()
|
||||
if env_val:
|
||||
return env_val in ("true", "1", "yes", "on")
|
||||
# Default to True (SDK enabled by default)
|
||||
return True if v is None else v
|
||||
|
||||
# Prompt paths for different contexts
|
||||
PROMPT_PATHS: dict[str, str] = {
|
||||
"default": "prompts/chat_system.md",
|
||||
|
||||
@@ -273,9 +273,8 @@ async def _get_session_from_cache(session_id: str) -> ChatSession | None:
|
||||
try:
|
||||
session = ChatSession.model_validate_json(raw_session)
|
||||
logger.info(
|
||||
f"Loading session {session_id} from cache: "
|
||||
f"message_count={len(session.messages)}, "
|
||||
f"roles={[m.role for m in session.messages]}"
|
||||
f"[CACHE] Loaded session {session_id}: {len(session.messages)} messages, "
|
||||
f"last_roles={[m.role for m in session.messages[-3:]]}" # Last 3 roles
|
||||
)
|
||||
return session
|
||||
except Exception as e:
|
||||
@@ -317,11 +316,9 @@ async def _get_session_from_db(session_id: str) -> ChatSession | None:
|
||||
return None
|
||||
|
||||
messages = prisma_session.Messages
|
||||
logger.info(
|
||||
f"Loading session {session_id} from DB: "
|
||||
f"has_messages={messages is not None}, "
|
||||
f"message_count={len(messages) if messages else 0}, "
|
||||
f"roles={[m.role for m in messages] if messages else []}"
|
||||
logger.debug(
|
||||
f"[DB] Loaded session {session_id}: {len(messages) if messages else 0} messages, "
|
||||
f"roles={[m.role for m in messages[-3:]] if messages else []}" # Last 3 roles
|
||||
)
|
||||
|
||||
return ChatSession.from_db(prisma_session, messages)
|
||||
@@ -372,10 +369,9 @@ async def _save_session_to_db(
|
||||
"function_call": msg.function_call,
|
||||
}
|
||||
)
|
||||
logger.info(
|
||||
f"Saving {len(new_messages)} new messages to DB for session {session.session_id}: "
|
||||
f"roles={[m['role'] for m in messages_data]}, "
|
||||
f"start_sequence={existing_message_count}"
|
||||
logger.debug(
|
||||
f"[DB] Saving {len(new_messages)} messages to session {session.session_id}, "
|
||||
f"roles={[m['role'] for m in messages_data]}"
|
||||
)
|
||||
await chat_db.add_chat_messages_batch(
|
||||
session_id=session.session_id,
|
||||
@@ -415,7 +411,7 @@ async def get_chat_session(
|
||||
logger.warning(f"Unexpected cache error for session {session_id}: {e}")
|
||||
|
||||
# Fall back to database
|
||||
logger.info(f"Session {session_id} not in cache, checking database")
|
||||
logger.debug(f"Session {session_id} not in cache, checking database")
|
||||
session = await _get_session_from_db(session_id)
|
||||
|
||||
if session is None:
|
||||
@@ -432,7 +428,6 @@ async def get_chat_session(
|
||||
# Cache the session from DB
|
||||
try:
|
||||
await _cache_session(session)
|
||||
logger.info(f"Cached session {session_id} from database")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to cache session {session_id}: {e}")
|
||||
|
||||
@@ -603,13 +598,19 @@ async def update_session_title(session_id: str, title: str) -> bool:
|
||||
logger.warning(f"Session {session_id} not found for title update")
|
||||
return False
|
||||
|
||||
# Invalidate cache so next fetch gets updated title
|
||||
# Update title in cache if it exists (instead of invalidating).
|
||||
# This prevents race conditions where cache invalidation causes
|
||||
# the frontend to see stale DB data while streaming is still in progress.
|
||||
try:
|
||||
redis_key = _get_session_cache_key(session_id)
|
||||
async_redis = await get_redis_async()
|
||||
await async_redis.delete(redis_key)
|
||||
cached = await _get_session_from_cache(session_id)
|
||||
if cached:
|
||||
cached.title = title
|
||||
await _cache_session(cached)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
|
||||
# Not critical - title will be correct on next full cache refresh
|
||||
logger.warning(
|
||||
f"Failed to update title in cache for session {session_id}: {e}"
|
||||
)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Chat API routes for chat session management and streaming via SSE."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid as uuid_module
|
||||
from collections.abc import AsyncGenerator
|
||||
@@ -16,8 +17,17 @@ from . import service as chat_service
|
||||
from . import stream_registry
|
||||
from .completion_handler import process_operation_failure, process_operation_success
|
||||
from .config import ChatConfig
|
||||
from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
|
||||
from .model import (
|
||||
ChatMessage,
|
||||
ChatSession,
|
||||
create_chat_session,
|
||||
get_chat_session,
|
||||
get_user_sessions,
|
||||
upsert_chat_session,
|
||||
)
|
||||
from .response_model import StreamFinish, StreamHeartbeat, StreamStart
|
||||
from .sdk import service as sdk_service
|
||||
from .tracking import track_user_message
|
||||
|
||||
config = ChatConfig()
|
||||
|
||||
@@ -209,6 +219,10 @@ async def get_session(
|
||||
active_task, last_message_id = await stream_registry.get_active_task_for_session(
|
||||
session_id, user_id
|
||||
)
|
||||
logger.info(
|
||||
f"[GET_SESSION] session={session_id}, active_task={active_task is not None}, "
|
||||
f"msg_count={len(messages)}, last_role={messages[-1].get('role') if messages else 'none'}"
|
||||
)
|
||||
if active_task:
|
||||
# Filter out the in-progress assistant message from the session response.
|
||||
# The client will receive the complete assistant response through the SSE
|
||||
@@ -265,10 +279,30 @@ async def stream_chat_post(
|
||||
containing the task_id for reconnection.
|
||||
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
session = await _validate_and_get_session(session_id, user_id)
|
||||
|
||||
# Add user message to session BEFORE creating task to avoid race condition
|
||||
# where GET_SESSION sees the task as "running" but the message isn't saved yet
|
||||
if request.message:
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="user" if request.is_user_message else "assistant",
|
||||
content=request.message,
|
||||
)
|
||||
)
|
||||
if request.is_user_message:
|
||||
track_user_message(
|
||||
user_id=user_id,
|
||||
session_id=session_id,
|
||||
message_length=len(request.message),
|
||||
)
|
||||
logger.info(
|
||||
f"[STREAM] Saving user message to session {session_id}, "
|
||||
f"msg_count={len(session.messages)}"
|
||||
)
|
||||
session = await upsert_chat_session(session)
|
||||
logger.info(f"[STREAM] User message saved for session {session_id}")
|
||||
|
||||
# Create a task in the stream registry for reconnection support
|
||||
task_id = str(uuid_module.uuid4())
|
||||
operation_id = str(uuid_module.uuid4())
|
||||
@@ -283,24 +317,38 @@ async def stream_chat_post(
|
||||
|
||||
# Background task that runs the AI generation independently of SSE connection
|
||||
async def run_ai_generation():
|
||||
chunk_count = 0
|
||||
try:
|
||||
# Emit a start event with task_id for reconnection
|
||||
start_chunk = StreamStart(messageId=task_id, taskId=task_id)
|
||||
await stream_registry.publish_chunk(task_id, start_chunk)
|
||||
|
||||
async for chunk in chat_service.stream_chat_completion(
|
||||
# Choose service based on configuration
|
||||
use_sdk = config.use_claude_agent_sdk
|
||||
stream_fn = (
|
||||
sdk_service.stream_chat_completion_sdk
|
||||
if use_sdk
|
||||
else chat_service.stream_chat_completion
|
||||
)
|
||||
# Pass message=None since we already added it to the session above
|
||||
async for chunk in stream_fn(
|
||||
session_id,
|
||||
request.message,
|
||||
None, # Message already in session
|
||||
is_user_message=request.is_user_message,
|
||||
user_id=user_id,
|
||||
session=session, # Pass pre-fetched session to avoid double-fetch
|
||||
session=session, # Pass session with message already added
|
||||
context=request.context,
|
||||
):
|
||||
chunk_count += 1
|
||||
# Write to Redis (subscribers will receive via XREAD)
|
||||
await stream_registry.publish_chunk(task_id, chunk)
|
||||
|
||||
# Mark task as completed
|
||||
await stream_registry.mark_task_completed(task_id, "completed")
|
||||
logger.info(
|
||||
f"[BG_TASK] AI generation completed for session {session_id}: {chunk_count} chunks, marking task {task_id} as completed"
|
||||
)
|
||||
# Mark task as completed (also publishes StreamFinish)
|
||||
completed = await stream_registry.mark_task_completed(task_id, "completed")
|
||||
logger.info(f"[BG_TASK] mark_task_completed returned: {completed}")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error in background AI generation for session {session_id}: {e}"
|
||||
@@ -315,7 +363,7 @@ async def stream_chat_post(
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
subscriber_queue = None
|
||||
try:
|
||||
# Subscribe to the task stream (this replays existing messages + live updates)
|
||||
# Subscribe to the task stream (replays + live updates)
|
||||
subscriber_queue = await stream_registry.subscribe_to_task(
|
||||
task_id=task_id,
|
||||
user_id=user_id,
|
||||
@@ -323,6 +371,7 @@ async def stream_chat_post(
|
||||
)
|
||||
|
||||
if subscriber_queue is None:
|
||||
logger.warning(f"Failed to subscribe to task {task_id}")
|
||||
yield StreamFinish().to_sse()
|
||||
yield "data: [DONE]\n\n"
|
||||
return
|
||||
@@ -341,11 +390,11 @@ async def stream_chat_post(
|
||||
yield StreamHeartbeat().to_sse()
|
||||
|
||||
except GeneratorExit:
|
||||
pass # Client disconnected - background task continues
|
||||
pass # Client disconnected - normal behavior
|
||||
except Exception as e:
|
||||
logger.error(f"Error in SSE stream for task {task_id}: {e}")
|
||||
finally:
|
||||
# Unsubscribe when client disconnects or stream ends to prevent resource leak
|
||||
# Unsubscribe when client disconnects or stream ends
|
||||
if subscriber_queue is not None:
|
||||
try:
|
||||
await stream_registry.unsubscribe_from_task(
|
||||
@@ -400,35 +449,21 @@ async def stream_chat_get(
|
||||
session = await _validate_and_get_session(session_id, user_id)
|
||||
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
chunk_count = 0
|
||||
first_chunk_type: str | None = None
|
||||
async for chunk in chat_service.stream_chat_completion(
|
||||
# Choose service based on configuration
|
||||
use_sdk = config.use_claude_agent_sdk
|
||||
stream_fn = (
|
||||
sdk_service.stream_chat_completion_sdk
|
||||
if use_sdk
|
||||
else chat_service.stream_chat_completion
|
||||
)
|
||||
async for chunk in stream_fn(
|
||||
session_id,
|
||||
message,
|
||||
is_user_message=is_user_message,
|
||||
user_id=user_id,
|
||||
session=session, # Pass pre-fetched session to avoid double-fetch
|
||||
):
|
||||
if chunk_count < 3:
|
||||
logger.info(
|
||||
"Chat stream chunk",
|
||||
extra={
|
||||
"session_id": session_id,
|
||||
"chunk_type": str(chunk.type),
|
||||
},
|
||||
)
|
||||
if not first_chunk_type:
|
||||
first_chunk_type = str(chunk.type)
|
||||
chunk_count += 1
|
||||
yield chunk.to_sse()
|
||||
logger.info(
|
||||
"Chat stream completed",
|
||||
extra={
|
||||
"session_id": session_id,
|
||||
"chunk_count": chunk_count,
|
||||
"first_chunk_type": first_chunk_type,
|
||||
},
|
||||
)
|
||||
# AI SDK protocol termination
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
@@ -550,8 +585,6 @@ async def stream_task(
|
||||
)
|
||||
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
import asyncio
|
||||
|
||||
heartbeat_interval = 15.0 # Send heartbeat every 15 seconds
|
||||
try:
|
||||
while True:
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
"""Claude Agent SDK integration for CoPilot.
|
||||
|
||||
This module provides the integration layer between the Claude Agent SDK
|
||||
and the existing CoPilot tool system, enabling drop-in replacement of
|
||||
the current LLM orchestration with the battle-tested Claude Agent SDK.
|
||||
"""
|
||||
|
||||
from .service import stream_chat_completion_sdk
|
||||
from .tool_adapter import create_copilot_mcp_server
|
||||
|
||||
__all__ = [
|
||||
"stream_chat_completion_sdk",
|
||||
"create_copilot_mcp_server",
|
||||
]
|
||||
@@ -0,0 +1,348 @@
|
||||
"""Anthropic SDK fallback implementation.
|
||||
|
||||
This module provides the fallback streaming implementation using the Anthropic SDK
|
||||
directly when the Claude Agent SDK is not available.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any, cast
|
||||
|
||||
from ..model import ChatMessage, ChatSession
|
||||
from ..response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamTextDelta,
|
||||
StreamTextEnd,
|
||||
StreamTextStart,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolInputStart,
|
||||
StreamToolOutputAvailable,
|
||||
StreamUsage,
|
||||
)
|
||||
from .tool_adapter import get_tool_definitions, get_tool_handlers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def stream_with_anthropic(
|
||||
session: ChatSession,
|
||||
system_prompt: str,
|
||||
text_block_id: str,
|
||||
) -> AsyncGenerator[StreamBaseResponse, None]:
|
||||
"""Stream using Anthropic SDK directly with tool calling support.
|
||||
|
||||
This function accumulates messages into the session for persistence.
|
||||
The caller should NOT yield an additional StreamFinish - this function handles it.
|
||||
"""
|
||||
import anthropic
|
||||
|
||||
# Only use ANTHROPIC_API_KEY - don't fall back to OpenRouter keys
|
||||
api_key = os.getenv("ANTHROPIC_API_KEY")
|
||||
if not api_key:
|
||||
yield StreamError(
|
||||
errorText="ANTHROPIC_API_KEY not configured for fallback",
|
||||
code="config_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
client = anthropic.AsyncAnthropic(api_key=api_key)
|
||||
tool_definitions = get_tool_definitions()
|
||||
tool_handlers = get_tool_handlers()
|
||||
|
||||
anthropic_tools = [
|
||||
{
|
||||
"name": t["name"],
|
||||
"description": t["description"],
|
||||
"input_schema": t["inputSchema"],
|
||||
}
|
||||
for t in tool_definitions
|
||||
]
|
||||
|
||||
anthropic_messages = _convert_session_to_anthropic(session)
|
||||
|
||||
if not anthropic_messages or anthropic_messages[-1]["role"] != "user":
|
||||
anthropic_messages.append(
|
||||
{"role": "user", "content": "Continue with the task."}
|
||||
)
|
||||
|
||||
has_started_text = False
|
||||
max_iterations = 10
|
||||
accumulated_text = ""
|
||||
accumulated_tool_calls: list[dict[str, Any]] = []
|
||||
|
||||
for _ in range(max_iterations):
|
||||
try:
|
||||
async with client.messages.stream(
|
||||
model="claude-sonnet-4-20250514",
|
||||
max_tokens=4096,
|
||||
system=system_prompt,
|
||||
messages=cast(Any, anthropic_messages),
|
||||
tools=cast(Any, anthropic_tools) if anthropic_tools else [],
|
||||
) as stream:
|
||||
async for event in stream:
|
||||
if event.type == "content_block_start":
|
||||
block = event.content_block
|
||||
if hasattr(block, "type"):
|
||||
if block.type == "text" and not has_started_text:
|
||||
yield StreamTextStart(id=text_block_id)
|
||||
has_started_text = True
|
||||
elif block.type == "tool_use":
|
||||
yield StreamToolInputStart(
|
||||
toolCallId=block.id, toolName=block.name
|
||||
)
|
||||
|
||||
elif event.type == "content_block_delta":
|
||||
delta = event.delta
|
||||
if hasattr(delta, "type") and delta.type == "text_delta":
|
||||
accumulated_text += delta.text
|
||||
yield StreamTextDelta(id=text_block_id, delta=delta.text)
|
||||
|
||||
final_message = await stream.get_final_message()
|
||||
|
||||
if final_message.stop_reason == "tool_use":
|
||||
if has_started_text:
|
||||
yield StreamTextEnd(id=text_block_id)
|
||||
has_started_text = False
|
||||
text_block_id = str(uuid.uuid4())
|
||||
|
||||
tool_results = []
|
||||
assistant_content: list[dict[str, Any]] = []
|
||||
|
||||
for block in final_message.content:
|
||||
if block.type == "text":
|
||||
assistant_content.append(
|
||||
{"type": "text", "text": block.text}
|
||||
)
|
||||
elif block.type == "tool_use":
|
||||
assistant_content.append(
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": block.id,
|
||||
"name": block.name,
|
||||
"input": block.input,
|
||||
}
|
||||
)
|
||||
|
||||
# Track tool call for session persistence
|
||||
accumulated_tool_calls.append(
|
||||
{
|
||||
"id": block.id,
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": block.name,
|
||||
"arguments": json.dumps(
|
||||
block.input
|
||||
if isinstance(block.input, dict)
|
||||
else {}
|
||||
),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
yield StreamToolInputAvailable(
|
||||
toolCallId=block.id,
|
||||
toolName=block.name,
|
||||
input=(
|
||||
block.input if isinstance(block.input, dict) else {}
|
||||
),
|
||||
)
|
||||
|
||||
output, is_error = await _execute_tool(
|
||||
block.name, block.input, tool_handlers
|
||||
)
|
||||
|
||||
yield StreamToolOutputAvailable(
|
||||
toolCallId=block.id,
|
||||
toolName=block.name,
|
||||
output=output,
|
||||
success=not is_error,
|
||||
)
|
||||
|
||||
# Save tool result to session
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="tool",
|
||||
content=output,
|
||||
tool_call_id=block.id,
|
||||
)
|
||||
)
|
||||
|
||||
tool_results.append(
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": block.id,
|
||||
"content": output,
|
||||
"is_error": is_error,
|
||||
}
|
||||
)
|
||||
|
||||
# Save assistant message with tool calls to session
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="assistant",
|
||||
content=accumulated_text or None,
|
||||
tool_calls=(
|
||||
accumulated_tool_calls
|
||||
if accumulated_tool_calls
|
||||
else None
|
||||
),
|
||||
)
|
||||
)
|
||||
# Reset for next iteration
|
||||
accumulated_text = ""
|
||||
accumulated_tool_calls = []
|
||||
|
||||
anthropic_messages.append(
|
||||
{"role": "assistant", "content": assistant_content}
|
||||
)
|
||||
anthropic_messages.append({"role": "user", "content": tool_results})
|
||||
continue
|
||||
|
||||
else:
|
||||
if has_started_text:
|
||||
yield StreamTextEnd(id=text_block_id)
|
||||
|
||||
# Save final assistant response to session
|
||||
if accumulated_text:
|
||||
session.messages.append(
|
||||
ChatMessage(role="assistant", content=accumulated_text)
|
||||
)
|
||||
|
||||
yield StreamUsage(
|
||||
promptTokens=final_message.usage.input_tokens,
|
||||
completionTokens=final_message.usage.output_tokens,
|
||||
totalTokens=final_message.usage.input_tokens
|
||||
+ final_message.usage.output_tokens,
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Anthropic Fallback] Error: {e}", exc_info=True)
|
||||
yield StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="anthropic_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
yield StreamError(errorText="Max tool iterations reached", code="max_iterations")
|
||||
yield StreamFinish()
|
||||
|
||||
|
||||
def _convert_session_to_anthropic(session: ChatSession) -> list[dict[str, Any]]:
|
||||
"""Convert session messages to Anthropic format.
|
||||
|
||||
Handles merging consecutive same-role messages (Anthropic requires alternating roles).
|
||||
"""
|
||||
messages: list[dict[str, Any]] = []
|
||||
|
||||
for msg in session.messages:
|
||||
if msg.role == "user":
|
||||
new_msg = {"role": "user", "content": msg.content or ""}
|
||||
elif msg.role == "assistant":
|
||||
content: list[dict[str, Any]] = []
|
||||
if msg.content:
|
||||
content.append({"type": "text", "text": msg.content})
|
||||
if msg.tool_calls:
|
||||
for tc in msg.tool_calls:
|
||||
func = tc.get("function", {})
|
||||
args = func.get("arguments", {})
|
||||
if isinstance(args, str):
|
||||
try:
|
||||
args = json.loads(args)
|
||||
except json.JSONDecodeError:
|
||||
args = {}
|
||||
content.append(
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": tc.get("id", str(uuid.uuid4())),
|
||||
"name": func.get("name", ""),
|
||||
"input": args,
|
||||
}
|
||||
)
|
||||
if content:
|
||||
new_msg = {"role": "assistant", "content": content}
|
||||
else:
|
||||
continue # Skip empty assistant messages
|
||||
elif msg.role == "tool":
|
||||
new_msg = {
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": msg.tool_call_id or "",
|
||||
"content": msg.content or "",
|
||||
}
|
||||
],
|
||||
}
|
||||
else:
|
||||
continue
|
||||
|
||||
messages.append(new_msg)
|
||||
|
||||
# Merge consecutive same-role messages (Anthropic requires alternating roles)
|
||||
return _merge_consecutive_roles(messages)
|
||||
|
||||
|
||||
def _merge_consecutive_roles(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
"""Merge consecutive messages with the same role.
|
||||
|
||||
Anthropic API requires alternating user/assistant roles.
|
||||
"""
|
||||
if not messages:
|
||||
return []
|
||||
|
||||
merged: list[dict[str, Any]] = []
|
||||
for msg in messages:
|
||||
if merged and merged[-1]["role"] == msg["role"]:
|
||||
# Merge with previous message
|
||||
prev_content = merged[-1]["content"]
|
||||
new_content = msg["content"]
|
||||
|
||||
# Normalize both to list-of-blocks form
|
||||
if isinstance(prev_content, str):
|
||||
prev_content = [{"type": "text", "text": prev_content}]
|
||||
if isinstance(new_content, str):
|
||||
new_content = [{"type": "text", "text": new_content}]
|
||||
|
||||
# Ensure both are lists
|
||||
if not isinstance(prev_content, list):
|
||||
prev_content = [prev_content]
|
||||
if not isinstance(new_content, list):
|
||||
new_content = [new_content]
|
||||
|
||||
merged[-1]["content"] = prev_content + new_content
|
||||
else:
|
||||
merged.append(msg)
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
async def _execute_tool(
|
||||
tool_name: str, tool_input: Any, handlers: dict[str, Any]
|
||||
) -> tuple[str, bool]:
|
||||
"""Execute a tool and return (output, is_error)."""
|
||||
handler = handlers.get(tool_name)
|
||||
if not handler:
|
||||
return f"Unknown tool: {tool_name}", True
|
||||
|
||||
try:
|
||||
result = await handler(tool_input)
|
||||
# Safely extract output - handle empty or missing content
|
||||
content = result.get("content") or []
|
||||
if content and isinstance(content, list) and len(content) > 0:
|
||||
first_item = content[0]
|
||||
output = first_item.get("text", "") if isinstance(first_item, dict) else ""
|
||||
else:
|
||||
output = ""
|
||||
is_error = result.get("isError", False)
|
||||
return output, is_error
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}", True
|
||||
@@ -0,0 +1,300 @@
|
||||
"""Response adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
|
||||
|
||||
This module provides the adapter layer that converts streaming messages from
|
||||
the Claude Agent SDK into the Vercel AI SDK UI Stream Protocol format that
|
||||
the frontend expects.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Any, AsyncGenerator
|
||||
|
||||
from backend.api.features.chat.response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamHeartbeat,
|
||||
StreamStart,
|
||||
StreamTextDelta,
|
||||
StreamTextEnd,
|
||||
StreamTextStart,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolInputStart,
|
||||
StreamToolOutputAvailable,
|
||||
StreamUsage,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SDKResponseAdapter:
|
||||
"""Adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
|
||||
|
||||
This class maintains state during a streaming session to properly track
|
||||
text blocks, tool calls, and message lifecycle.
|
||||
"""
|
||||
|
||||
def __init__(self, message_id: str | None = None):
|
||||
"""Initialize the adapter.
|
||||
|
||||
Args:
|
||||
message_id: Optional message ID. If not provided, one will be generated.
|
||||
"""
|
||||
self.message_id = message_id or str(uuid.uuid4())
|
||||
self.text_block_id = str(uuid.uuid4())
|
||||
self.has_started_text = False
|
||||
self.has_ended_text = False
|
||||
self.current_tool_calls: dict[str, dict[str, Any]] = {}
|
||||
self.task_id: str | None = None
|
||||
|
||||
def set_task_id(self, task_id: str) -> None:
|
||||
"""Set the task ID for reconnection support."""
|
||||
self.task_id = task_id
|
||||
|
||||
def convert_message(self, sdk_message: Any) -> list[StreamBaseResponse]:
|
||||
"""Convert a single SDK message to Vercel AI SDK format.
|
||||
|
||||
Args:
|
||||
sdk_message: A message from the Claude Agent SDK.
|
||||
|
||||
Returns:
|
||||
List of StreamBaseResponse objects (may be empty or multiple).
|
||||
"""
|
||||
responses: list[StreamBaseResponse] = []
|
||||
|
||||
# Handle different SDK message types - use class name since SDK uses dataclasses
|
||||
class_name = type(sdk_message).__name__
|
||||
msg_subtype = getattr(sdk_message, "subtype", None)
|
||||
|
||||
if class_name == "SystemMessage":
|
||||
if msg_subtype == "init":
|
||||
# Session initialization - emit start
|
||||
responses.append(
|
||||
StreamStart(
|
||||
messageId=self.message_id,
|
||||
taskId=self.task_id,
|
||||
)
|
||||
)
|
||||
|
||||
elif class_name == "AssistantMessage":
|
||||
# Assistant message with content blocks
|
||||
content = getattr(sdk_message, "content", [])
|
||||
for block in content:
|
||||
# Check block type by class name (SDK uses dataclasses) or dict type
|
||||
block_class = type(block).__name__
|
||||
block_type = block.get("type") if isinstance(block, dict) else None
|
||||
|
||||
if block_class == "TextBlock" or block_type == "text":
|
||||
# Text content
|
||||
text = getattr(block, "text", None) or (
|
||||
block.get("text") if isinstance(block, dict) else ""
|
||||
)
|
||||
|
||||
if text:
|
||||
# Start text block if needed (or restart after tool calls)
|
||||
if not self.has_started_text or self.has_ended_text:
|
||||
# Generate new text block ID for text after tools
|
||||
if self.has_ended_text:
|
||||
self.text_block_id = str(uuid.uuid4())
|
||||
self.has_ended_text = False
|
||||
responses.append(StreamTextStart(id=self.text_block_id))
|
||||
self.has_started_text = True
|
||||
|
||||
# Emit text delta
|
||||
responses.append(
|
||||
StreamTextDelta(
|
||||
id=self.text_block_id,
|
||||
delta=text,
|
||||
)
|
||||
)
|
||||
|
||||
elif block_class == "ToolUseBlock" or block_type == "tool_use":
|
||||
# Tool call
|
||||
tool_id_raw = getattr(block, "id", None) or (
|
||||
block.get("id") if isinstance(block, dict) else None
|
||||
)
|
||||
tool_id: str = (
|
||||
str(tool_id_raw) if tool_id_raw else str(uuid.uuid4())
|
||||
)
|
||||
|
||||
tool_name_raw = getattr(block, "name", None) or (
|
||||
block.get("name") if isinstance(block, dict) else None
|
||||
)
|
||||
tool_name: str = str(tool_name_raw) if tool_name_raw else "unknown"
|
||||
|
||||
tool_input = getattr(block, "input", None) or (
|
||||
block.get("input") if isinstance(block, dict) else {}
|
||||
)
|
||||
|
||||
# End text block if we were streaming text
|
||||
if self.has_started_text and not self.has_ended_text:
|
||||
responses.append(StreamTextEnd(id=self.text_block_id))
|
||||
self.has_ended_text = True
|
||||
|
||||
# Emit tool input start
|
||||
responses.append(
|
||||
StreamToolInputStart(
|
||||
toolCallId=tool_id,
|
||||
toolName=tool_name,
|
||||
)
|
||||
)
|
||||
|
||||
# Emit tool input available with full input
|
||||
responses.append(
|
||||
StreamToolInputAvailable(
|
||||
toolCallId=tool_id,
|
||||
toolName=tool_name,
|
||||
input=tool_input if isinstance(tool_input, dict) else {},
|
||||
)
|
||||
)
|
||||
|
||||
# Track the tool call
|
||||
self.current_tool_calls[tool_id] = {
|
||||
"name": tool_name,
|
||||
"input": tool_input,
|
||||
}
|
||||
|
||||
elif class_name in ("ToolResultMessage", "UserMessage"):
|
||||
# Tool result - check for tool_result content
|
||||
content = getattr(sdk_message, "content", [])
|
||||
|
||||
for block in content:
|
||||
block_class = type(block).__name__
|
||||
block_type = block.get("type") if isinstance(block, dict) else None
|
||||
|
||||
if block_class == "ToolResultBlock" or block_type == "tool_result":
|
||||
tool_use_id = getattr(block, "tool_use_id", None) or (
|
||||
block.get("tool_use_id") if isinstance(block, dict) else None
|
||||
)
|
||||
result_content = getattr(block, "content", None) or (
|
||||
block.get("content") if isinstance(block, dict) else ""
|
||||
)
|
||||
is_error = getattr(block, "is_error", False) or (
|
||||
block.get("is_error", False)
|
||||
if isinstance(block, dict)
|
||||
else False
|
||||
)
|
||||
|
||||
if tool_use_id:
|
||||
tool_info = self.current_tool_calls.get(tool_use_id, {})
|
||||
tool_name = tool_info.get("name", "unknown")
|
||||
|
||||
# Format the output
|
||||
if isinstance(result_content, list):
|
||||
# Extract text from content blocks
|
||||
output_text = ""
|
||||
for item in result_content:
|
||||
if (
|
||||
isinstance(item, dict)
|
||||
and item.get("type") == "text"
|
||||
):
|
||||
output_text += item.get("text", "")
|
||||
elif hasattr(item, "text"):
|
||||
output_text += getattr(item, "text", "")
|
||||
output = output_text
|
||||
elif isinstance(result_content, str):
|
||||
output = result_content
|
||||
else:
|
||||
output = json.dumps(result_content)
|
||||
|
||||
responses.append(
|
||||
StreamToolOutputAvailable(
|
||||
toolCallId=tool_use_id,
|
||||
toolName=tool_name,
|
||||
output=output,
|
||||
success=not is_error,
|
||||
)
|
||||
)
|
||||
|
||||
elif class_name == "ResultMessage":
|
||||
# Final result
|
||||
if msg_subtype == "success":
|
||||
# End text block if still open
|
||||
if self.has_started_text and not self.has_ended_text:
|
||||
responses.append(StreamTextEnd(id=self.text_block_id))
|
||||
self.has_ended_text = True
|
||||
|
||||
# Emit finish
|
||||
responses.append(StreamFinish())
|
||||
|
||||
elif msg_subtype in ("error", "error_during_execution"):
|
||||
error_msg = getattr(sdk_message, "error", "Unknown error")
|
||||
responses.append(
|
||||
StreamError(
|
||||
errorText=str(error_msg),
|
||||
code="sdk_error",
|
||||
)
|
||||
)
|
||||
responses.append(StreamFinish())
|
||||
|
||||
elif class_name == "ErrorMessage":
|
||||
# Error message
|
||||
error_msg = getattr(sdk_message, "message", None) or getattr(
|
||||
sdk_message, "error", "Unknown error"
|
||||
)
|
||||
responses.append(
|
||||
StreamError(
|
||||
errorText=str(error_msg),
|
||||
code="sdk_error",
|
||||
)
|
||||
)
|
||||
responses.append(StreamFinish())
|
||||
|
||||
return responses
|
||||
|
||||
def create_heartbeat(self, tool_call_id: str | None = None) -> StreamHeartbeat:
|
||||
"""Create a heartbeat response."""
|
||||
return StreamHeartbeat(toolCallId=tool_call_id)
|
||||
|
||||
def create_usage(
|
||||
self,
|
||||
prompt_tokens: int,
|
||||
completion_tokens: int,
|
||||
) -> StreamUsage:
|
||||
"""Create a usage statistics response."""
|
||||
return StreamUsage(
|
||||
promptTokens=prompt_tokens,
|
||||
completionTokens=completion_tokens,
|
||||
totalTokens=prompt_tokens + completion_tokens,
|
||||
)
|
||||
|
||||
|
||||
async def adapt_sdk_stream(
|
||||
sdk_stream: AsyncGenerator[Any, None],
|
||||
message_id: str | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> AsyncGenerator[StreamBaseResponse, None]:
|
||||
"""Adapt a Claude Agent SDK stream to Vercel AI SDK format.
|
||||
|
||||
Args:
|
||||
sdk_stream: The async generator from the Claude Agent SDK.
|
||||
message_id: Optional message ID for the response.
|
||||
task_id: Optional task ID for reconnection support.
|
||||
|
||||
Yields:
|
||||
StreamBaseResponse objects in Vercel AI SDK format.
|
||||
"""
|
||||
adapter = SDKResponseAdapter(message_id=message_id)
|
||||
if task_id:
|
||||
adapter.set_task_id(task_id)
|
||||
|
||||
# Emit start immediately
|
||||
yield StreamStart(messageId=adapter.message_id, taskId=task_id)
|
||||
|
||||
try:
|
||||
async for sdk_message in sdk_stream:
|
||||
responses = adapter.convert_message(sdk_message)
|
||||
for response in responses:
|
||||
# Skip duplicate start messages
|
||||
if isinstance(response, StreamStart):
|
||||
continue
|
||||
yield response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in SDK stream: {e}", exc_info=True)
|
||||
yield StreamError(
|
||||
errorText=f"Stream error: {str(e)}",
|
||||
code="stream_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
@@ -0,0 +1,278 @@
|
||||
"""Security hooks for Claude Agent SDK integration.
|
||||
|
||||
This module provides security hooks that validate tool calls before execution,
|
||||
ensuring multi-user isolation and preventing unauthorized operations.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, cast
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Tools that are blocked entirely (CLI/system access)
|
||||
BLOCKED_TOOLS = {
|
||||
"Bash",
|
||||
"bash",
|
||||
"shell",
|
||||
"exec",
|
||||
"terminal",
|
||||
"command",
|
||||
"Read", # Block raw file read - use workspace tools instead
|
||||
"Write", # Block raw file write - use workspace tools instead
|
||||
"Edit", # Block raw file edit - use workspace tools instead
|
||||
"Glob", # Block raw file glob - use workspace tools instead
|
||||
"Grep", # Block raw file grep - use workspace tools instead
|
||||
}
|
||||
|
||||
# Dangerous patterns in tool inputs
|
||||
DANGEROUS_PATTERNS = [
|
||||
r"sudo",
|
||||
r"rm\s+-rf",
|
||||
r"dd\s+if=",
|
||||
r"/etc/passwd",
|
||||
r"/etc/shadow",
|
||||
r"chmod\s+777",
|
||||
r"curl\s+.*\|.*sh",
|
||||
r"wget\s+.*\|.*sh",
|
||||
r"eval\s*\(",
|
||||
r"exec\s*\(",
|
||||
r"__import__",
|
||||
r"os\.system",
|
||||
r"subprocess",
|
||||
]
|
||||
|
||||
|
||||
def _validate_tool_access(tool_name: str, tool_input: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Validate that a tool call is allowed.
|
||||
|
||||
Returns:
|
||||
Empty dict to allow, or dict with hookSpecificOutput to deny
|
||||
"""
|
||||
# Block forbidden tools
|
||||
if tool_name in BLOCKED_TOOLS:
|
||||
logger.warning(f"Blocked tool access attempt: {tool_name}")
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": (
|
||||
f"Tool '{tool_name}' is not available. "
|
||||
"Use the CoPilot-specific tools instead."
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
# Check for dangerous patterns in tool input
|
||||
input_str = str(tool_input)
|
||||
|
||||
for pattern in DANGEROUS_PATTERNS:
|
||||
if re.search(pattern, input_str, re.IGNORECASE):
|
||||
logger.warning(
|
||||
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
|
||||
)
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": "Input contains blocked pattern",
|
||||
}
|
||||
}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
def _validate_user_isolation(
|
||||
tool_name: str, tool_input: dict[str, Any], user_id: str | None
|
||||
) -> dict[str, Any]:
|
||||
"""Validate that tool calls respect user isolation."""
|
||||
# For workspace file tools, ensure path doesn't escape
|
||||
if "workspace" in tool_name.lower():
|
||||
path = tool_input.get("path", "") or tool_input.get("file_path", "")
|
||||
if path:
|
||||
# Check for path traversal
|
||||
if ".." in path or path.startswith("/"):
|
||||
logger.warning(
|
||||
f"Blocked path traversal attempt: {path} by user {user_id}"
|
||||
)
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": "Path traversal not allowed",
|
||||
}
|
||||
}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
def create_security_hooks(user_id: str | None) -> dict[str, Any]:
|
||||
"""Create the security hooks configuration for Claude Agent SDK.
|
||||
|
||||
Includes security validation and observability hooks:
|
||||
- PreToolUse: Security validation before tool execution
|
||||
- PostToolUse: Log successful tool executions
|
||||
- PostToolUseFailure: Log and handle failed tool executions
|
||||
- PreCompact: Log context compaction events (SDK handles compaction automatically)
|
||||
|
||||
Args:
|
||||
user_id: Current user ID for isolation validation
|
||||
|
||||
Returns:
|
||||
Hooks configuration dict for ClaudeAgentOptions
|
||||
"""
|
||||
try:
|
||||
from claude_agent_sdk import HookMatcher
|
||||
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
|
||||
|
||||
async def pre_tool_use_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Combined pre-tool-use validation hook."""
|
||||
_ = context # unused but required by signature
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
|
||||
|
||||
# Validate basic tool access
|
||||
result = _validate_tool_access(tool_name, tool_input)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
# Validate user isolation
|
||||
result = _validate_user_isolation(tool_name, tool_input, user_id)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def post_tool_use_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log successful tool executions for observability."""
|
||||
_ = context
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
logger.debug(f"[SDK] Tool success: {tool_name}, tool_use_id={tool_use_id}")
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def post_tool_failure_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log failed tool executions for debugging."""
|
||||
_ = context
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
error = input_data.get("error", "Unknown error")
|
||||
logger.warning(
|
||||
f"[SDK] Tool failed: {tool_name}, error={error}, "
|
||||
f"user={user_id}, tool_use_id={tool_use_id}"
|
||||
)
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def pre_compact_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log when SDK triggers context compaction.
|
||||
|
||||
The SDK automatically compacts conversation history when it grows too large.
|
||||
This hook provides visibility into when compaction happens.
|
||||
"""
|
||||
_ = context, tool_use_id
|
||||
trigger = input_data.get("trigger", "auto")
|
||||
logger.info(
|
||||
f"[SDK] Context compaction triggered: {trigger}, user={user_id}"
|
||||
)
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
return {
|
||||
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
|
||||
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
|
||||
"PostToolUseFailure": [
|
||||
HookMatcher(matcher="*", hooks=[post_tool_failure_hook])
|
||||
],
|
||||
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
|
||||
}
|
||||
except ImportError:
|
||||
# Fallback for when SDK isn't available - return empty hooks
|
||||
return {}
|
||||
|
||||
|
||||
def create_strict_security_hooks(
|
||||
user_id: str | None,
|
||||
allowed_tools: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Create strict security hooks that only allow specific tools.
|
||||
|
||||
Args:
|
||||
user_id: Current user ID
|
||||
allowed_tools: List of allowed tool names (defaults to CoPilot tools)
|
||||
|
||||
Returns:
|
||||
Hooks configuration dict
|
||||
"""
|
||||
try:
|
||||
from claude_agent_sdk import HookMatcher
|
||||
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
|
||||
|
||||
from .tool_adapter import RAW_TOOL_NAMES
|
||||
|
||||
tools_list = allowed_tools if allowed_tools is not None else RAW_TOOL_NAMES
|
||||
allowed_set = set(tools_list)
|
||||
|
||||
async def strict_pre_tool_use(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Strict validation that only allows whitelisted tools."""
|
||||
_ = context # unused but required by signature
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
|
||||
|
||||
# Remove MCP prefix if present
|
||||
clean_name = tool_name.removeprefix("mcp__copilot__")
|
||||
|
||||
if clean_name not in allowed_set:
|
||||
logger.warning(f"Blocked non-whitelisted tool: {tool_name}")
|
||||
return cast(
|
||||
SyncHookJSONOutput,
|
||||
{
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": (
|
||||
f"Tool '{tool_name}' is not in the allowed list"
|
||||
),
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Run standard validations
|
||||
result = _validate_tool_access(tool_name, tool_input)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
result = _validate_user_isolation(tool_name, tool_input, user_id)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
logger.debug(
|
||||
f"[SDK Audit] Tool call: tool={tool_name}, "
|
||||
f"user={user_id}, tool_use_id={tool_use_id}"
|
||||
)
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
return {
|
||||
"PreToolUse": [
|
||||
HookMatcher(matcher="*", hooks=[strict_pre_tool_use]),
|
||||
],
|
||||
}
|
||||
except ImportError:
|
||||
return {}
|
||||
@@ -0,0 +1,471 @@
|
||||
"""Claude Agent SDK service layer for CoPilot chat completions."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
|
||||
import openai
|
||||
|
||||
from backend.data.understanding import (
|
||||
format_understanding_for_prompt,
|
||||
get_business_understanding,
|
||||
)
|
||||
from backend.util.exceptions import NotFoundError
|
||||
|
||||
from ..config import ChatConfig
|
||||
from ..model import (
|
||||
ChatMessage,
|
||||
ChatSession,
|
||||
get_chat_session,
|
||||
update_session_title,
|
||||
upsert_chat_session,
|
||||
)
|
||||
from ..response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamStart,
|
||||
StreamTextDelta,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolOutputAvailable,
|
||||
)
|
||||
from ..tracking import track_user_message
|
||||
from .anthropic_fallback import stream_with_anthropic
|
||||
from .response_adapter import SDKResponseAdapter
|
||||
from .security_hooks import create_security_hooks
|
||||
from .tool_adapter import (
|
||||
COPILOT_TOOL_NAMES,
|
||||
create_copilot_mcp_server,
|
||||
set_execution_context,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
config = ChatConfig()
|
||||
|
||||
# Set to hold background tasks to prevent garbage collection
|
||||
_background_tasks: set[asyncio.Task[Any]] = set()
|
||||
|
||||
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
|
||||
|
||||
Here is everything you know about the current user from previous interactions:
|
||||
|
||||
<users_information>
|
||||
{users_information}
|
||||
</users_information>
|
||||
|
||||
## YOUR CORE MANDATE
|
||||
|
||||
You are action-oriented. Your success is measured by:
|
||||
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
|
||||
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
|
||||
- **Time Saved**: Focus on tangible efficiency gains
|
||||
- **Quality Output**: Deliver results that meet or exceed expectations
|
||||
|
||||
## YOUR WORKFLOW
|
||||
|
||||
Adapt flexibly to the conversation context. Not every interaction requires all stages:
|
||||
|
||||
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
|
||||
|
||||
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
|
||||
|
||||
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
|
||||
|
||||
4. **Discover or Create Agents**:
|
||||
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
|
||||
- Search the marketplace with `find_agent` for pre-built automations
|
||||
- Find reusable components with `find_block`
|
||||
- Create custom solutions with `create_agent` if nothing suitable exists
|
||||
- Modify existing library agents with `edit_agent`
|
||||
|
||||
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
|
||||
|
||||
6. **Show Results**: Display outputs using `agent_output`.
|
||||
|
||||
## BEHAVIORAL GUIDELINES
|
||||
|
||||
**Be Concise:**
|
||||
- Target 2-5 short lines maximum
|
||||
- Make every word count—no repetition or filler
|
||||
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
|
||||
- Avoid jargon (blocks, slugs, cron) unless the user asks
|
||||
|
||||
**Be Proactive:**
|
||||
- Suggest next steps before being asked
|
||||
- Anticipate needs based on conversation context and user information
|
||||
- Look for opportunities to expand scope when relevant
|
||||
- Reveal capabilities through action, not explanation
|
||||
|
||||
**Use Tools Effectively:**
|
||||
- Select the right tool for each task
|
||||
- **Always check `find_library_agent` before searching the marketplace**
|
||||
- Use `add_understanding` to capture valuable business context
|
||||
- When tool calls fail, try alternative approaches
|
||||
|
||||
## CRITICAL REMINDER
|
||||
|
||||
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
|
||||
|
||||
|
||||
async def _build_system_prompt(
|
||||
user_id: str | None, has_conversation_history: bool = False
|
||||
) -> tuple[str, Any]:
|
||||
"""Build the system prompt with user's business understanding context.
|
||||
|
||||
Args:
|
||||
user_id: The user ID to fetch understanding for.
|
||||
has_conversation_history: Whether there's existing conversation history.
|
||||
If True, we don't tell the model to greet/introduce (since they're
|
||||
already in a conversation).
|
||||
"""
|
||||
understanding = None
|
||||
if user_id:
|
||||
try:
|
||||
understanding = await get_business_understanding(user_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch business understanding: {e}")
|
||||
|
||||
if understanding:
|
||||
context = format_understanding_for_prompt(understanding)
|
||||
elif has_conversation_history:
|
||||
# Don't tell model to greet if there's conversation history
|
||||
context = "No prior understanding saved yet. Continue the existing conversation naturally."
|
||||
else:
|
||||
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
|
||||
|
||||
return DEFAULT_SYSTEM_PROMPT.format(users_information=context), understanding
|
||||
|
||||
|
||||
def _format_conversation_history(session: ChatSession) -> str:
|
||||
"""Format conversation history as a prompt context.
|
||||
|
||||
The SDK handles context compaction automatically, but we apply
|
||||
max_context_messages as a safety guard to limit initial prompt size.
|
||||
"""
|
||||
if not session.messages:
|
||||
return ""
|
||||
|
||||
# Get all messages except the last user message (which will be the prompt)
|
||||
messages = session.messages[:-1] if session.messages else []
|
||||
if not messages:
|
||||
return ""
|
||||
|
||||
# Apply max_context_messages limit as a safety guard
|
||||
# (SDK handles compaction, but this prevents excessively large initial prompts)
|
||||
max_messages = config.max_context_messages
|
||||
if len(messages) > max_messages:
|
||||
messages = messages[-max_messages:]
|
||||
|
||||
history_parts = ["<conversation_history>"]
|
||||
|
||||
for msg in messages:
|
||||
if msg.role == "user":
|
||||
history_parts.append(f"User: {msg.content or ''}")
|
||||
elif msg.role == "assistant":
|
||||
# Pass full content - SDK handles compaction automatically
|
||||
history_parts.append(f"Assistant: {msg.content or ''}")
|
||||
if msg.tool_calls:
|
||||
for tc in msg.tool_calls:
|
||||
func = tc.get("function", {})
|
||||
history_parts.append(
|
||||
f" [Called tool: {func.get('name', 'unknown')}]"
|
||||
)
|
||||
elif msg.role == "tool":
|
||||
# Pass full tool results - SDK handles compaction
|
||||
history_parts.append(f" [Tool result: {msg.content or ''}]")
|
||||
|
||||
history_parts.append("</conversation_history>")
|
||||
history_parts.append("")
|
||||
history_parts.append(
|
||||
"Continue this conversation. Respond to the user's latest message:"
|
||||
)
|
||||
history_parts.append("")
|
||||
|
||||
return "\n".join(history_parts)
|
||||
|
||||
|
||||
async def _generate_session_title(
|
||||
message: str,
|
||||
user_id: str | None = None,
|
||||
session_id: str | None = None,
|
||||
) -> str | None:
|
||||
"""Generate a concise title for a chat session."""
|
||||
from backend.util.settings import Settings
|
||||
|
||||
settings = Settings()
|
||||
try:
|
||||
# Build extra_body for OpenRouter tracing
|
||||
extra_body: dict[str, Any] = {
|
||||
"posthogProperties": {"environment": settings.config.app_env.value},
|
||||
}
|
||||
if user_id:
|
||||
extra_body["user"] = user_id[:128]
|
||||
extra_body["posthogDistinctId"] = user_id
|
||||
if session_id:
|
||||
extra_body["session_id"] = session_id[:128]
|
||||
|
||||
client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
|
||||
response = await client.chat.completions.create(
|
||||
model=config.title_model,
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Generate a very short title (3-6 words) for a chat conversation based on the user's first message. Return ONLY the title, no quotes or punctuation.",
|
||||
},
|
||||
{"role": "user", "content": message[:500]},
|
||||
],
|
||||
max_tokens=20,
|
||||
extra_body=extra_body,
|
||||
)
|
||||
title = response.choices[0].message.content
|
||||
if title:
|
||||
title = title.strip().strip("\"'")
|
||||
return title[:47] + "..." if len(title) > 50 else title
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to generate session title: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def stream_chat_completion_sdk(
|
||||
session_id: str,
|
||||
message: str | None = None,
|
||||
tool_call_response: str | None = None, # noqa: ARG001
|
||||
is_user_message: bool = True,
|
||||
user_id: str | None = None,
|
||||
retry_count: int = 0, # noqa: ARG001
|
||||
session: ChatSession | None = None,
|
||||
context: dict[str, str] | None = None, # noqa: ARG001
|
||||
) -> AsyncGenerator[StreamBaseResponse, None]:
|
||||
"""Stream chat completion using Claude Agent SDK.
|
||||
|
||||
Drop-in replacement for stream_chat_completion with improved reliability.
|
||||
"""
|
||||
|
||||
if session is None:
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
|
||||
if not session:
|
||||
raise NotFoundError(
|
||||
f"Session {session_id} not found. Please create a new session first."
|
||||
)
|
||||
|
||||
if message:
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="user" if is_user_message else "assistant", content=message
|
||||
)
|
||||
)
|
||||
if is_user_message:
|
||||
track_user_message(
|
||||
user_id=user_id, session_id=session_id, message_length=len(message)
|
||||
)
|
||||
|
||||
session = await upsert_chat_session(session)
|
||||
|
||||
# Generate title for new sessions (first user message)
|
||||
if is_user_message and not session.title:
|
||||
user_messages = [m for m in session.messages if m.role == "user"]
|
||||
if len(user_messages) == 1:
|
||||
first_message = user_messages[0].content or message or ""
|
||||
if first_message:
|
||||
task = asyncio.create_task(
|
||||
_update_title_async(session_id, first_message, user_id)
|
||||
)
|
||||
# Store reference to prevent garbage collection
|
||||
_background_tasks.add(task)
|
||||
task.add_done_callback(_background_tasks.discard)
|
||||
|
||||
# Check if there's conversation history (more than just the current message)
|
||||
has_history = len(session.messages) > 1
|
||||
system_prompt, _ = await _build_system_prompt(
|
||||
user_id, has_conversation_history=has_history
|
||||
)
|
||||
set_execution_context(user_id, session, None)
|
||||
|
||||
message_id = str(uuid.uuid4())
|
||||
text_block_id = str(uuid.uuid4())
|
||||
task_id = str(uuid.uuid4())
|
||||
|
||||
yield StreamStart(messageId=message_id, taskId=task_id)
|
||||
|
||||
# Track whether the stream completed normally via ResultMessage
|
||||
stream_completed = False
|
||||
|
||||
try:
|
||||
try:
|
||||
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
|
||||
|
||||
# Create MCP server with CoPilot tools
|
||||
mcp_server = create_copilot_mcp_server()
|
||||
|
||||
options = ClaudeAgentOptions(
|
||||
system_prompt=system_prompt,
|
||||
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
|
||||
allowed_tools=COPILOT_TOOL_NAMES,
|
||||
hooks=create_security_hooks(user_id), # type: ignore[arg-type]
|
||||
continue_conversation=True, # Enable conversation continuation
|
||||
)
|
||||
|
||||
adapter = SDKResponseAdapter(message_id=message_id)
|
||||
adapter.set_task_id(task_id)
|
||||
|
||||
async with ClaudeSDKClient(options=options) as client:
|
||||
# Build prompt with conversation history for context
|
||||
# The SDK doesn't support replaying full conversation history,
|
||||
# so we include it as context in the prompt
|
||||
current_message = message or ""
|
||||
if not current_message and session.messages:
|
||||
last_user = [m for m in session.messages if m.role == "user"]
|
||||
if last_user:
|
||||
current_message = last_user[-1].content or ""
|
||||
|
||||
# Include conversation history if there are prior messages
|
||||
if len(session.messages) > 1:
|
||||
history_context = _format_conversation_history(session)
|
||||
prompt = f"{history_context}{current_message}"
|
||||
else:
|
||||
prompt = current_message
|
||||
|
||||
# Guard against empty prompts
|
||||
if not prompt.strip():
|
||||
yield StreamError(
|
||||
errorText="Message cannot be empty.",
|
||||
code="empty_prompt",
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
await client.query(prompt, session_id=session_id)
|
||||
|
||||
# Track assistant response to save to session
|
||||
# We may need multiple assistant messages if text comes after tool results
|
||||
assistant_response = ChatMessage(role="assistant", content="")
|
||||
accumulated_tool_calls: list[dict[str, Any]] = []
|
||||
has_appended_assistant = False
|
||||
has_tool_results = False # Track if we've received tool results
|
||||
|
||||
# Receive messages from the SDK
|
||||
async for sdk_msg in client.receive_messages():
|
||||
|
||||
for response in adapter.convert_message(sdk_msg):
|
||||
if isinstance(response, StreamStart):
|
||||
continue
|
||||
yield response
|
||||
|
||||
# Accumulate text deltas into assistant response
|
||||
if isinstance(response, StreamTextDelta):
|
||||
delta = response.delta or ""
|
||||
# After tool results, create new assistant message for post-tool text
|
||||
if has_tool_results and has_appended_assistant:
|
||||
assistant_response = ChatMessage(
|
||||
role="assistant", content=delta
|
||||
)
|
||||
accumulated_tool_calls = [] # Reset for new message
|
||||
session.messages.append(assistant_response)
|
||||
has_tool_results = False
|
||||
else:
|
||||
assistant_response.content = (
|
||||
assistant_response.content or ""
|
||||
) + delta
|
||||
if not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
|
||||
# Track tool calls on the assistant message
|
||||
elif isinstance(response, StreamToolInputAvailable):
|
||||
accumulated_tool_calls.append(
|
||||
{
|
||||
"id": response.toolCallId,
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": response.toolName,
|
||||
"arguments": json.dumps(response.input or {}),
|
||||
},
|
||||
}
|
||||
)
|
||||
# Update assistant message with tool calls
|
||||
assistant_response.tool_calls = accumulated_tool_calls
|
||||
# Append assistant message if not already (tool-only response)
|
||||
if not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
|
||||
elif isinstance(response, StreamToolOutputAvailable):
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="tool",
|
||||
content=(
|
||||
response.output
|
||||
if isinstance(response.output, str)
|
||||
else str(response.output)
|
||||
),
|
||||
tool_call_id=response.toolCallId,
|
||||
)
|
||||
)
|
||||
has_tool_results = True
|
||||
|
||||
elif isinstance(response, StreamFinish):
|
||||
stream_completed = True
|
||||
|
||||
# Break out of the message loop if we received finish signal
|
||||
if stream_completed:
|
||||
break
|
||||
|
||||
# Ensure assistant response is saved even if no text deltas
|
||||
# (e.g., only tool calls were made)
|
||||
if (
|
||||
assistant_response.content or assistant_response.tool_calls
|
||||
) and not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
"[SDK] claude-agent-sdk not available, using Anthropic fallback"
|
||||
)
|
||||
async for response in stream_with_anthropic(
|
||||
session, system_prompt, text_block_id
|
||||
):
|
||||
yield response
|
||||
|
||||
# Save the session with accumulated messages
|
||||
await upsert_chat_session(session)
|
||||
logger.debug(
|
||||
f"[SDK] Session {session_id} saved with {len(session.messages)} messages"
|
||||
)
|
||||
# Always yield StreamFinish to signal completion to the caller
|
||||
# The adapter yields StreamFinish for the SSE stream, but we need to
|
||||
# yield it here so the background task in routes.py knows to call mark_task_completed
|
||||
yield StreamFinish()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[SDK] Error: {e}", exc_info=True)
|
||||
# Save session even on error to preserve any partial response
|
||||
try:
|
||||
await upsert_chat_session(session)
|
||||
except Exception as save_err:
|
||||
logger.error(f"[SDK] Failed to save session on error: {save_err}")
|
||||
# Sanitize error message to avoid exposing internal details
|
||||
yield StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="sdk_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
|
||||
|
||||
async def _update_title_async(
|
||||
session_id: str, message: str, user_id: str | None = None
|
||||
) -> None:
|
||||
"""Background task to update session title."""
|
||||
try:
|
||||
title = await _generate_session_title(
|
||||
message, user_id=user_id, session_id=session_id
|
||||
)
|
||||
if title:
|
||||
await update_session_title(session_id, title)
|
||||
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[SDK] Failed to update session title: {e}")
|
||||
@@ -0,0 +1,213 @@
|
||||
"""Tool adapter for wrapping existing CoPilot tools as Claude Agent SDK MCP tools.
|
||||
|
||||
This module provides the adapter layer that converts existing BaseTool implementations
|
||||
into in-process MCP tools that can be used with the Claude Agent SDK.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from contextvars import ContextVar
|
||||
from typing import Any
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools import TOOL_REGISTRY
|
||||
from backend.api.features.chat.tools.base import BaseTool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Context variables to pass user/session info to tool execution
|
||||
_current_user_id: ContextVar[str | None] = ContextVar("current_user_id", default=None)
|
||||
_current_session: ContextVar[ChatSession | None] = ContextVar(
|
||||
"current_session", default=None
|
||||
)
|
||||
_current_tool_call_id: ContextVar[str | None] = ContextVar(
|
||||
"current_tool_call_id", default=None
|
||||
)
|
||||
|
||||
|
||||
def set_execution_context(
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
tool_call_id: str | None = None,
|
||||
) -> None:
|
||||
"""Set the execution context for tool calls.
|
||||
|
||||
This must be called before streaming begins to ensure tools have access
|
||||
to user_id and session information.
|
||||
"""
|
||||
_current_user_id.set(user_id)
|
||||
_current_session.set(session)
|
||||
_current_tool_call_id.set(tool_call_id)
|
||||
|
||||
|
||||
def get_execution_context() -> tuple[str | None, ChatSession | None, str | None]:
|
||||
"""Get the current execution context."""
|
||||
return (
|
||||
_current_user_id.get(),
|
||||
_current_session.get(),
|
||||
_current_tool_call_id.get(),
|
||||
)
|
||||
|
||||
|
||||
def create_tool_handler(base_tool: BaseTool):
|
||||
"""Create an async handler function for a BaseTool.
|
||||
|
||||
This wraps the existing BaseTool._execute method to be compatible
|
||||
with the Claude Agent SDK MCP tool format.
|
||||
"""
|
||||
|
||||
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Execute the wrapped tool and return MCP-formatted response."""
|
||||
user_id, session, tool_call_id = get_execution_context()
|
||||
|
||||
if session is None:
|
||||
return {
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": json.dumps(
|
||||
{
|
||||
"error": "No session context available",
|
||||
"type": "error",
|
||||
}
|
||||
),
|
||||
}
|
||||
],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
try:
|
||||
# Call the existing tool's execute method
|
||||
result = await base_tool.execute(
|
||||
user_id=user_id,
|
||||
session=session,
|
||||
tool_call_id=tool_call_id or "sdk-call",
|
||||
**args,
|
||||
)
|
||||
|
||||
# The result is a StreamToolOutputAvailable, extract the output
|
||||
return {
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": (
|
||||
result.output
|
||||
if isinstance(result.output, str)
|
||||
else json.dumps(result.output)
|
||||
),
|
||||
}
|
||||
],
|
||||
"isError": not result.success,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
|
||||
return {
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": json.dumps(
|
||||
{
|
||||
"error": str(e),
|
||||
"type": "error",
|
||||
"message": f"Failed to execute {base_tool.name}",
|
||||
}
|
||||
),
|
||||
}
|
||||
],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
return tool_handler
|
||||
|
||||
|
||||
def get_tool_definitions() -> list[dict[str, Any]]:
|
||||
"""Get all tool definitions in MCP format.
|
||||
|
||||
Returns a list of tool definitions that can be used with
|
||||
create_sdk_mcp_server or as raw tool definitions.
|
||||
"""
|
||||
tool_definitions = []
|
||||
|
||||
for tool_name, base_tool in TOOL_REGISTRY.items():
|
||||
tool_def = {
|
||||
"name": tool_name,
|
||||
"description": base_tool.description,
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": base_tool.parameters.get("properties", {}),
|
||||
"required": base_tool.parameters.get("required", []),
|
||||
},
|
||||
}
|
||||
tool_definitions.append(tool_def)
|
||||
|
||||
return tool_definitions
|
||||
|
||||
|
||||
def get_tool_handlers() -> dict[str, Any]:
|
||||
"""Get all tool handlers mapped by name.
|
||||
|
||||
Returns a dictionary mapping tool names to their handler functions.
|
||||
"""
|
||||
handlers = {}
|
||||
|
||||
for tool_name, base_tool in TOOL_REGISTRY.items():
|
||||
handlers[tool_name] = create_tool_handler(base_tool)
|
||||
|
||||
return handlers
|
||||
|
||||
|
||||
# Create the MCP server configuration
|
||||
def create_copilot_mcp_server():
|
||||
"""Create an in-process MCP server configuration for CoPilot tools.
|
||||
|
||||
This can be passed to ClaudeAgentOptions.mcp_servers.
|
||||
|
||||
Note: The actual SDK MCP server creation depends on the claude-agent-sdk
|
||||
package being available. This function returns the configuration that
|
||||
can be used with the SDK.
|
||||
"""
|
||||
try:
|
||||
from claude_agent_sdk import create_sdk_mcp_server, tool
|
||||
|
||||
# Create decorated tool functions
|
||||
sdk_tools = []
|
||||
|
||||
for tool_name, base_tool in TOOL_REGISTRY.items():
|
||||
# Get the handler
|
||||
handler = create_tool_handler(base_tool)
|
||||
|
||||
# Create the decorated tool
|
||||
# The @tool decorator expects (name, description, schema)
|
||||
decorated = tool(
|
||||
tool_name,
|
||||
base_tool.description,
|
||||
base_tool.parameters.get("properties", {}),
|
||||
)(handler)
|
||||
|
||||
sdk_tools.append(decorated)
|
||||
|
||||
# Create the MCP server
|
||||
server = create_sdk_mcp_server(
|
||||
name="copilot",
|
||||
version="1.0.0",
|
||||
tools=sdk_tools,
|
||||
)
|
||||
|
||||
return server
|
||||
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
"claude-agent-sdk not available, returning tool definitions only"
|
||||
)
|
||||
return {
|
||||
"tools": get_tool_definitions(),
|
||||
"handlers": get_tool_handlers(),
|
||||
}
|
||||
|
||||
|
||||
# List of tool names for allowed_tools configuration
|
||||
COPILOT_TOOL_NAMES = [f"mcp__copilot__{name}" for name in TOOL_REGISTRY.keys()]
|
||||
|
||||
# Also export the raw tool names for flexibility
|
||||
RAW_TOOL_NAMES = list(TOOL_REGISTRY.keys())
|
||||
@@ -555,6 +555,10 @@ async def get_active_task_for_session(
|
||||
if task_user_id and user_id != task_user_id:
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
|
||||
)
|
||||
|
||||
# Get the last message ID from Redis Stream
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
last_id = "0-0"
|
||||
|
||||
@@ -22,6 +22,7 @@ from backend.data.workspace import (
|
||||
soft_delete_workspace_file,
|
||||
)
|
||||
from backend.util.settings import Config
|
||||
from backend.util.virus_scanner import scan_content_safe
|
||||
from backend.util.workspace_storage import compute_file_checksum, get_workspace_storage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -187,6 +188,9 @@ class WorkspaceManager:
|
||||
f"{Config().max_file_size_mb}MB limit"
|
||||
)
|
||||
|
||||
# Virus scan content before persisting (defense in depth)
|
||||
await scan_content_safe(content, filename=filename)
|
||||
|
||||
# Determine path with session scoping
|
||||
if path is None:
|
||||
path = f"/{filename}"
|
||||
|
||||
93
autogpt_platform/backend/poetry.lock
generated
93
autogpt_platform/backend/poetry.lock
generated
@@ -825,6 +825,29 @@ files = [
|
||||
{file = "charset_normalizer-3.4.2.tar.gz", hash = "sha256:5baececa9ecba31eff645232d59845c07aa030f0c81ee70184a90d35099a0e63"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "claude-agent-sdk"
|
||||
version = "0.1.31"
|
||||
description = "Python SDK for Claude Code"
|
||||
optional = false
|
||||
python-versions = ">=3.10"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "claude_agent_sdk-0.1.31-py3-none-macosx_11_0_arm64.whl", hash = "sha256:801bacfe4192782a7cc7b61b0d23a57f061c069993dd3dfa8109aa2e7050a530"},
|
||||
{file = "claude_agent_sdk-0.1.31-py3-none-manylinux_2_17_aarch64.whl", hash = "sha256:0b608e0cbfcedcb827427e6d16a73fe573d58e7f93e15f95435066feacbe6511"},
|
||||
{file = "claude_agent_sdk-0.1.31-py3-none-manylinux_2_17_x86_64.whl", hash = "sha256:d0cb30e026a22246e84d9237d23bb4df20be5146913a04d2802ddd37d4f8b8c9"},
|
||||
{file = "claude_agent_sdk-0.1.31-py3-none-win_amd64.whl", hash = "sha256:8ceca675c2770ad739bd1208362059a830e91c74efcf128045b5a7af14d36f2b"},
|
||||
{file = "claude_agent_sdk-0.1.31.tar.gz", hash = "sha256:b68c681083d7cc985dd3e48f73aabf459f056c1a7e1c5b9c47033c6af94da1a1"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
anyio = ">=4.0.0"
|
||||
mcp = ">=0.1.0"
|
||||
typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.11\""}
|
||||
|
||||
[package.extras]
|
||||
dev = ["anyio[trio] (>=4.0.0)", "mypy (>=1.0.0)", "pytest (>=7.0.0)", "pytest-asyncio (>=0.20.0)", "pytest-cov (>=4.0.0)", "ruff (>=0.1.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "cleo"
|
||||
version = "2.1.0"
|
||||
@@ -2343,6 +2366,18 @@ http2 = ["h2 (>=3,<5)"]
|
||||
socks = ["socksio (==1.*)"]
|
||||
zstd = ["zstandard (>=0.18.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "httpx-sse"
|
||||
version = "0.4.3"
|
||||
description = "Consume Server-Sent Event (SSE) messages with HTTPX."
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "httpx_sse-0.4.3-py3-none-any.whl", hash = "sha256:0ac1c9fe3c0afad2e0ebb25a934a59f4c7823b60792691f779fad2c5568830fc"},
|
||||
{file = "httpx_sse-0.4.3.tar.gz", hash = "sha256:9b1ed0127459a66014aec3c56bebd93da3c1bc8bb6618c8082039a44889a755d"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "huggingface-hub"
|
||||
version = "0.34.4"
|
||||
@@ -3004,6 +3039,39 @@ files = [
|
||||
{file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mcp"
|
||||
version = "1.26.0"
|
||||
description = "Model Context Protocol SDK"
|
||||
optional = false
|
||||
python-versions = ">=3.10"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "mcp-1.26.0-py3-none-any.whl", hash = "sha256:904a21c33c25aa98ddbeb47273033c435e595bbacfdb177f4bd87f6dceebe1ca"},
|
||||
{file = "mcp-1.26.0.tar.gz", hash = "sha256:db6e2ef491eecc1a0d93711a76f28dec2e05999f93afd48795da1c1137142c66"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
anyio = ">=4.5"
|
||||
httpx = ">=0.27.1"
|
||||
httpx-sse = ">=0.4"
|
||||
jsonschema = ">=4.20.0"
|
||||
pydantic = ">=2.11.0,<3.0.0"
|
||||
pydantic-settings = ">=2.5.2"
|
||||
pyjwt = {version = ">=2.10.1", extras = ["crypto"]}
|
||||
python-multipart = ">=0.0.9"
|
||||
pywin32 = {version = ">=310", markers = "sys_platform == \"win32\""}
|
||||
sse-starlette = ">=1.6.1"
|
||||
starlette = ">=0.27"
|
||||
typing-extensions = ">=4.9.0"
|
||||
typing-inspection = ">=0.4.1"
|
||||
uvicorn = {version = ">=0.31.1", markers = "sys_platform != \"emscripten\""}
|
||||
|
||||
[package.extras]
|
||||
cli = ["python-dotenv (>=1.0.0)", "typer (>=0.16.0)"]
|
||||
rich = ["rich (>=13.9.4)"]
|
||||
ws = ["websockets (>=15.0.1)"]
|
||||
|
||||
[[package]]
|
||||
name = "mdurl"
|
||||
version = "0.1.2"
|
||||
@@ -5233,7 +5301,7 @@ description = "Python for Window Extensions"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
groups = ["main"]
|
||||
markers = "platform_system == \"Windows\""
|
||||
markers = "sys_platform == \"win32\" or platform_system == \"Windows\""
|
||||
files = [
|
||||
{file = "pywin32-311-cp310-cp310-win32.whl", hash = "sha256:d03ff496d2a0cd4a5893504789d4a15399133fe82517455e78bad62efbb7f0a3"},
|
||||
{file = "pywin32-311-cp310-cp310-win_amd64.whl", hash = "sha256:797c2772017851984b97180b0bebe4b620bb86328e8a884bb626156295a63b3b"},
|
||||
@@ -6218,6 +6286,27 @@ postgresql-psycopgbinary = ["psycopg[binary] (>=3.0.7)"]
|
||||
pymysql = ["pymysql"]
|
||||
sqlcipher = ["sqlcipher3_binary"]
|
||||
|
||||
[[package]]
|
||||
name = "sse-starlette"
|
||||
version = "3.0.3"
|
||||
description = "SSE plugin for Starlette"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "sse_starlette-3.0.3-py3-none-any.whl", hash = "sha256:af5bf5a6f3933df1d9c7f8539633dc8444ca6a97ab2e2a7cd3b6e431ac03a431"},
|
||||
{file = "sse_starlette-3.0.3.tar.gz", hash = "sha256:88cfb08747e16200ea990c8ca876b03910a23b547ab3bd764c0d8eb81019b971"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
anyio = ">=4.7.0"
|
||||
|
||||
[package.extras]
|
||||
daphne = ["daphne (>=4.2.0)"]
|
||||
examples = ["aiosqlite (>=0.21.0)", "fastapi (>=0.115.12)", "sqlalchemy[asyncio] (>=2.0.41)", "starlette (>=0.49.1)", "uvicorn (>=0.34.0)"]
|
||||
granian = ["granian (>=2.3.1)"]
|
||||
uvicorn = ["uvicorn (>=0.34.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "stagehand"
|
||||
version = "0.5.1"
|
||||
@@ -7557,4 +7646,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.10,<3.14"
|
||||
content-hash = "8239323f9ae6713224dffd1fe8ba8b449fe88b6c3c7a90940294a74f43a0387a"
|
||||
content-hash = "f79a5f01baf459195d6fd06be2515b83c60cf2aef11a16530842b47febb98a23"
|
||||
|
||||
@@ -13,6 +13,7 @@ aio-pika = "^9.5.5"
|
||||
aiohttp = "^3.10.0"
|
||||
aiodns = "^3.5.0"
|
||||
anthropic = "^0.59.0"
|
||||
claude-agent-sdk = "^0.1.0"
|
||||
apscheduler = "^3.11.1"
|
||||
autogpt-libs = { path = "../autogpt_libs", develop = true }
|
||||
bleach = { extras = ["css"], version = "^6.2.0" }
|
||||
|
||||
Reference in New Issue
Block a user