mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-08 05:45:07 -05:00
Compare commits
7 Commits
feat/copit
...
feat/mcp-b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9b996abb0 | ||
|
|
9b972389a0 | ||
|
|
cd64562e1b | ||
|
|
8fddc9d71f | ||
|
|
3d1cd03fc8 | ||
|
|
e7ebe42306 | ||
|
|
e0fab7e34e |
16
.github/workflows/platform-frontend-ci.yml
vendored
16
.github/workflows/platform-frontend-ci.yml
vendored
@@ -27,11 +27,20 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
cache-key: ${{ steps.cache-key.outputs.key }}
|
||||
components-changed: ${{ steps.filter.outputs.components }}
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Check for component changes
|
||||
uses: dorny/paths-filter@v3
|
||||
id: filter
|
||||
with:
|
||||
filters: |
|
||||
components:
|
||||
- 'autogpt_platform/frontend/src/components/**'
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
@@ -90,8 +99,11 @@ jobs:
|
||||
chromatic:
|
||||
runs-on: ubuntu-latest
|
||||
needs: setup
|
||||
# Only run on dev branch pushes or PRs targeting dev
|
||||
if: github.ref == 'refs/heads/dev' || github.base_ref == 'dev'
|
||||
# Disabled: to re-enable, remove 'false &&' from the condition below
|
||||
if: >-
|
||||
false
|
||||
&& (github.ref == 'refs/heads/dev' || github.base_ref == 'dev')
|
||||
&& needs.setup.outputs.components-changed == 'true'
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
|
||||
1328
autogpt_platform/autogpt_libs/poetry.lock
generated
1328
autogpt_platform/autogpt_libs/poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -11,15 +11,15 @@ python = ">=3.10,<4.0"
|
||||
colorama = "^0.4.6"
|
||||
cryptography = "^45.0"
|
||||
expiringdict = "^1.2.2"
|
||||
fastapi = "^0.116.1"
|
||||
google-cloud-logging = "^3.12.1"
|
||||
launchdarkly-server-sdk = "^9.12.0"
|
||||
pydantic = "^2.11.7"
|
||||
pydantic-settings = "^2.10.1"
|
||||
pyjwt = { version = "^2.10.1", extras = ["crypto"] }
|
||||
fastapi = "^0.128.0"
|
||||
google-cloud-logging = "^3.13.0"
|
||||
launchdarkly-server-sdk = "^9.14.1"
|
||||
pydantic = "^2.12.5"
|
||||
pydantic-settings = "^2.12.0"
|
||||
pyjwt = { version = "^2.11.0", extras = ["crypto"] }
|
||||
redis = "^6.2.0"
|
||||
supabase = "^2.16.0"
|
||||
uvicorn = "^0.35.0"
|
||||
supabase = "^2.27.2"
|
||||
uvicorn = "^0.40.0"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pyright = "^1.1.404"
|
||||
|
||||
125
autogpt_platform/backend/MCP_BLOCK_IMPLEMENTATION.md
Normal file
125
autogpt_platform/backend/MCP_BLOCK_IMPLEMENTATION.md
Normal file
@@ -0,0 +1,125 @@
|
||||
# MCP Block Implementation Plan
|
||||
|
||||
## Overview
|
||||
|
||||
Create a single **MCPBlock** that dynamically integrates with any MCP (Model Context Protocol)
|
||||
server. Users provide a server URL, the block discovers available tools, presents them as a
|
||||
dropdown, and dynamically adjusts input/output schema based on the selected tool — exactly like
|
||||
`AgentExecutorBlock` handles dynamic schemas.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
User provides MCP server URL + credentials
|
||||
↓
|
||||
MCPBlock fetches tools via MCP protocol (tools/list)
|
||||
↓
|
||||
User selects tool from dropdown (stored in constantInput)
|
||||
↓
|
||||
Input schema dynamically updates based on selected tool's inputSchema
|
||||
↓
|
||||
On execution: MCPBlock calls the tool via MCP protocol (tools/call)
|
||||
↓
|
||||
Result yielded as block output
|
||||
```
|
||||
|
||||
## Design Decisions
|
||||
|
||||
1. **Single block, not many blocks** — One `MCPBlock` handles all MCP servers/tools
|
||||
2. **Dynamic schema via AgentExecutorBlock pattern** — Override `get_input_schema()`,
|
||||
`get_input_defaults()`, `get_missing_input()` on the Input class
|
||||
3. **Auth via API key credentials** — Use existing `APIKeyCredentials` with `ProviderName.MCP`
|
||||
provider. The API key is sent as Bearer token in the HTTP Authorization header to the MCP
|
||||
server. This keeps it simple and uses existing infrastructure.
|
||||
4. **HTTP-based MCP client** — Use `aiohttp` (already a dependency) to implement MCP Streamable
|
||||
HTTP transport directly. No need for the `mcp` Python SDK — the protocol is simple JSON-RPC
|
||||
over HTTP.
|
||||
5. **No new DB tables** — Everything fits in existing `AgentBlock` + `AgentNode` tables
|
||||
|
||||
## Implementation Files
|
||||
|
||||
### New Files
|
||||
- `backend/blocks/mcp/` — MCP block package
|
||||
- `__init__.py`
|
||||
- `block.py` — MCPToolBlock implementation
|
||||
- `client.py` — MCP HTTP client (list_tools, call_tool)
|
||||
- `test_mcp.py` — Tests (34 tests)
|
||||
|
||||
### Modified Files
|
||||
- `backend/integrations/providers.py` — Add `MCP = "mcp"` to ProviderName
|
||||
- `pyproject.toml` — No changes needed (using aiohttp which is already a dep)
|
||||
|
||||
## Detailed Design
|
||||
|
||||
### MCP Client (`client.py`)
|
||||
|
||||
Simple async HTTP client for MCP Streamable HTTP protocol:
|
||||
|
||||
```python
|
||||
class MCPClient:
|
||||
async def list_tools(server_url: str, headers: dict) -> list[MCPTool]
|
||||
async def call_tool(server_url: str, tool_name: str, arguments: dict, headers: dict) -> Any
|
||||
```
|
||||
|
||||
Uses JSON-RPC 2.0 over HTTP POST:
|
||||
- `tools/list` → `{"jsonrpc": "2.0", "method": "tools/list", "id": 1}`
|
||||
- `tools/call` → `{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "...", "arguments": {...}}, "id": 2}`
|
||||
|
||||
### MCPBlock (`block.py`)
|
||||
|
||||
Key fields:
|
||||
- `server_url: str` — MCP server endpoint URL
|
||||
- `credentials: MCPCredentialsInput` — API key for auth (optional)
|
||||
- `available_tools: dict` — Cached tools list from server (populated by frontend API call)
|
||||
- `selected_tool: str` — Which tool the user selected
|
||||
- `tool_input_schema: dict` — JSON schema of the selected tool's inputs
|
||||
- `tool_arguments: dict` — The actual tool arguments (dynamic, validated against tool_input_schema)
|
||||
|
||||
Dynamic schema pattern (like AgentExecutorBlock):
|
||||
```python
|
||||
@classmethod
|
||||
def get_input_schema(cls, data: BlockInput) -> dict[str, Any]:
|
||||
return data.get("tool_input_schema", {})
|
||||
|
||||
@classmethod
|
||||
def get_input_defaults(cls, data: BlockInput) -> BlockInput:
|
||||
return data.get("tool_arguments", {})
|
||||
|
||||
@classmethod
|
||||
def get_missing_input(cls, data: BlockInput) -> set[str]:
|
||||
required = cls.get_input_schema(data).get("required", [])
|
||||
return set(required) - set(data)
|
||||
```
|
||||
|
||||
### Auth
|
||||
|
||||
Use existing `APIKeyCredentials` with provider `"mcp"`:
|
||||
- User creates an API key credential for their MCP server
|
||||
- Block sends it as `Authorization: Bearer <key>` header
|
||||
- Credentials are optional (some MCP servers don't need auth)
|
||||
|
||||
## Dev Loop
|
||||
|
||||
```bash
|
||||
cd /Users/majdyz/Code/AutoGPT2/autogpt_platform/backend
|
||||
poetry run pytest backend/blocks/test/test_mcp_block.py -xvs # Run MCP-specific tests
|
||||
poetry run pytest backend/blocks/test/test_block.py -xvs -k "MCP" # Run block test suite for MCP
|
||||
```
|
||||
|
||||
## Dev Loop
|
||||
|
||||
```bash
|
||||
cd /Users/majdyz/Code/AutoGPT2/autogpt_platform/backend
|
||||
poetry run pytest backend/blocks/mcp/test_mcp.py -xvs # Run MCP-specific tests (34 tests)
|
||||
poetry run pytest backend/blocks/test/test_block.py -xvs -k "MCP" # Run block test suite for MCP
|
||||
```
|
||||
|
||||
## Status
|
||||
|
||||
- [x] Research & Design
|
||||
- [x] Add ProviderName.MCP
|
||||
- [x] Implement MCP client (client.py)
|
||||
- [x] Implement MCPToolBlock (block.py)
|
||||
- [x] Write unit tests (34 tests — all passing)
|
||||
- [x] Run tests & fix issues
|
||||
- [ ] Create PR
|
||||
@@ -27,20 +27,12 @@ class ChatConfig(BaseSettings):
|
||||
session_ttl: int = Field(default=43200, description="Session TTL in seconds")
|
||||
|
||||
# Streaming Configuration
|
||||
# Note: When using Claude Agent SDK, context management is handled automatically
|
||||
# via the SDK's built-in compaction. This is mainly used for the fallback path.
|
||||
max_context_messages: int = Field(
|
||||
default=100,
|
||||
ge=1,
|
||||
le=500,
|
||||
description="Max context messages (SDK handles compaction automatically)",
|
||||
default=50, ge=1, le=200, description="Maximum context messages"
|
||||
)
|
||||
|
||||
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
|
||||
max_retries: int = Field(
|
||||
default=3,
|
||||
description="Max retries for fallback path (SDK handles retries internally)",
|
||||
)
|
||||
max_retries: int = Field(default=3, description="Maximum number of retries")
|
||||
max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
|
||||
max_agent_schedules: int = Field(
|
||||
default=30, description="Maximum number of agent schedules"
|
||||
@@ -101,12 +93,6 @@ class ChatConfig(BaseSettings):
|
||||
description="Name of the prompt in Langfuse to fetch",
|
||||
)
|
||||
|
||||
# Claude Agent SDK Configuration
|
||||
use_claude_agent_sdk: bool = Field(
|
||||
default=True,
|
||||
description="Use Claude Agent SDK for chat completions",
|
||||
)
|
||||
|
||||
@field_validator("api_key", mode="before")
|
||||
@classmethod
|
||||
def get_api_key(cls, v):
|
||||
@@ -146,17 +132,6 @@ class ChatConfig(BaseSettings):
|
||||
v = os.getenv("CHAT_INTERNAL_API_KEY")
|
||||
return v
|
||||
|
||||
@field_validator("use_claude_agent_sdk", mode="before")
|
||||
@classmethod
|
||||
def get_use_claude_agent_sdk(cls, v):
|
||||
"""Get use_claude_agent_sdk from environment if not provided."""
|
||||
# Check environment variable - default to True if not set
|
||||
env_val = os.getenv("CHAT_USE_CLAUDE_AGENT_SDK", "").lower()
|
||||
if env_val:
|
||||
return env_val in ("true", "1", "yes", "on")
|
||||
# Default to True (SDK enabled by default)
|
||||
return True if v is None else v
|
||||
|
||||
# Prompt paths for different contexts
|
||||
PROMPT_PATHS: dict[str, str] = {
|
||||
"default": "prompts/chat_system.md",
|
||||
|
||||
@@ -273,8 +273,9 @@ async def _get_session_from_cache(session_id: str) -> ChatSession | None:
|
||||
try:
|
||||
session = ChatSession.model_validate_json(raw_session)
|
||||
logger.info(
|
||||
f"[CACHE] Loaded session {session_id}: {len(session.messages)} messages, "
|
||||
f"last_roles={[m.role for m in session.messages[-3:]]}" # Last 3 roles
|
||||
f"Loading session {session_id} from cache: "
|
||||
f"message_count={len(session.messages)}, "
|
||||
f"roles={[m.role for m in session.messages]}"
|
||||
)
|
||||
return session
|
||||
except Exception as e:
|
||||
@@ -316,9 +317,11 @@ async def _get_session_from_db(session_id: str) -> ChatSession | None:
|
||||
return None
|
||||
|
||||
messages = prisma_session.Messages
|
||||
logger.debug(
|
||||
f"[DB] Loaded session {session_id}: {len(messages) if messages else 0} messages, "
|
||||
f"roles={[m.role for m in messages[-3:]] if messages else []}" # Last 3 roles
|
||||
logger.info(
|
||||
f"Loading session {session_id} from DB: "
|
||||
f"has_messages={messages is not None}, "
|
||||
f"message_count={len(messages) if messages else 0}, "
|
||||
f"roles={[m.role for m in messages] if messages else []}"
|
||||
)
|
||||
|
||||
return ChatSession.from_db(prisma_session, messages)
|
||||
@@ -369,9 +372,10 @@ async def _save_session_to_db(
|
||||
"function_call": msg.function_call,
|
||||
}
|
||||
)
|
||||
logger.debug(
|
||||
f"[DB] Saving {len(new_messages)} messages to session {session.session_id}, "
|
||||
f"roles={[m['role'] for m in messages_data]}"
|
||||
logger.info(
|
||||
f"Saving {len(new_messages)} new messages to DB for session {session.session_id}: "
|
||||
f"roles={[m['role'] for m in messages_data]}, "
|
||||
f"start_sequence={existing_message_count}"
|
||||
)
|
||||
await chat_db.add_chat_messages_batch(
|
||||
session_id=session.session_id,
|
||||
@@ -411,7 +415,7 @@ async def get_chat_session(
|
||||
logger.warning(f"Unexpected cache error for session {session_id}: {e}")
|
||||
|
||||
# Fall back to database
|
||||
logger.debug(f"Session {session_id} not in cache, checking database")
|
||||
logger.info(f"Session {session_id} not in cache, checking database")
|
||||
session = await _get_session_from_db(session_id)
|
||||
|
||||
if session is None:
|
||||
@@ -428,6 +432,7 @@ async def get_chat_session(
|
||||
# Cache the session from DB
|
||||
try:
|
||||
await _cache_session(session)
|
||||
logger.info(f"Cached session {session_id} from database")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to cache session {session_id}: {e}")
|
||||
|
||||
@@ -598,19 +603,13 @@ async def update_session_title(session_id: str, title: str) -> bool:
|
||||
logger.warning(f"Session {session_id} not found for title update")
|
||||
return False
|
||||
|
||||
# Update title in cache if it exists (instead of invalidating).
|
||||
# This prevents race conditions where cache invalidation causes
|
||||
# the frontend to see stale DB data while streaming is still in progress.
|
||||
# Invalidate cache so next fetch gets updated title
|
||||
try:
|
||||
cached = await _get_session_from_cache(session_id)
|
||||
if cached:
|
||||
cached.title = title
|
||||
await _cache_session(cached)
|
||||
redis_key = _get_session_cache_key(session_id)
|
||||
async_redis = await get_redis_async()
|
||||
await async_redis.delete(redis_key)
|
||||
except Exception as e:
|
||||
# Not critical - title will be correct on next full cache refresh
|
||||
logger.warning(
|
||||
f"Failed to update title in cache for session {session_id}: {e}"
|
||||
)
|
||||
logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Chat API routes for chat session management and streaming via SSE."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid as uuid_module
|
||||
from collections.abc import AsyncGenerator
|
||||
@@ -17,17 +16,8 @@ from . import service as chat_service
|
||||
from . import stream_registry
|
||||
from .completion_handler import process_operation_failure, process_operation_success
|
||||
from .config import ChatConfig
|
||||
from .model import (
|
||||
ChatMessage,
|
||||
ChatSession,
|
||||
create_chat_session,
|
||||
get_chat_session,
|
||||
get_user_sessions,
|
||||
upsert_chat_session,
|
||||
)
|
||||
from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
|
||||
from .response_model import StreamFinish, StreamHeartbeat, StreamStart
|
||||
from .sdk import service as sdk_service
|
||||
from .tracking import track_user_message
|
||||
|
||||
config = ChatConfig()
|
||||
|
||||
@@ -219,10 +209,6 @@ async def get_session(
|
||||
active_task, last_message_id = await stream_registry.get_active_task_for_session(
|
||||
session_id, user_id
|
||||
)
|
||||
logger.info(
|
||||
f"[GET_SESSION] session={session_id}, active_task={active_task is not None}, "
|
||||
f"msg_count={len(messages)}, last_role={messages[-1].get('role') if messages else 'none'}"
|
||||
)
|
||||
if active_task:
|
||||
# Filter out the in-progress assistant message from the session response.
|
||||
# The client will receive the complete assistant response through the SSE
|
||||
@@ -279,29 +265,9 @@ async def stream_chat_post(
|
||||
containing the task_id for reconnection.
|
||||
|
||||
"""
|
||||
session = await _validate_and_get_session(session_id, user_id)
|
||||
import asyncio
|
||||
|
||||
# Add user message to session BEFORE creating task to avoid race condition
|
||||
# where GET_SESSION sees the task as "running" but the message isn't saved yet
|
||||
if request.message:
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="user" if request.is_user_message else "assistant",
|
||||
content=request.message,
|
||||
)
|
||||
)
|
||||
if request.is_user_message:
|
||||
track_user_message(
|
||||
user_id=user_id,
|
||||
session_id=session_id,
|
||||
message_length=len(request.message),
|
||||
)
|
||||
logger.info(
|
||||
f"[STREAM] Saving user message to session {session_id}, "
|
||||
f"msg_count={len(session.messages)}"
|
||||
)
|
||||
session = await upsert_chat_session(session)
|
||||
logger.info(f"[STREAM] User message saved for session {session_id}")
|
||||
session = await _validate_and_get_session(session_id, user_id)
|
||||
|
||||
# Create a task in the stream registry for reconnection support
|
||||
task_id = str(uuid_module.uuid4())
|
||||
@@ -317,38 +283,24 @@ async def stream_chat_post(
|
||||
|
||||
# Background task that runs the AI generation independently of SSE connection
|
||||
async def run_ai_generation():
|
||||
chunk_count = 0
|
||||
try:
|
||||
# Emit a start event with task_id for reconnection
|
||||
start_chunk = StreamStart(messageId=task_id, taskId=task_id)
|
||||
await stream_registry.publish_chunk(task_id, start_chunk)
|
||||
|
||||
# Choose service based on configuration
|
||||
use_sdk = config.use_claude_agent_sdk
|
||||
stream_fn = (
|
||||
sdk_service.stream_chat_completion_sdk
|
||||
if use_sdk
|
||||
else chat_service.stream_chat_completion
|
||||
)
|
||||
# Pass message=None since we already added it to the session above
|
||||
async for chunk in stream_fn(
|
||||
async for chunk in chat_service.stream_chat_completion(
|
||||
session_id,
|
||||
None, # Message already in session
|
||||
request.message,
|
||||
is_user_message=request.is_user_message,
|
||||
user_id=user_id,
|
||||
session=session, # Pass session with message already added
|
||||
session=session, # Pass pre-fetched session to avoid double-fetch
|
||||
context=request.context,
|
||||
):
|
||||
chunk_count += 1
|
||||
# Write to Redis (subscribers will receive via XREAD)
|
||||
await stream_registry.publish_chunk(task_id, chunk)
|
||||
|
||||
logger.info(
|
||||
f"[BG_TASK] AI generation completed for session {session_id}: {chunk_count} chunks, marking task {task_id} as completed"
|
||||
)
|
||||
# Mark task as completed (also publishes StreamFinish)
|
||||
completed = await stream_registry.mark_task_completed(task_id, "completed")
|
||||
logger.info(f"[BG_TASK] mark_task_completed returned: {completed}")
|
||||
# Mark task as completed
|
||||
await stream_registry.mark_task_completed(task_id, "completed")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error in background AI generation for session {session_id}: {e}"
|
||||
@@ -363,7 +315,7 @@ async def stream_chat_post(
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
subscriber_queue = None
|
||||
try:
|
||||
# Subscribe to the task stream (replays + live updates)
|
||||
# Subscribe to the task stream (this replays existing messages + live updates)
|
||||
subscriber_queue = await stream_registry.subscribe_to_task(
|
||||
task_id=task_id,
|
||||
user_id=user_id,
|
||||
@@ -371,7 +323,6 @@ async def stream_chat_post(
|
||||
)
|
||||
|
||||
if subscriber_queue is None:
|
||||
logger.warning(f"Failed to subscribe to task {task_id}")
|
||||
yield StreamFinish().to_sse()
|
||||
yield "data: [DONE]\n\n"
|
||||
return
|
||||
@@ -390,11 +341,11 @@ async def stream_chat_post(
|
||||
yield StreamHeartbeat().to_sse()
|
||||
|
||||
except GeneratorExit:
|
||||
pass # Client disconnected - normal behavior
|
||||
pass # Client disconnected - background task continues
|
||||
except Exception as e:
|
||||
logger.error(f"Error in SSE stream for task {task_id}: {e}")
|
||||
finally:
|
||||
# Unsubscribe when client disconnects or stream ends
|
||||
# Unsubscribe when client disconnects or stream ends to prevent resource leak
|
||||
if subscriber_queue is not None:
|
||||
try:
|
||||
await stream_registry.unsubscribe_from_task(
|
||||
@@ -449,21 +400,35 @@ async def stream_chat_get(
|
||||
session = await _validate_and_get_session(session_id, user_id)
|
||||
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
# Choose service based on configuration
|
||||
use_sdk = config.use_claude_agent_sdk
|
||||
stream_fn = (
|
||||
sdk_service.stream_chat_completion_sdk
|
||||
if use_sdk
|
||||
else chat_service.stream_chat_completion
|
||||
)
|
||||
async for chunk in stream_fn(
|
||||
chunk_count = 0
|
||||
first_chunk_type: str | None = None
|
||||
async for chunk in chat_service.stream_chat_completion(
|
||||
session_id,
|
||||
message,
|
||||
is_user_message=is_user_message,
|
||||
user_id=user_id,
|
||||
session=session, # Pass pre-fetched session to avoid double-fetch
|
||||
):
|
||||
if chunk_count < 3:
|
||||
logger.info(
|
||||
"Chat stream chunk",
|
||||
extra={
|
||||
"session_id": session_id,
|
||||
"chunk_type": str(chunk.type),
|
||||
},
|
||||
)
|
||||
if not first_chunk_type:
|
||||
first_chunk_type = str(chunk.type)
|
||||
chunk_count += 1
|
||||
yield chunk.to_sse()
|
||||
logger.info(
|
||||
"Chat stream completed",
|
||||
extra={
|
||||
"session_id": session_id,
|
||||
"chunk_count": chunk_count,
|
||||
"first_chunk_type": first_chunk_type,
|
||||
},
|
||||
)
|
||||
# AI SDK protocol termination
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
@@ -585,6 +550,8 @@ async def stream_task(
|
||||
)
|
||||
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
import asyncio
|
||||
|
||||
heartbeat_interval = 15.0 # Send heartbeat every 15 seconds
|
||||
try:
|
||||
while True:
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
"""Claude Agent SDK integration for CoPilot.
|
||||
|
||||
This module provides the integration layer between the Claude Agent SDK
|
||||
and the existing CoPilot tool system, enabling drop-in replacement of
|
||||
the current LLM orchestration with the battle-tested Claude Agent SDK.
|
||||
"""
|
||||
|
||||
from .service import stream_chat_completion_sdk
|
||||
from .tool_adapter import create_copilot_mcp_server
|
||||
|
||||
__all__ = [
|
||||
"stream_chat_completion_sdk",
|
||||
"create_copilot_mcp_server",
|
||||
]
|
||||
@@ -1,348 +0,0 @@
|
||||
"""Anthropic SDK fallback implementation.
|
||||
|
||||
This module provides the fallback streaming implementation using the Anthropic SDK
|
||||
directly when the Claude Agent SDK is not available.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any, cast
|
||||
|
||||
from ..model import ChatMessage, ChatSession
|
||||
from ..response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamTextDelta,
|
||||
StreamTextEnd,
|
||||
StreamTextStart,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolInputStart,
|
||||
StreamToolOutputAvailable,
|
||||
StreamUsage,
|
||||
)
|
||||
from .tool_adapter import get_tool_definitions, get_tool_handlers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def stream_with_anthropic(
|
||||
session: ChatSession,
|
||||
system_prompt: str,
|
||||
text_block_id: str,
|
||||
) -> AsyncGenerator[StreamBaseResponse, None]:
|
||||
"""Stream using Anthropic SDK directly with tool calling support.
|
||||
|
||||
This function accumulates messages into the session for persistence.
|
||||
The caller should NOT yield an additional StreamFinish - this function handles it.
|
||||
"""
|
||||
import anthropic
|
||||
|
||||
# Only use ANTHROPIC_API_KEY - don't fall back to OpenRouter keys
|
||||
api_key = os.getenv("ANTHROPIC_API_KEY")
|
||||
if not api_key:
|
||||
yield StreamError(
|
||||
errorText="ANTHROPIC_API_KEY not configured for fallback",
|
||||
code="config_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
client = anthropic.AsyncAnthropic(api_key=api_key)
|
||||
tool_definitions = get_tool_definitions()
|
||||
tool_handlers = get_tool_handlers()
|
||||
|
||||
anthropic_tools = [
|
||||
{
|
||||
"name": t["name"],
|
||||
"description": t["description"],
|
||||
"input_schema": t["inputSchema"],
|
||||
}
|
||||
for t in tool_definitions
|
||||
]
|
||||
|
||||
anthropic_messages = _convert_session_to_anthropic(session)
|
||||
|
||||
if not anthropic_messages or anthropic_messages[-1]["role"] != "user":
|
||||
anthropic_messages.append(
|
||||
{"role": "user", "content": "Continue with the task."}
|
||||
)
|
||||
|
||||
has_started_text = False
|
||||
max_iterations = 10
|
||||
accumulated_text = ""
|
||||
accumulated_tool_calls: list[dict[str, Any]] = []
|
||||
|
||||
for _ in range(max_iterations):
|
||||
try:
|
||||
async with client.messages.stream(
|
||||
model="claude-sonnet-4-20250514",
|
||||
max_tokens=4096,
|
||||
system=system_prompt,
|
||||
messages=cast(Any, anthropic_messages),
|
||||
tools=cast(Any, anthropic_tools) if anthropic_tools else [],
|
||||
) as stream:
|
||||
async for event in stream:
|
||||
if event.type == "content_block_start":
|
||||
block = event.content_block
|
||||
if hasattr(block, "type"):
|
||||
if block.type == "text" and not has_started_text:
|
||||
yield StreamTextStart(id=text_block_id)
|
||||
has_started_text = True
|
||||
elif block.type == "tool_use":
|
||||
yield StreamToolInputStart(
|
||||
toolCallId=block.id, toolName=block.name
|
||||
)
|
||||
|
||||
elif event.type == "content_block_delta":
|
||||
delta = event.delta
|
||||
if hasattr(delta, "type") and delta.type == "text_delta":
|
||||
accumulated_text += delta.text
|
||||
yield StreamTextDelta(id=text_block_id, delta=delta.text)
|
||||
|
||||
final_message = await stream.get_final_message()
|
||||
|
||||
if final_message.stop_reason == "tool_use":
|
||||
if has_started_text:
|
||||
yield StreamTextEnd(id=text_block_id)
|
||||
has_started_text = False
|
||||
text_block_id = str(uuid.uuid4())
|
||||
|
||||
tool_results = []
|
||||
assistant_content: list[dict[str, Any]] = []
|
||||
|
||||
for block in final_message.content:
|
||||
if block.type == "text":
|
||||
assistant_content.append(
|
||||
{"type": "text", "text": block.text}
|
||||
)
|
||||
elif block.type == "tool_use":
|
||||
assistant_content.append(
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": block.id,
|
||||
"name": block.name,
|
||||
"input": block.input,
|
||||
}
|
||||
)
|
||||
|
||||
# Track tool call for session persistence
|
||||
accumulated_tool_calls.append(
|
||||
{
|
||||
"id": block.id,
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": block.name,
|
||||
"arguments": json.dumps(
|
||||
block.input
|
||||
if isinstance(block.input, dict)
|
||||
else {}
|
||||
),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
yield StreamToolInputAvailable(
|
||||
toolCallId=block.id,
|
||||
toolName=block.name,
|
||||
input=(
|
||||
block.input if isinstance(block.input, dict) else {}
|
||||
),
|
||||
)
|
||||
|
||||
output, is_error = await _execute_tool(
|
||||
block.name, block.input, tool_handlers
|
||||
)
|
||||
|
||||
yield StreamToolOutputAvailable(
|
||||
toolCallId=block.id,
|
||||
toolName=block.name,
|
||||
output=output,
|
||||
success=not is_error,
|
||||
)
|
||||
|
||||
# Save tool result to session
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="tool",
|
||||
content=output,
|
||||
tool_call_id=block.id,
|
||||
)
|
||||
)
|
||||
|
||||
tool_results.append(
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": block.id,
|
||||
"content": output,
|
||||
"is_error": is_error,
|
||||
}
|
||||
)
|
||||
|
||||
# Save assistant message with tool calls to session
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="assistant",
|
||||
content=accumulated_text or None,
|
||||
tool_calls=(
|
||||
accumulated_tool_calls
|
||||
if accumulated_tool_calls
|
||||
else None
|
||||
),
|
||||
)
|
||||
)
|
||||
# Reset for next iteration
|
||||
accumulated_text = ""
|
||||
accumulated_tool_calls = []
|
||||
|
||||
anthropic_messages.append(
|
||||
{"role": "assistant", "content": assistant_content}
|
||||
)
|
||||
anthropic_messages.append({"role": "user", "content": tool_results})
|
||||
continue
|
||||
|
||||
else:
|
||||
if has_started_text:
|
||||
yield StreamTextEnd(id=text_block_id)
|
||||
|
||||
# Save final assistant response to session
|
||||
if accumulated_text:
|
||||
session.messages.append(
|
||||
ChatMessage(role="assistant", content=accumulated_text)
|
||||
)
|
||||
|
||||
yield StreamUsage(
|
||||
promptTokens=final_message.usage.input_tokens,
|
||||
completionTokens=final_message.usage.output_tokens,
|
||||
totalTokens=final_message.usage.input_tokens
|
||||
+ final_message.usage.output_tokens,
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Anthropic Fallback] Error: {e}", exc_info=True)
|
||||
yield StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="anthropic_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
yield StreamError(errorText="Max tool iterations reached", code="max_iterations")
|
||||
yield StreamFinish()
|
||||
|
||||
|
||||
def _convert_session_to_anthropic(session: ChatSession) -> list[dict[str, Any]]:
|
||||
"""Convert session messages to Anthropic format.
|
||||
|
||||
Handles merging consecutive same-role messages (Anthropic requires alternating roles).
|
||||
"""
|
||||
messages: list[dict[str, Any]] = []
|
||||
|
||||
for msg in session.messages:
|
||||
if msg.role == "user":
|
||||
new_msg = {"role": "user", "content": msg.content or ""}
|
||||
elif msg.role == "assistant":
|
||||
content: list[dict[str, Any]] = []
|
||||
if msg.content:
|
||||
content.append({"type": "text", "text": msg.content})
|
||||
if msg.tool_calls:
|
||||
for tc in msg.tool_calls:
|
||||
func = tc.get("function", {})
|
||||
args = func.get("arguments", {})
|
||||
if isinstance(args, str):
|
||||
try:
|
||||
args = json.loads(args)
|
||||
except json.JSONDecodeError:
|
||||
args = {}
|
||||
content.append(
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": tc.get("id", str(uuid.uuid4())),
|
||||
"name": func.get("name", ""),
|
||||
"input": args,
|
||||
}
|
||||
)
|
||||
if content:
|
||||
new_msg = {"role": "assistant", "content": content}
|
||||
else:
|
||||
continue # Skip empty assistant messages
|
||||
elif msg.role == "tool":
|
||||
new_msg = {
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": msg.tool_call_id or "",
|
||||
"content": msg.content or "",
|
||||
}
|
||||
],
|
||||
}
|
||||
else:
|
||||
continue
|
||||
|
||||
messages.append(new_msg)
|
||||
|
||||
# Merge consecutive same-role messages (Anthropic requires alternating roles)
|
||||
return _merge_consecutive_roles(messages)
|
||||
|
||||
|
||||
def _merge_consecutive_roles(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
"""Merge consecutive messages with the same role.
|
||||
|
||||
Anthropic API requires alternating user/assistant roles.
|
||||
"""
|
||||
if not messages:
|
||||
return []
|
||||
|
||||
merged: list[dict[str, Any]] = []
|
||||
for msg in messages:
|
||||
if merged and merged[-1]["role"] == msg["role"]:
|
||||
# Merge with previous message
|
||||
prev_content = merged[-1]["content"]
|
||||
new_content = msg["content"]
|
||||
|
||||
# Normalize both to list-of-blocks form
|
||||
if isinstance(prev_content, str):
|
||||
prev_content = [{"type": "text", "text": prev_content}]
|
||||
if isinstance(new_content, str):
|
||||
new_content = [{"type": "text", "text": new_content}]
|
||||
|
||||
# Ensure both are lists
|
||||
if not isinstance(prev_content, list):
|
||||
prev_content = [prev_content]
|
||||
if not isinstance(new_content, list):
|
||||
new_content = [new_content]
|
||||
|
||||
merged[-1]["content"] = prev_content + new_content
|
||||
else:
|
||||
merged.append(msg)
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
async def _execute_tool(
|
||||
tool_name: str, tool_input: Any, handlers: dict[str, Any]
|
||||
) -> tuple[str, bool]:
|
||||
"""Execute a tool and return (output, is_error)."""
|
||||
handler = handlers.get(tool_name)
|
||||
if not handler:
|
||||
return f"Unknown tool: {tool_name}", True
|
||||
|
||||
try:
|
||||
result = await handler(tool_input)
|
||||
# Safely extract output - handle empty or missing content
|
||||
content = result.get("content") or []
|
||||
if content and isinstance(content, list) and len(content) > 0:
|
||||
first_item = content[0]
|
||||
output = first_item.get("text", "") if isinstance(first_item, dict) else ""
|
||||
else:
|
||||
output = ""
|
||||
is_error = result.get("isError", False)
|
||||
return output, is_error
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}", True
|
||||
@@ -1,308 +0,0 @@
|
||||
"""Response adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
|
||||
|
||||
This module provides the adapter layer that converts streaming messages from
|
||||
the Claude Agent SDK into the Vercel AI SDK UI Stream Protocol format that
|
||||
the frontend expects.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Any, AsyncGenerator
|
||||
|
||||
from backend.api.features.chat.response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamHeartbeat,
|
||||
StreamStart,
|
||||
StreamTextDelta,
|
||||
StreamTextEnd,
|
||||
StreamTextStart,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolInputStart,
|
||||
StreamToolOutputAvailable,
|
||||
StreamUsage,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SDKResponseAdapter:
|
||||
"""Adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
|
||||
|
||||
This class maintains state during a streaming session to properly track
|
||||
text blocks, tool calls, and message lifecycle.
|
||||
"""
|
||||
|
||||
def __init__(self, message_id: str | None = None):
|
||||
"""Initialize the adapter.
|
||||
|
||||
Args:
|
||||
message_id: Optional message ID. If not provided, one will be generated.
|
||||
"""
|
||||
self.message_id = message_id or str(uuid.uuid4())
|
||||
self.text_block_id = str(uuid.uuid4())
|
||||
self.has_started_text = False
|
||||
self.has_ended_text = False
|
||||
self.current_tool_calls: dict[str, dict[str, Any]] = {}
|
||||
self.task_id: str | None = None
|
||||
|
||||
def set_task_id(self, task_id: str) -> None:
|
||||
"""Set the task ID for reconnection support."""
|
||||
self.task_id = task_id
|
||||
|
||||
def convert_message(self, sdk_message: Any) -> list[StreamBaseResponse]:
|
||||
"""Convert a single SDK message to Vercel AI SDK format.
|
||||
|
||||
Args:
|
||||
sdk_message: A message from the Claude Agent SDK.
|
||||
|
||||
Returns:
|
||||
List of StreamBaseResponse objects (may be empty or multiple).
|
||||
"""
|
||||
responses: list[StreamBaseResponse] = []
|
||||
|
||||
# Handle different SDK message types - use class name since SDK uses dataclasses
|
||||
class_name = type(sdk_message).__name__
|
||||
msg_subtype = getattr(sdk_message, "subtype", None)
|
||||
|
||||
if class_name == "SystemMessage":
|
||||
if msg_subtype == "init":
|
||||
# Session initialization - emit start
|
||||
responses.append(
|
||||
StreamStart(
|
||||
messageId=self.message_id,
|
||||
taskId=self.task_id,
|
||||
)
|
||||
)
|
||||
|
||||
elif class_name == "AssistantMessage":
|
||||
# Assistant message with content blocks
|
||||
content = getattr(sdk_message, "content", [])
|
||||
for block in content:
|
||||
# Check block type by class name (SDK uses dataclasses) or dict type
|
||||
block_class = type(block).__name__
|
||||
block_type = block.get("type") if isinstance(block, dict) else None
|
||||
|
||||
if block_class == "TextBlock" or block_type == "text":
|
||||
# Text content
|
||||
text = getattr(block, "text", None) or (
|
||||
block.get("text") if isinstance(block, dict) else ""
|
||||
)
|
||||
|
||||
if text:
|
||||
# Start text block if needed (or restart after tool calls)
|
||||
if not self.has_started_text or self.has_ended_text:
|
||||
# Generate new text block ID for text after tools
|
||||
if self.has_ended_text:
|
||||
self.text_block_id = str(uuid.uuid4())
|
||||
self.has_ended_text = False
|
||||
responses.append(StreamTextStart(id=self.text_block_id))
|
||||
self.has_started_text = True
|
||||
|
||||
# Emit text delta
|
||||
responses.append(
|
||||
StreamTextDelta(
|
||||
id=self.text_block_id,
|
||||
delta=text,
|
||||
)
|
||||
)
|
||||
|
||||
elif block_class == "ToolUseBlock" or block_type == "tool_use":
|
||||
# Tool call
|
||||
tool_id_raw = getattr(block, "id", None) or (
|
||||
block.get("id") if isinstance(block, dict) else None
|
||||
)
|
||||
tool_id: str = (
|
||||
str(tool_id_raw) if tool_id_raw else str(uuid.uuid4())
|
||||
)
|
||||
|
||||
tool_name_raw = getattr(block, "name", None) or (
|
||||
block.get("name") if isinstance(block, dict) else None
|
||||
)
|
||||
tool_name: str = str(tool_name_raw) if tool_name_raw else "unknown"
|
||||
|
||||
tool_input = getattr(block, "input", None) or (
|
||||
block.get("input") if isinstance(block, dict) else {}
|
||||
)
|
||||
|
||||
# End text block if we were streaming text
|
||||
if self.has_started_text and not self.has_ended_text:
|
||||
responses.append(StreamTextEnd(id=self.text_block_id))
|
||||
self.has_ended_text = True
|
||||
|
||||
# Emit tool input start
|
||||
responses.append(
|
||||
StreamToolInputStart(
|
||||
toolCallId=tool_id,
|
||||
toolName=tool_name,
|
||||
)
|
||||
)
|
||||
|
||||
# Emit tool input available with full input
|
||||
responses.append(
|
||||
StreamToolInputAvailable(
|
||||
toolCallId=tool_id,
|
||||
toolName=tool_name,
|
||||
input=tool_input if isinstance(tool_input, dict) else {},
|
||||
)
|
||||
)
|
||||
|
||||
# Track the tool call
|
||||
self.current_tool_calls[tool_id] = {
|
||||
"name": tool_name,
|
||||
"input": tool_input,
|
||||
}
|
||||
|
||||
elif class_name in ("ToolResultMessage", "UserMessage"):
|
||||
# Tool result - check for tool_result content
|
||||
content = getattr(sdk_message, "content", [])
|
||||
|
||||
for block in content:
|
||||
block_class = type(block).__name__
|
||||
block_type = block.get("type") if isinstance(block, dict) else None
|
||||
|
||||
if block_class == "ToolResultBlock" or block_type == "tool_result":
|
||||
tool_use_id = getattr(block, "tool_use_id", None) or (
|
||||
block.get("tool_use_id") if isinstance(block, dict) else None
|
||||
)
|
||||
result_content = getattr(block, "content", None) or (
|
||||
block.get("content") if isinstance(block, dict) else ""
|
||||
)
|
||||
is_error = getattr(block, "is_error", False) or (
|
||||
block.get("is_error", False)
|
||||
if isinstance(block, dict)
|
||||
else False
|
||||
)
|
||||
|
||||
if tool_use_id:
|
||||
tool_info = self.current_tool_calls.get(tool_use_id, {})
|
||||
tool_name = tool_info.get("name", "unknown")
|
||||
|
||||
# Format the output
|
||||
if isinstance(result_content, list):
|
||||
# Extract text from content blocks
|
||||
output_text = ""
|
||||
for item in result_content:
|
||||
if (
|
||||
isinstance(item, dict)
|
||||
and item.get("type") == "text"
|
||||
):
|
||||
output_text += item.get("text", "")
|
||||
elif hasattr(item, "text"):
|
||||
output_text += getattr(item, "text", "")
|
||||
output = output_text
|
||||
elif isinstance(result_content, str):
|
||||
output = result_content
|
||||
else:
|
||||
output = json.dumps(result_content)
|
||||
|
||||
responses.append(
|
||||
StreamToolOutputAvailable(
|
||||
toolCallId=tool_use_id,
|
||||
toolName=tool_name,
|
||||
output=output,
|
||||
success=not is_error,
|
||||
)
|
||||
)
|
||||
|
||||
elif class_name == "ResultMessage":
|
||||
# Final result
|
||||
if msg_subtype == "success":
|
||||
# End text block if still open
|
||||
if self.has_started_text and not self.has_ended_text:
|
||||
responses.append(StreamTextEnd(id=self.text_block_id))
|
||||
self.has_ended_text = True
|
||||
|
||||
# Emit finish
|
||||
responses.append(StreamFinish())
|
||||
|
||||
elif msg_subtype in ("error", "error_during_execution"):
|
||||
error_msg = getattr(sdk_message, "error", "Unknown error")
|
||||
responses.append(
|
||||
StreamError(
|
||||
errorText=str(error_msg),
|
||||
code="sdk_error",
|
||||
)
|
||||
)
|
||||
responses.append(StreamFinish())
|
||||
|
||||
elif class_name == "ErrorMessage":
|
||||
# Error message
|
||||
error_msg = getattr(sdk_message, "message", None) or getattr(
|
||||
sdk_message, "error", "Unknown error"
|
||||
)
|
||||
responses.append(
|
||||
StreamError(
|
||||
errorText=str(error_msg),
|
||||
code="sdk_error",
|
||||
)
|
||||
)
|
||||
responses.append(StreamFinish())
|
||||
|
||||
return responses
|
||||
|
||||
def create_heartbeat(self, tool_call_id: str | None = None) -> StreamHeartbeat:
|
||||
"""Create a heartbeat response."""
|
||||
return StreamHeartbeat(toolCallId=tool_call_id)
|
||||
|
||||
def create_usage(
|
||||
self,
|
||||
prompt_tokens: int,
|
||||
completion_tokens: int,
|
||||
) -> StreamUsage:
|
||||
"""Create a usage statistics response."""
|
||||
return StreamUsage(
|
||||
promptTokens=prompt_tokens,
|
||||
completionTokens=completion_tokens,
|
||||
totalTokens=prompt_tokens + completion_tokens,
|
||||
)
|
||||
|
||||
|
||||
async def adapt_sdk_stream(
|
||||
sdk_stream: AsyncGenerator[Any, None],
|
||||
message_id: str | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> AsyncGenerator[StreamBaseResponse, None]:
|
||||
"""Adapt a Claude Agent SDK stream to Vercel AI SDK format.
|
||||
|
||||
Args:
|
||||
sdk_stream: The async generator from the Claude Agent SDK.
|
||||
message_id: Optional message ID for the response.
|
||||
task_id: Optional task ID for reconnection support.
|
||||
|
||||
Yields:
|
||||
StreamBaseResponse objects in Vercel AI SDK format.
|
||||
"""
|
||||
adapter = SDKResponseAdapter(message_id=message_id)
|
||||
if task_id:
|
||||
adapter.set_task_id(task_id)
|
||||
|
||||
# Emit start immediately
|
||||
yield StreamStart(messageId=adapter.message_id, taskId=task_id)
|
||||
|
||||
finished = False
|
||||
try:
|
||||
async for sdk_message in sdk_stream:
|
||||
responses = adapter.convert_message(sdk_message)
|
||||
for response in responses:
|
||||
# Skip duplicate start messages
|
||||
if isinstance(response, StreamStart):
|
||||
continue
|
||||
if isinstance(response, StreamFinish):
|
||||
finished = True
|
||||
yield response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in SDK stream: {e}", exc_info=True)
|
||||
yield StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="stream_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
# Ensure terminal StreamFinish if SDK stream ended without one
|
||||
if not finished:
|
||||
yield StreamFinish()
|
||||
@@ -1,278 +0,0 @@
|
||||
"""Security hooks for Claude Agent SDK integration.
|
||||
|
||||
This module provides security hooks that validate tool calls before execution,
|
||||
ensuring multi-user isolation and preventing unauthorized operations.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, cast
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Tools that are blocked entirely (CLI/system access)
|
||||
BLOCKED_TOOLS = {
|
||||
"Bash",
|
||||
"bash",
|
||||
"shell",
|
||||
"exec",
|
||||
"terminal",
|
||||
"command",
|
||||
"Read", # Block raw file read - use workspace tools instead
|
||||
"Write", # Block raw file write - use workspace tools instead
|
||||
"Edit", # Block raw file edit - use workspace tools instead
|
||||
"Glob", # Block raw file glob - use workspace tools instead
|
||||
"Grep", # Block raw file grep - use workspace tools instead
|
||||
}
|
||||
|
||||
# Dangerous patterns in tool inputs
|
||||
DANGEROUS_PATTERNS = [
|
||||
r"sudo",
|
||||
r"rm\s+-rf",
|
||||
r"dd\s+if=",
|
||||
r"/etc/passwd",
|
||||
r"/etc/shadow",
|
||||
r"chmod\s+777",
|
||||
r"curl\s+.*\|.*sh",
|
||||
r"wget\s+.*\|.*sh",
|
||||
r"eval\s*\(",
|
||||
r"exec\s*\(",
|
||||
r"__import__",
|
||||
r"os\.system",
|
||||
r"subprocess",
|
||||
]
|
||||
|
||||
|
||||
def _validate_tool_access(tool_name: str, tool_input: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Validate that a tool call is allowed.
|
||||
|
||||
Returns:
|
||||
Empty dict to allow, or dict with hookSpecificOutput to deny
|
||||
"""
|
||||
# Block forbidden tools
|
||||
if tool_name in BLOCKED_TOOLS:
|
||||
logger.warning(f"Blocked tool access attempt: {tool_name}")
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": (
|
||||
f"Tool '{tool_name}' is not available. "
|
||||
"Use the CoPilot-specific tools instead."
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
# Check for dangerous patterns in tool input
|
||||
input_str = str(tool_input)
|
||||
|
||||
for pattern in DANGEROUS_PATTERNS:
|
||||
if re.search(pattern, input_str, re.IGNORECASE):
|
||||
logger.warning(
|
||||
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
|
||||
)
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": "Input contains blocked pattern",
|
||||
}
|
||||
}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
def _validate_user_isolation(
|
||||
tool_name: str, tool_input: dict[str, Any], user_id: str | None
|
||||
) -> dict[str, Any]:
|
||||
"""Validate that tool calls respect user isolation."""
|
||||
# For workspace file tools, ensure path doesn't escape
|
||||
if "workspace" in tool_name.lower():
|
||||
path = tool_input.get("path", "") or tool_input.get("file_path", "")
|
||||
if path:
|
||||
# Check for path traversal
|
||||
if ".." in path or path.startswith("/"):
|
||||
logger.warning(
|
||||
f"Blocked path traversal attempt: {path} by user {user_id}"
|
||||
)
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": "Path traversal not allowed",
|
||||
}
|
||||
}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
def create_security_hooks(user_id: str | None) -> dict[str, Any]:
|
||||
"""Create the security hooks configuration for Claude Agent SDK.
|
||||
|
||||
Includes security validation and observability hooks:
|
||||
- PreToolUse: Security validation before tool execution
|
||||
- PostToolUse: Log successful tool executions
|
||||
- PostToolUseFailure: Log and handle failed tool executions
|
||||
- PreCompact: Log context compaction events (SDK handles compaction automatically)
|
||||
|
||||
Args:
|
||||
user_id: Current user ID for isolation validation
|
||||
|
||||
Returns:
|
||||
Hooks configuration dict for ClaudeAgentOptions
|
||||
"""
|
||||
try:
|
||||
from claude_agent_sdk import HookMatcher
|
||||
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
|
||||
|
||||
async def pre_tool_use_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Combined pre-tool-use validation hook."""
|
||||
_ = context # unused but required by signature
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
|
||||
|
||||
# Validate basic tool access
|
||||
result = _validate_tool_access(tool_name, tool_input)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
# Validate user isolation
|
||||
result = _validate_user_isolation(tool_name, tool_input, user_id)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def post_tool_use_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log successful tool executions for observability."""
|
||||
_ = context
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
logger.debug(f"[SDK] Tool success: {tool_name}, tool_use_id={tool_use_id}")
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def post_tool_failure_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log failed tool executions for debugging."""
|
||||
_ = context
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
error = input_data.get("error", "Unknown error")
|
||||
logger.warning(
|
||||
f"[SDK] Tool failed: {tool_name}, error={error}, "
|
||||
f"user={user_id}, tool_use_id={tool_use_id}"
|
||||
)
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
async def pre_compact_hook(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Log when SDK triggers context compaction.
|
||||
|
||||
The SDK automatically compacts conversation history when it grows too large.
|
||||
This hook provides visibility into when compaction happens.
|
||||
"""
|
||||
_ = context, tool_use_id
|
||||
trigger = input_data.get("trigger", "auto")
|
||||
logger.info(
|
||||
f"[SDK] Context compaction triggered: {trigger}, user={user_id}"
|
||||
)
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
return {
|
||||
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
|
||||
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
|
||||
"PostToolUseFailure": [
|
||||
HookMatcher(matcher="*", hooks=[post_tool_failure_hook])
|
||||
],
|
||||
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
|
||||
}
|
||||
except ImportError:
|
||||
# Fallback for when SDK isn't available - return empty hooks
|
||||
return {}
|
||||
|
||||
|
||||
def create_strict_security_hooks(
|
||||
user_id: str | None,
|
||||
allowed_tools: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Create strict security hooks that only allow specific tools.
|
||||
|
||||
Args:
|
||||
user_id: Current user ID
|
||||
allowed_tools: List of allowed tool names (defaults to CoPilot tools)
|
||||
|
||||
Returns:
|
||||
Hooks configuration dict
|
||||
"""
|
||||
try:
|
||||
from claude_agent_sdk import HookMatcher
|
||||
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
|
||||
|
||||
from .tool_adapter import RAW_TOOL_NAMES
|
||||
|
||||
tools_list = allowed_tools if allowed_tools is not None else RAW_TOOL_NAMES
|
||||
allowed_set = set(tools_list)
|
||||
|
||||
async def strict_pre_tool_use(
|
||||
input_data: HookInput,
|
||||
tool_use_id: str | None,
|
||||
context: HookContext,
|
||||
) -> SyncHookJSONOutput:
|
||||
"""Strict validation that only allows whitelisted tools."""
|
||||
_ = context # unused but required by signature
|
||||
tool_name = cast(str, input_data.get("tool_name", ""))
|
||||
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
|
||||
|
||||
# Remove MCP prefix if present
|
||||
clean_name = tool_name.removeprefix("mcp__copilot__")
|
||||
|
||||
if clean_name not in allowed_set:
|
||||
logger.warning(f"Blocked non-whitelisted tool: {tool_name}")
|
||||
return cast(
|
||||
SyncHookJSONOutput,
|
||||
{
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": "PreToolUse",
|
||||
"permissionDecision": "deny",
|
||||
"permissionDecisionReason": (
|
||||
f"Tool '{tool_name}' is not in the allowed list"
|
||||
),
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Run standard validations
|
||||
result = _validate_tool_access(tool_name, tool_input)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
result = _validate_user_isolation(tool_name, tool_input, user_id)
|
||||
if result:
|
||||
return cast(SyncHookJSONOutput, result)
|
||||
|
||||
logger.debug(
|
||||
f"[SDK Audit] Tool call: tool={tool_name}, "
|
||||
f"user={user_id}, tool_use_id={tool_use_id}"
|
||||
)
|
||||
return cast(SyncHookJSONOutput, {})
|
||||
|
||||
return {
|
||||
"PreToolUse": [
|
||||
HookMatcher(matcher="*", hooks=[strict_pre_tool_use]),
|
||||
],
|
||||
}
|
||||
except ImportError:
|
||||
return {}
|
||||
@@ -1,470 +0,0 @@
|
||||
"""Claude Agent SDK service layer for CoPilot chat completions."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
|
||||
import openai
|
||||
|
||||
from backend.data.understanding import (
|
||||
format_understanding_for_prompt,
|
||||
get_business_understanding,
|
||||
)
|
||||
from backend.util.exceptions import NotFoundError
|
||||
|
||||
from ..config import ChatConfig
|
||||
from ..model import (
|
||||
ChatMessage,
|
||||
ChatSession,
|
||||
get_chat_session,
|
||||
update_session_title,
|
||||
upsert_chat_session,
|
||||
)
|
||||
from ..response_model import (
|
||||
StreamBaseResponse,
|
||||
StreamError,
|
||||
StreamFinish,
|
||||
StreamStart,
|
||||
StreamTextDelta,
|
||||
StreamToolInputAvailable,
|
||||
StreamToolOutputAvailable,
|
||||
)
|
||||
from ..tracking import track_user_message
|
||||
from .anthropic_fallback import stream_with_anthropic
|
||||
from .response_adapter import SDKResponseAdapter
|
||||
from .security_hooks import create_security_hooks
|
||||
from .tool_adapter import (
|
||||
COPILOT_TOOL_NAMES,
|
||||
create_copilot_mcp_server,
|
||||
set_execution_context,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
config = ChatConfig()
|
||||
|
||||
# Set to hold background tasks to prevent garbage collection
|
||||
_background_tasks: set[asyncio.Task[Any]] = set()
|
||||
|
||||
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
|
||||
|
||||
Here is everything you know about the current user from previous interactions:
|
||||
|
||||
<users_information>
|
||||
{users_information}
|
||||
</users_information>
|
||||
|
||||
## YOUR CORE MANDATE
|
||||
|
||||
You are action-oriented. Your success is measured by:
|
||||
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
|
||||
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
|
||||
- **Time Saved**: Focus on tangible efficiency gains
|
||||
- **Quality Output**: Deliver results that meet or exceed expectations
|
||||
|
||||
## YOUR WORKFLOW
|
||||
|
||||
Adapt flexibly to the conversation context. Not every interaction requires all stages:
|
||||
|
||||
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
|
||||
|
||||
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
|
||||
|
||||
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
|
||||
|
||||
4. **Discover or Create Agents**:
|
||||
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
|
||||
- Search the marketplace with `find_agent` for pre-built automations
|
||||
- Find reusable components with `find_block`
|
||||
- Create custom solutions with `create_agent` if nothing suitable exists
|
||||
- Modify existing library agents with `edit_agent`
|
||||
|
||||
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
|
||||
|
||||
6. **Show Results**: Display outputs using `agent_output`.
|
||||
|
||||
## BEHAVIORAL GUIDELINES
|
||||
|
||||
**Be Concise:**
|
||||
- Target 2-5 short lines maximum
|
||||
- Make every word count—no repetition or filler
|
||||
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
|
||||
- Avoid jargon (blocks, slugs, cron) unless the user asks
|
||||
|
||||
**Be Proactive:**
|
||||
- Suggest next steps before being asked
|
||||
- Anticipate needs based on conversation context and user information
|
||||
- Look for opportunities to expand scope when relevant
|
||||
- Reveal capabilities through action, not explanation
|
||||
|
||||
**Use Tools Effectively:**
|
||||
- Select the right tool for each task
|
||||
- **Always check `find_library_agent` before searching the marketplace**
|
||||
- Use `add_understanding` to capture valuable business context
|
||||
- When tool calls fail, try alternative approaches
|
||||
|
||||
## CRITICAL REMINDER
|
||||
|
||||
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
|
||||
|
||||
|
||||
async def _build_system_prompt(
|
||||
user_id: str | None, has_conversation_history: bool = False
|
||||
) -> tuple[str, Any]:
|
||||
"""Build the system prompt with user's business understanding context.
|
||||
|
||||
Args:
|
||||
user_id: The user ID to fetch understanding for.
|
||||
has_conversation_history: Whether there's existing conversation history.
|
||||
If True, we don't tell the model to greet/introduce (since they're
|
||||
already in a conversation).
|
||||
"""
|
||||
understanding = None
|
||||
if user_id:
|
||||
try:
|
||||
understanding = await get_business_understanding(user_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch business understanding: {e}")
|
||||
|
||||
if understanding:
|
||||
context = format_understanding_for_prompt(understanding)
|
||||
elif has_conversation_history:
|
||||
# Don't tell model to greet if there's conversation history
|
||||
context = "No prior understanding saved yet. Continue the existing conversation naturally."
|
||||
else:
|
||||
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
|
||||
|
||||
return DEFAULT_SYSTEM_PROMPT.format(users_information=context), understanding
|
||||
|
||||
|
||||
def _format_conversation_history(session: ChatSession) -> str:
|
||||
"""Format conversation history as a prompt context.
|
||||
|
||||
The SDK handles context compaction automatically, but we apply
|
||||
max_context_messages as a safety guard to limit initial prompt size.
|
||||
"""
|
||||
if not session.messages:
|
||||
return ""
|
||||
|
||||
# Get all messages except the last user message (which will be the prompt)
|
||||
messages = session.messages[:-1] if session.messages else []
|
||||
if not messages:
|
||||
return ""
|
||||
|
||||
# Apply max_context_messages limit as a safety guard
|
||||
# (SDK handles compaction, but this prevents excessively large initial prompts)
|
||||
max_messages = config.max_context_messages
|
||||
if len(messages) > max_messages:
|
||||
messages = messages[-max_messages:]
|
||||
|
||||
history_parts = ["<conversation_history>"]
|
||||
|
||||
for msg in messages:
|
||||
if msg.role == "user":
|
||||
history_parts.append(f"User: {msg.content or ''}")
|
||||
elif msg.role == "assistant":
|
||||
# Pass full content - SDK handles compaction automatically
|
||||
history_parts.append(f"Assistant: {msg.content or ''}")
|
||||
if msg.tool_calls:
|
||||
for tc in msg.tool_calls:
|
||||
func = tc.get("function", {})
|
||||
history_parts.append(
|
||||
f" [Called tool: {func.get('name', 'unknown')}]"
|
||||
)
|
||||
elif msg.role == "tool":
|
||||
# Pass full tool results - SDK handles compaction
|
||||
history_parts.append(f" [Tool result: {msg.content or ''}]")
|
||||
|
||||
history_parts.append("</conversation_history>")
|
||||
history_parts.append("")
|
||||
history_parts.append(
|
||||
"Continue this conversation. Respond to the user's latest message:"
|
||||
)
|
||||
history_parts.append("")
|
||||
|
||||
return "\n".join(history_parts)
|
||||
|
||||
|
||||
async def _generate_session_title(
|
||||
message: str,
|
||||
user_id: str | None = None,
|
||||
session_id: str | None = None,
|
||||
) -> str | None:
|
||||
"""Generate a concise title for a chat session."""
|
||||
from backend.util.settings import Settings
|
||||
|
||||
settings = Settings()
|
||||
try:
|
||||
# Build extra_body for OpenRouter tracing
|
||||
extra_body: dict[str, Any] = {
|
||||
"posthogProperties": {"environment": settings.config.app_env.value},
|
||||
}
|
||||
if user_id:
|
||||
extra_body["user"] = user_id[:128]
|
||||
extra_body["posthogDistinctId"] = user_id
|
||||
if session_id:
|
||||
extra_body["session_id"] = session_id[:128]
|
||||
|
||||
client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
|
||||
response = await client.chat.completions.create(
|
||||
model=config.title_model,
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Generate a very short title (3-6 words) for a chat conversation based on the user's first message. Return ONLY the title, no quotes or punctuation.",
|
||||
},
|
||||
{"role": "user", "content": message[:500]},
|
||||
],
|
||||
max_tokens=20,
|
||||
extra_body=extra_body,
|
||||
)
|
||||
title = response.choices[0].message.content
|
||||
if title:
|
||||
title = title.strip().strip("\"'")
|
||||
return title[:47] + "..." if len(title) > 50 else title
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to generate session title: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def stream_chat_completion_sdk(
|
||||
session_id: str,
|
||||
message: str | None = None,
|
||||
tool_call_response: str | None = None, # noqa: ARG001
|
||||
is_user_message: bool = True,
|
||||
user_id: str | None = None,
|
||||
retry_count: int = 0, # noqa: ARG001
|
||||
session: ChatSession | None = None,
|
||||
context: dict[str, str] | None = None, # noqa: ARG001
|
||||
) -> AsyncGenerator[StreamBaseResponse, None]:
|
||||
"""Stream chat completion using Claude Agent SDK.
|
||||
|
||||
Drop-in replacement for stream_chat_completion with improved reliability.
|
||||
"""
|
||||
|
||||
if session is None:
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
|
||||
if not session:
|
||||
raise NotFoundError(
|
||||
f"Session {session_id} not found. Please create a new session first."
|
||||
)
|
||||
|
||||
if message:
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="user" if is_user_message else "assistant", content=message
|
||||
)
|
||||
)
|
||||
if is_user_message:
|
||||
track_user_message(
|
||||
user_id=user_id, session_id=session_id, message_length=len(message)
|
||||
)
|
||||
|
||||
session = await upsert_chat_session(session)
|
||||
|
||||
# Generate title for new sessions (first user message)
|
||||
if is_user_message and not session.title:
|
||||
user_messages = [m for m in session.messages if m.role == "user"]
|
||||
if len(user_messages) == 1:
|
||||
first_message = user_messages[0].content or message or ""
|
||||
if first_message:
|
||||
task = asyncio.create_task(
|
||||
_update_title_async(session_id, first_message, user_id)
|
||||
)
|
||||
# Store reference to prevent garbage collection
|
||||
_background_tasks.add(task)
|
||||
task.add_done_callback(_background_tasks.discard)
|
||||
|
||||
# Check if there's conversation history (more than just the current message)
|
||||
has_history = len(session.messages) > 1
|
||||
system_prompt, _ = await _build_system_prompt(
|
||||
user_id, has_conversation_history=has_history
|
||||
)
|
||||
set_execution_context(user_id, session, None)
|
||||
|
||||
message_id = str(uuid.uuid4())
|
||||
text_block_id = str(uuid.uuid4())
|
||||
task_id = str(uuid.uuid4())
|
||||
|
||||
yield StreamStart(messageId=message_id, taskId=task_id)
|
||||
|
||||
# Track whether the stream completed normally via ResultMessage
|
||||
stream_completed = False
|
||||
|
||||
try:
|
||||
try:
|
||||
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
|
||||
|
||||
# Create MCP server with CoPilot tools
|
||||
mcp_server = create_copilot_mcp_server()
|
||||
|
||||
options = ClaudeAgentOptions(
|
||||
system_prompt=system_prompt,
|
||||
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
|
||||
allowed_tools=COPILOT_TOOL_NAMES,
|
||||
hooks=create_security_hooks(user_id), # type: ignore[arg-type]
|
||||
continue_conversation=True, # Enable conversation continuation
|
||||
)
|
||||
|
||||
adapter = SDKResponseAdapter(message_id=message_id)
|
||||
adapter.set_task_id(task_id)
|
||||
|
||||
async with ClaudeSDKClient(options=options) as client:
|
||||
# Build prompt with conversation history for context
|
||||
# The SDK doesn't support replaying full conversation history,
|
||||
# so we include it as context in the prompt
|
||||
current_message = message or ""
|
||||
if not current_message and session.messages:
|
||||
last_user = [m for m in session.messages if m.role == "user"]
|
||||
if last_user:
|
||||
current_message = last_user[-1].content or ""
|
||||
|
||||
# Include conversation history if there are prior messages
|
||||
if len(session.messages) > 1:
|
||||
history_context = _format_conversation_history(session)
|
||||
prompt = f"{history_context}{current_message}"
|
||||
else:
|
||||
prompt = current_message
|
||||
|
||||
# Guard against empty prompts
|
||||
if not prompt.strip():
|
||||
yield StreamError(
|
||||
errorText="Message cannot be empty.",
|
||||
code="empty_prompt",
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
await client.query(prompt, session_id=session_id)
|
||||
|
||||
# Track assistant response to save to session
|
||||
# We may need multiple assistant messages if text comes after tool results
|
||||
assistant_response = ChatMessage(role="assistant", content="")
|
||||
accumulated_tool_calls: list[dict[str, Any]] = []
|
||||
has_appended_assistant = False
|
||||
has_tool_results = False # Track if we've received tool results
|
||||
|
||||
# Receive messages from the SDK
|
||||
async for sdk_msg in client.receive_messages():
|
||||
for response in adapter.convert_message(sdk_msg):
|
||||
if isinstance(response, StreamStart):
|
||||
continue
|
||||
yield response
|
||||
|
||||
# Accumulate text deltas into assistant response
|
||||
if isinstance(response, StreamTextDelta):
|
||||
delta = response.delta or ""
|
||||
# After tool results, create new assistant message for post-tool text
|
||||
if has_tool_results and has_appended_assistant:
|
||||
assistant_response = ChatMessage(
|
||||
role="assistant", content=delta
|
||||
)
|
||||
accumulated_tool_calls = [] # Reset for new message
|
||||
session.messages.append(assistant_response)
|
||||
has_tool_results = False
|
||||
else:
|
||||
assistant_response.content = (
|
||||
assistant_response.content or ""
|
||||
) + delta
|
||||
if not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
|
||||
# Track tool calls on the assistant message
|
||||
elif isinstance(response, StreamToolInputAvailable):
|
||||
accumulated_tool_calls.append(
|
||||
{
|
||||
"id": response.toolCallId,
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": response.toolName,
|
||||
"arguments": json.dumps(response.input or {}),
|
||||
},
|
||||
}
|
||||
)
|
||||
# Update assistant message with tool calls
|
||||
assistant_response.tool_calls = accumulated_tool_calls
|
||||
# Append assistant message if not already (tool-only response)
|
||||
if not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
|
||||
elif isinstance(response, StreamToolOutputAvailable):
|
||||
session.messages.append(
|
||||
ChatMessage(
|
||||
role="tool",
|
||||
content=(
|
||||
response.output
|
||||
if isinstance(response.output, str)
|
||||
else str(response.output)
|
||||
),
|
||||
tool_call_id=response.toolCallId,
|
||||
)
|
||||
)
|
||||
has_tool_results = True
|
||||
|
||||
elif isinstance(response, StreamFinish):
|
||||
stream_completed = True
|
||||
|
||||
# Break out of the message loop if we received finish signal
|
||||
if stream_completed:
|
||||
break
|
||||
|
||||
# Ensure assistant response is saved even if no text deltas
|
||||
# (e.g., only tool calls were made)
|
||||
if (
|
||||
assistant_response.content or assistant_response.tool_calls
|
||||
) and not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
"[SDK] claude-agent-sdk not available, using Anthropic fallback"
|
||||
)
|
||||
async for response in stream_with_anthropic(
|
||||
session, system_prompt, text_block_id
|
||||
):
|
||||
yield response
|
||||
|
||||
# Save the session with accumulated messages
|
||||
await upsert_chat_session(session)
|
||||
logger.debug(
|
||||
f"[SDK] Session {session_id} saved with {len(session.messages)} messages"
|
||||
)
|
||||
# Always yield StreamFinish to signal completion to the caller
|
||||
# The adapter yields StreamFinish for the SSE stream, but we need to
|
||||
# yield it here so the background task in routes.py knows to call mark_task_completed
|
||||
yield StreamFinish()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[SDK] Error: {e}", exc_info=True)
|
||||
# Save session even on error to preserve any partial response
|
||||
try:
|
||||
await upsert_chat_session(session)
|
||||
except Exception as save_err:
|
||||
logger.error(f"[SDK] Failed to save session on error: {save_err}")
|
||||
# Sanitize error message to avoid exposing internal details
|
||||
yield StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="sdk_error",
|
||||
)
|
||||
yield StreamFinish()
|
||||
|
||||
|
||||
async def _update_title_async(
|
||||
session_id: str, message: str, user_id: str | None = None
|
||||
) -> None:
|
||||
"""Background task to update session title."""
|
||||
try:
|
||||
title = await _generate_session_title(
|
||||
message, user_id=user_id, session_id=session_id
|
||||
)
|
||||
if title:
|
||||
await update_session_title(session_id, title)
|
||||
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[SDK] Failed to update session title: {e}")
|
||||
@@ -1,218 +0,0 @@
|
||||
"""Tool adapter for wrapping existing CoPilot tools as Claude Agent SDK MCP tools.
|
||||
|
||||
This module provides the adapter layer that converts existing BaseTool implementations
|
||||
into in-process MCP tools that can be used with the Claude Agent SDK.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from contextvars import ContextVar
|
||||
from typing import Any
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools import TOOL_REGISTRY
|
||||
from backend.api.features.chat.tools.base import BaseTool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Context variables to pass user/session info to tool execution
|
||||
_current_user_id: ContextVar[str | None] = ContextVar("current_user_id", default=None)
|
||||
_current_session: ContextVar[ChatSession | None] = ContextVar(
|
||||
"current_session", default=None
|
||||
)
|
||||
_current_tool_call_id: ContextVar[str | None] = ContextVar(
|
||||
"current_tool_call_id", default=None
|
||||
)
|
||||
|
||||
|
||||
def set_execution_context(
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
tool_call_id: str | None = None,
|
||||
) -> None:
|
||||
"""Set the execution context for tool calls.
|
||||
|
||||
This must be called before streaming begins to ensure tools have access
|
||||
to user_id and session information.
|
||||
"""
|
||||
_current_user_id.set(user_id)
|
||||
_current_session.set(session)
|
||||
_current_tool_call_id.set(tool_call_id)
|
||||
|
||||
|
||||
def get_execution_context() -> tuple[str | None, ChatSession | None, str | None]:
|
||||
"""Get the current execution context."""
|
||||
return (
|
||||
_current_user_id.get(),
|
||||
_current_session.get(),
|
||||
_current_tool_call_id.get(),
|
||||
)
|
||||
|
||||
|
||||
def create_tool_handler(base_tool: BaseTool):
|
||||
"""Create an async handler function for a BaseTool.
|
||||
|
||||
This wraps the existing BaseTool._execute method to be compatible
|
||||
with the Claude Agent SDK MCP tool format.
|
||||
"""
|
||||
|
||||
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Execute the wrapped tool and return MCP-formatted response."""
|
||||
user_id, session, tool_call_id = get_execution_context()
|
||||
|
||||
if session is None:
|
||||
return {
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": json.dumps(
|
||||
{
|
||||
"error": "No session context available",
|
||||
"type": "error",
|
||||
}
|
||||
),
|
||||
}
|
||||
],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
try:
|
||||
# Call the existing tool's execute method
|
||||
result = await base_tool.execute(
|
||||
user_id=user_id,
|
||||
session=session,
|
||||
tool_call_id=tool_call_id or "sdk-call",
|
||||
**args,
|
||||
)
|
||||
|
||||
# The result is a StreamToolOutputAvailable, extract the output
|
||||
return {
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": (
|
||||
result.output
|
||||
if isinstance(result.output, str)
|
||||
else json.dumps(result.output)
|
||||
),
|
||||
}
|
||||
],
|
||||
"isError": not result.success,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
|
||||
return {
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": json.dumps(
|
||||
{
|
||||
"error": str(e),
|
||||
"type": "error",
|
||||
"message": f"Failed to execute {base_tool.name}",
|
||||
}
|
||||
),
|
||||
}
|
||||
],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
return tool_handler
|
||||
|
||||
|
||||
def get_tool_definitions() -> list[dict[str, Any]]:
|
||||
"""Get all tool definitions in MCP format.
|
||||
|
||||
Returns a list of tool definitions that can be used with
|
||||
create_sdk_mcp_server or as raw tool definitions.
|
||||
"""
|
||||
tool_definitions = []
|
||||
|
||||
for tool_name, base_tool in TOOL_REGISTRY.items():
|
||||
tool_def = {
|
||||
"name": tool_name,
|
||||
"description": base_tool.description,
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": base_tool.parameters.get("properties", {}),
|
||||
"required": base_tool.parameters.get("required", []),
|
||||
},
|
||||
}
|
||||
tool_definitions.append(tool_def)
|
||||
|
||||
return tool_definitions
|
||||
|
||||
|
||||
def get_tool_handlers() -> dict[str, Any]:
|
||||
"""Get all tool handlers mapped by name.
|
||||
|
||||
Returns a dictionary mapping tool names to their handler functions.
|
||||
"""
|
||||
handlers = {}
|
||||
|
||||
for tool_name, base_tool in TOOL_REGISTRY.items():
|
||||
handlers[tool_name] = create_tool_handler(base_tool)
|
||||
|
||||
return handlers
|
||||
|
||||
|
||||
# Create the MCP server configuration
|
||||
def create_copilot_mcp_server():
|
||||
"""Create an in-process MCP server configuration for CoPilot tools.
|
||||
|
||||
This can be passed to ClaudeAgentOptions.mcp_servers.
|
||||
|
||||
Note: The actual SDK MCP server creation depends on the claude-agent-sdk
|
||||
package being available. This function returns the configuration that
|
||||
can be used with the SDK.
|
||||
"""
|
||||
try:
|
||||
from claude_agent_sdk import create_sdk_mcp_server, tool
|
||||
|
||||
# Create decorated tool functions
|
||||
sdk_tools = []
|
||||
|
||||
for tool_name, base_tool in TOOL_REGISTRY.items():
|
||||
# Get the handler
|
||||
handler = create_tool_handler(base_tool)
|
||||
|
||||
# Create the decorated tool
|
||||
# The @tool decorator expects (name, description, schema)
|
||||
# Pass full JSON schema with type, properties, and required
|
||||
decorated = tool(
|
||||
tool_name,
|
||||
base_tool.description,
|
||||
{
|
||||
"type": "object",
|
||||
"properties": base_tool.parameters.get("properties", {}),
|
||||
"required": base_tool.parameters.get("required", []),
|
||||
},
|
||||
)(handler)
|
||||
|
||||
sdk_tools.append(decorated)
|
||||
|
||||
# Create the MCP server
|
||||
server = create_sdk_mcp_server(
|
||||
name="copilot",
|
||||
version="1.0.0",
|
||||
tools=sdk_tools,
|
||||
)
|
||||
|
||||
return server
|
||||
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
"claude-agent-sdk not available, returning tool definitions only"
|
||||
)
|
||||
return {
|
||||
"tools": get_tool_definitions(),
|
||||
"handlers": get_tool_handlers(),
|
||||
}
|
||||
|
||||
|
||||
# List of tool names for allowed_tools configuration
|
||||
COPILOT_TOOL_NAMES = [f"mcp__copilot__{name}" for name in TOOL_REGISTRY.keys()]
|
||||
|
||||
# Also export the raw tool names for flexibility
|
||||
RAW_TOOL_NAMES = list(TOOL_REGISTRY.keys())
|
||||
@@ -555,10 +555,6 @@ async def get_active_task_for_session(
|
||||
if task_user_id and user_id != task_user_id:
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
|
||||
)
|
||||
|
||||
# Get the last message ID from Redis Stream
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
last_id = "0-0"
|
||||
|
||||
@@ -6,7 +6,6 @@ from typing import Any
|
||||
from backend.api.features.library import db as library_db
|
||||
from backend.api.features.library import model as library_model
|
||||
from backend.api.features.store import db as store_db
|
||||
from backend.data import graph as graph_db
|
||||
from backend.data.graph import GraphModel
|
||||
from backend.data.model import (
|
||||
CredentialsFieldInfo,
|
||||
@@ -44,14 +43,8 @@ async def fetch_graph_from_store_slug(
|
||||
return None, None
|
||||
|
||||
# Get the graph from store listing version
|
||||
graph_meta = await store_db.get_available_graph(
|
||||
store_agent.store_listing_version_id
|
||||
)
|
||||
graph = await graph_db.get_graph(
|
||||
graph_id=graph_meta.id,
|
||||
version=graph_meta.version,
|
||||
user_id=None, # Public access
|
||||
include_subgraphs=True,
|
||||
graph = await store_db.get_available_graph(
|
||||
store_agent.store_listing_version_id, hide_nodes=False
|
||||
)
|
||||
return graph, store_agent
|
||||
|
||||
@@ -128,7 +121,7 @@ def build_missing_credentials_from_graph(
|
||||
|
||||
return {
|
||||
field_key: _serialize_missing_credential(field_key, field_info)
|
||||
for field_key, (field_info, _node_fields) in aggregated_fields.items()
|
||||
for field_key, (field_info, _, _) in aggregated_fields.items()
|
||||
if field_key not in matched_keys
|
||||
}
|
||||
|
||||
@@ -269,7 +262,8 @@ async def match_user_credentials_to_graph(
|
||||
# provider is in the set of acceptable providers.
|
||||
for credential_field_name, (
|
||||
credential_requirements,
|
||||
_node_fields,
|
||||
_,
|
||||
_,
|
||||
) in aggregated_creds.items():
|
||||
# Find first matching credential by provider, type, and scopes
|
||||
matching_cred = next(
|
||||
|
||||
@@ -374,7 +374,7 @@ async def get_library_agent_by_graph_id(
|
||||
|
||||
|
||||
async def add_generated_agent_image(
|
||||
graph: graph_db.BaseGraph,
|
||||
graph: graph_db.GraphBaseMeta,
|
||||
user_id: str,
|
||||
library_agent_id: str,
|
||||
) -> Optional[prisma.models.LibraryAgent]:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Literal
|
||||
from typing import Any, Literal, overload
|
||||
|
||||
import fastapi
|
||||
import prisma.enums
|
||||
@@ -11,8 +11,8 @@ import prisma.types
|
||||
|
||||
from backend.data.db import transaction
|
||||
from backend.data.graph import (
|
||||
GraphMeta,
|
||||
GraphModel,
|
||||
GraphModelWithoutNodes,
|
||||
get_graph,
|
||||
get_graph_as_admin,
|
||||
get_sub_graphs,
|
||||
@@ -334,7 +334,22 @@ async def get_store_agent_details(
|
||||
raise DatabaseError("Failed to fetch agent details") from e
|
||||
|
||||
|
||||
async def get_available_graph(store_listing_version_id: str) -> GraphMeta:
|
||||
@overload
|
||||
async def get_available_graph(
|
||||
store_listing_version_id: str, hide_nodes: Literal[False]
|
||||
) -> GraphModel: ...
|
||||
|
||||
|
||||
@overload
|
||||
async def get_available_graph(
|
||||
store_listing_version_id: str, hide_nodes: Literal[True] = True
|
||||
) -> GraphModelWithoutNodes: ...
|
||||
|
||||
|
||||
async def get_available_graph(
|
||||
store_listing_version_id: str,
|
||||
hide_nodes: bool = True,
|
||||
) -> GraphModelWithoutNodes | GraphModel:
|
||||
try:
|
||||
# Get avaialble, non-deleted store listing version
|
||||
store_listing_version = (
|
||||
@@ -344,7 +359,7 @@ async def get_available_graph(store_listing_version_id: str) -> GraphMeta:
|
||||
"isAvailable": True,
|
||||
"isDeleted": False,
|
||||
},
|
||||
include={"AgentGraph": {"include": {"Nodes": True}}},
|
||||
include={"AgentGraph": {"include": AGENT_GRAPH_INCLUDE}},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -354,7 +369,9 @@ async def get_available_graph(store_listing_version_id: str) -> GraphMeta:
|
||||
detail=f"Store listing version {store_listing_version_id} not found",
|
||||
)
|
||||
|
||||
return GraphModel.from_db(store_listing_version.AgentGraph).meta()
|
||||
return (GraphModelWithoutNodes if hide_nodes else GraphModel).from_db(
|
||||
store_listing_version.AgentGraph
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting agent: {e}")
|
||||
|
||||
@@ -16,7 +16,7 @@ from backend.blocks.ideogram import (
|
||||
StyleType,
|
||||
UpscaleOption,
|
||||
)
|
||||
from backend.data.graph import BaseGraph
|
||||
from backend.data.graph import GraphBaseMeta
|
||||
from backend.data.model import CredentialsMetaInput, ProviderName
|
||||
from backend.integrations.credentials_store import ideogram_credentials
|
||||
from backend.util.request import Requests
|
||||
@@ -34,14 +34,14 @@ class ImageStyle(str, Enum):
|
||||
DIGITAL_ART = "digital art"
|
||||
|
||||
|
||||
async def generate_agent_image(agent: BaseGraph | AgentGraph) -> io.BytesIO:
|
||||
async def generate_agent_image(agent: GraphBaseMeta | AgentGraph) -> io.BytesIO:
|
||||
if settings.config.use_agent_image_generation_v2:
|
||||
return await generate_agent_image_v2(graph=agent)
|
||||
else:
|
||||
return await generate_agent_image_v1(agent=agent)
|
||||
|
||||
|
||||
async def generate_agent_image_v2(graph: BaseGraph | AgentGraph) -> io.BytesIO:
|
||||
async def generate_agent_image_v2(graph: GraphBaseMeta | AgentGraph) -> io.BytesIO:
|
||||
"""
|
||||
Generate an image for an agent using Ideogram model.
|
||||
Returns:
|
||||
@@ -54,14 +54,17 @@ async def generate_agent_image_v2(graph: BaseGraph | AgentGraph) -> io.BytesIO:
|
||||
description = f"{name} ({graph.description})" if graph.description else name
|
||||
|
||||
prompt = (
|
||||
f"Create a visually striking retro-futuristic vector pop art illustration prominently featuring "
|
||||
f'"{name}" in bold typography. The image clearly and literally depicts a {description}, '
|
||||
f"along with recognizable objects directly associated with the primary function of a {name}. "
|
||||
f"Ensure the imagery is concrete, intuitive, and immediately understandable, clearly conveying the "
|
||||
f"purpose of a {name}. Maintain vibrant, limited-palette colors, sharp vector lines, geometric "
|
||||
f"shapes, flat illustration techniques, and solid colors without gradients or shading. Preserve a "
|
||||
f"retro-futuristic aesthetic influenced by mid-century futurism and 1960s psychedelia, "
|
||||
f"prioritizing clear visual storytelling and thematic clarity above all else."
|
||||
"Create a visually striking retro-futuristic vector pop art illustration "
|
||||
f'prominently featuring "{name}" in bold typography. The image clearly and '
|
||||
f"literally depicts a {description}, along with recognizable objects directly "
|
||||
f"associated with the primary function of a {name}. "
|
||||
f"Ensure the imagery is concrete, intuitive, and immediately understandable, "
|
||||
f"clearly conveying the purpose of a {name}. "
|
||||
"Maintain vibrant, limited-palette colors, sharp vector lines, "
|
||||
"geometric shapes, flat illustration techniques, and solid colors "
|
||||
"without gradients or shading. Preserve a retro-futuristic aesthetic "
|
||||
"influenced by mid-century futurism and 1960s psychedelia, "
|
||||
"prioritizing clear visual storytelling and thematic clarity above all else."
|
||||
)
|
||||
|
||||
custom_colors = [
|
||||
@@ -99,12 +102,12 @@ async def generate_agent_image_v2(graph: BaseGraph | AgentGraph) -> io.BytesIO:
|
||||
return io.BytesIO(response.content)
|
||||
|
||||
|
||||
async def generate_agent_image_v1(agent: BaseGraph | AgentGraph) -> io.BytesIO:
|
||||
async def generate_agent_image_v1(agent: GraphBaseMeta | AgentGraph) -> io.BytesIO:
|
||||
"""
|
||||
Generate an image for an agent using Flux model via Replicate API.
|
||||
|
||||
Args:
|
||||
agent (Graph): The agent to generate an image for
|
||||
agent (GraphBaseMeta | AgentGraph): The agent to generate an image for
|
||||
|
||||
Returns:
|
||||
io.BytesIO: The generated image as bytes
|
||||
@@ -114,7 +117,13 @@ async def generate_agent_image_v1(agent: BaseGraph | AgentGraph) -> io.BytesIO:
|
||||
raise ValueError("Missing Replicate API key in settings")
|
||||
|
||||
# Construct prompt from agent details
|
||||
prompt = f"Create a visually engaging app store thumbnail for the AI agent that highlights what it does in a clear and captivating way:\n- **Name**: {agent.name}\n- **Description**: {agent.description}\nFocus on showcasing its core functionality with an appealing design."
|
||||
prompt = (
|
||||
"Create a visually engaging app store thumbnail for the AI agent "
|
||||
"that highlights what it does in a clear and captivating way:\n"
|
||||
f"- **Name**: {agent.name}\n"
|
||||
f"- **Description**: {agent.description}\n"
|
||||
f"Focus on showcasing its core functionality with an appealing design."
|
||||
)
|
||||
|
||||
# Set up Replicate client
|
||||
client = ReplicateClient(api_token=settings.secrets.replicate_api_key)
|
||||
|
||||
@@ -278,7 +278,7 @@ async def get_agent(
|
||||
)
|
||||
async def get_graph_meta_by_store_listing_version_id(
|
||||
store_listing_version_id: str,
|
||||
) -> backend.data.graph.GraphMeta:
|
||||
) -> backend.data.graph.GraphModelWithoutNodes:
|
||||
"""
|
||||
Get Agent Graph from Store Listing Version ID.
|
||||
"""
|
||||
|
||||
@@ -478,7 +478,7 @@ class ExaCreateOrFindWebsetBlock(Block):
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
try:
|
||||
webset = aexa.websets.get(id=input_data.external_id)
|
||||
webset = await aexa.websets.get(id=input_data.external_id)
|
||||
webset_result = Webset.model_validate(webset.model_dump(by_alias=True))
|
||||
|
||||
yield "webset", webset_result
|
||||
@@ -494,7 +494,7 @@ class ExaCreateOrFindWebsetBlock(Block):
|
||||
count=input_data.search_count,
|
||||
)
|
||||
|
||||
webset = aexa.websets.create(
|
||||
webset = await aexa.websets.create(
|
||||
params=CreateWebsetParameters(
|
||||
search=search_params,
|
||||
external_id=input_data.external_id,
|
||||
@@ -554,7 +554,7 @@ class ExaUpdateWebsetBlock(Block):
|
||||
if input_data.metadata is not None:
|
||||
payload["metadata"] = input_data.metadata
|
||||
|
||||
sdk_webset = aexa.websets.update(id=input_data.webset_id, params=payload)
|
||||
sdk_webset = await aexa.websets.update(id=input_data.webset_id, params=payload)
|
||||
|
||||
status_str = (
|
||||
sdk_webset.status.value
|
||||
@@ -617,7 +617,7 @@ class ExaListWebsetsBlock(Block):
|
||||
) -> BlockOutput:
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
response = aexa.websets.list(
|
||||
response = await aexa.websets.list(
|
||||
cursor=input_data.cursor,
|
||||
limit=input_data.limit,
|
||||
)
|
||||
@@ -678,7 +678,7 @@ class ExaGetWebsetBlock(Block):
|
||||
) -> BlockOutput:
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
sdk_webset = aexa.websets.get(id=input_data.webset_id)
|
||||
sdk_webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
|
||||
status_str = (
|
||||
sdk_webset.status.value
|
||||
@@ -748,7 +748,7 @@ class ExaDeleteWebsetBlock(Block):
|
||||
) -> BlockOutput:
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
deleted_webset = aexa.websets.delete(id=input_data.webset_id)
|
||||
deleted_webset = await aexa.websets.delete(id=input_data.webset_id)
|
||||
|
||||
status_str = (
|
||||
deleted_webset.status.value
|
||||
@@ -798,7 +798,7 @@ class ExaCancelWebsetBlock(Block):
|
||||
) -> BlockOutput:
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
canceled_webset = aexa.websets.cancel(id=input_data.webset_id)
|
||||
canceled_webset = await aexa.websets.cancel(id=input_data.webset_id)
|
||||
|
||||
status_str = (
|
||||
canceled_webset.status.value
|
||||
@@ -968,7 +968,7 @@ class ExaPreviewWebsetBlock(Block):
|
||||
entity["description"] = input_data.entity_description
|
||||
payload["entity"] = entity
|
||||
|
||||
sdk_preview = aexa.websets.preview(params=payload)
|
||||
sdk_preview = await aexa.websets.preview(params=payload)
|
||||
|
||||
preview = PreviewWebsetModel.from_sdk(sdk_preview)
|
||||
|
||||
@@ -1051,7 +1051,7 @@ class ExaWebsetStatusBlock(Block):
|
||||
) -> BlockOutput:
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
webset = aexa.websets.get(id=input_data.webset_id)
|
||||
webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
|
||||
status = (
|
||||
webset.status.value
|
||||
@@ -1185,7 +1185,7 @@ class ExaWebsetSummaryBlock(Block):
|
||||
) -> BlockOutput:
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
webset = aexa.websets.get(id=input_data.webset_id)
|
||||
webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
|
||||
# Extract basic info
|
||||
webset_id = webset.id
|
||||
@@ -1211,7 +1211,7 @@ class ExaWebsetSummaryBlock(Block):
|
||||
total_items = 0
|
||||
|
||||
if input_data.include_sample_items and input_data.sample_size > 0:
|
||||
items_response = aexa.websets.items.list(
|
||||
items_response = await aexa.websets.items.list(
|
||||
webset_id=input_data.webset_id, limit=input_data.sample_size
|
||||
)
|
||||
sample_items_data = [
|
||||
@@ -1362,7 +1362,7 @@ class ExaWebsetReadyCheckBlock(Block):
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
# Get webset details
|
||||
webset = aexa.websets.get(id=input_data.webset_id)
|
||||
webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
|
||||
status = (
|
||||
webset.status.value
|
||||
|
||||
@@ -202,7 +202,7 @@ class ExaCreateEnrichmentBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
sdk_enrichment = aexa.websets.enrichments.create(
|
||||
sdk_enrichment = await aexa.websets.enrichments.create(
|
||||
webset_id=input_data.webset_id, params=payload
|
||||
)
|
||||
|
||||
@@ -223,7 +223,7 @@ class ExaCreateEnrichmentBlock(Block):
|
||||
items_enriched = 0
|
||||
|
||||
while time.time() - poll_start < input_data.polling_timeout:
|
||||
current_enrich = aexa.websets.enrichments.get(
|
||||
current_enrich = await aexa.websets.enrichments.get(
|
||||
webset_id=input_data.webset_id, id=enrichment_id
|
||||
)
|
||||
current_status = (
|
||||
@@ -234,7 +234,7 @@ class ExaCreateEnrichmentBlock(Block):
|
||||
|
||||
if current_status in ["completed", "failed", "cancelled"]:
|
||||
# Estimate items from webset searches
|
||||
webset = aexa.websets.get(id=input_data.webset_id)
|
||||
webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
if webset.searches:
|
||||
for search in webset.searches:
|
||||
if search.progress:
|
||||
@@ -329,7 +329,7 @@ class ExaGetEnrichmentBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
sdk_enrichment = aexa.websets.enrichments.get(
|
||||
sdk_enrichment = await aexa.websets.enrichments.get(
|
||||
webset_id=input_data.webset_id, id=input_data.enrichment_id
|
||||
)
|
||||
|
||||
@@ -474,7 +474,7 @@ class ExaDeleteEnrichmentBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
deleted_enrichment = aexa.websets.enrichments.delete(
|
||||
deleted_enrichment = await aexa.websets.enrichments.delete(
|
||||
webset_id=input_data.webset_id, id=input_data.enrichment_id
|
||||
)
|
||||
|
||||
@@ -525,13 +525,13 @@ class ExaCancelEnrichmentBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
canceled_enrichment = aexa.websets.enrichments.cancel(
|
||||
canceled_enrichment = await aexa.websets.enrichments.cancel(
|
||||
webset_id=input_data.webset_id, id=input_data.enrichment_id
|
||||
)
|
||||
|
||||
# Try to estimate how many items were enriched before cancellation
|
||||
items_enriched = 0
|
||||
items_response = aexa.websets.items.list(
|
||||
items_response = await aexa.websets.items.list(
|
||||
webset_id=input_data.webset_id, limit=100
|
||||
)
|
||||
|
||||
|
||||
@@ -222,7 +222,7 @@ class ExaCreateImportBlock(Block):
|
||||
def _create_test_mock():
|
||||
"""Create test mocks for the AsyncExa SDK."""
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
# Create mock SDK import object
|
||||
mock_import = MagicMock()
|
||||
@@ -247,7 +247,7 @@ class ExaCreateImportBlock(Block):
|
||||
return {
|
||||
"_get_client": lambda *args, **kwargs: MagicMock(
|
||||
websets=MagicMock(
|
||||
imports=MagicMock(create=lambda *args, **kwargs: mock_import)
|
||||
imports=MagicMock(create=AsyncMock(return_value=mock_import))
|
||||
)
|
||||
)
|
||||
}
|
||||
@@ -294,7 +294,7 @@ class ExaCreateImportBlock(Block):
|
||||
if input_data.metadata:
|
||||
payload["metadata"] = input_data.metadata
|
||||
|
||||
sdk_import = aexa.websets.imports.create(
|
||||
sdk_import = await aexa.websets.imports.create(
|
||||
params=payload, csv_data=input_data.csv_data
|
||||
)
|
||||
|
||||
@@ -360,7 +360,7 @@ class ExaGetImportBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
sdk_import = aexa.websets.imports.get(import_id=input_data.import_id)
|
||||
sdk_import = await aexa.websets.imports.get(import_id=input_data.import_id)
|
||||
|
||||
import_obj = ImportModel.from_sdk(sdk_import)
|
||||
|
||||
@@ -426,7 +426,7 @@ class ExaListImportsBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
response = aexa.websets.imports.list(
|
||||
response = await aexa.websets.imports.list(
|
||||
cursor=input_data.cursor,
|
||||
limit=input_data.limit,
|
||||
)
|
||||
@@ -474,7 +474,9 @@ class ExaDeleteImportBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
deleted_import = aexa.websets.imports.delete(import_id=input_data.import_id)
|
||||
deleted_import = await aexa.websets.imports.delete(
|
||||
import_id=input_data.import_id
|
||||
)
|
||||
|
||||
yield "import_id", deleted_import.id
|
||||
yield "success", "true"
|
||||
@@ -573,14 +575,14 @@ class ExaExportWebsetBlock(Block):
|
||||
}
|
||||
)
|
||||
|
||||
# Create mock iterator
|
||||
mock_items = [mock_item1, mock_item2]
|
||||
# Create async iterator for list_all
|
||||
async def async_item_iterator(*args, **kwargs):
|
||||
for item in [mock_item1, mock_item2]:
|
||||
yield item
|
||||
|
||||
return {
|
||||
"_get_client": lambda *args, **kwargs: MagicMock(
|
||||
websets=MagicMock(
|
||||
items=MagicMock(list_all=lambda *args, **kwargs: iter(mock_items))
|
||||
)
|
||||
websets=MagicMock(items=MagicMock(list_all=async_item_iterator))
|
||||
)
|
||||
}
|
||||
|
||||
@@ -602,7 +604,7 @@ class ExaExportWebsetBlock(Block):
|
||||
webset_id=input_data.webset_id, limit=input_data.max_items
|
||||
)
|
||||
|
||||
for sdk_item in item_iterator:
|
||||
async for sdk_item in item_iterator:
|
||||
if len(all_items) >= input_data.max_items:
|
||||
break
|
||||
|
||||
|
||||
@@ -178,7 +178,7 @@ class ExaGetWebsetItemBlock(Block):
|
||||
) -> BlockOutput:
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
sdk_item = aexa.websets.items.get(
|
||||
sdk_item = await aexa.websets.items.get(
|
||||
webset_id=input_data.webset_id, id=input_data.item_id
|
||||
)
|
||||
|
||||
@@ -269,7 +269,7 @@ class ExaListWebsetItemsBlock(Block):
|
||||
response = None
|
||||
|
||||
while time.time() - start_time < input_data.wait_timeout:
|
||||
response = aexa.websets.items.list(
|
||||
response = await aexa.websets.items.list(
|
||||
webset_id=input_data.webset_id,
|
||||
cursor=input_data.cursor,
|
||||
limit=input_data.limit,
|
||||
@@ -282,13 +282,13 @@ class ExaListWebsetItemsBlock(Block):
|
||||
interval = min(interval * 1.2, 10)
|
||||
|
||||
if not response:
|
||||
response = aexa.websets.items.list(
|
||||
response = await aexa.websets.items.list(
|
||||
webset_id=input_data.webset_id,
|
||||
cursor=input_data.cursor,
|
||||
limit=input_data.limit,
|
||||
)
|
||||
else:
|
||||
response = aexa.websets.items.list(
|
||||
response = await aexa.websets.items.list(
|
||||
webset_id=input_data.webset_id,
|
||||
cursor=input_data.cursor,
|
||||
limit=input_data.limit,
|
||||
@@ -340,7 +340,7 @@ class ExaDeleteWebsetItemBlock(Block):
|
||||
) -> BlockOutput:
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
deleted_item = aexa.websets.items.delete(
|
||||
deleted_item = await aexa.websets.items.delete(
|
||||
webset_id=input_data.webset_id, id=input_data.item_id
|
||||
)
|
||||
|
||||
@@ -408,7 +408,7 @@ class ExaBulkWebsetItemsBlock(Block):
|
||||
webset_id=input_data.webset_id, limit=input_data.max_items
|
||||
)
|
||||
|
||||
for sdk_item in item_iterator:
|
||||
async for sdk_item in item_iterator:
|
||||
if len(all_items) >= input_data.max_items:
|
||||
break
|
||||
|
||||
@@ -475,7 +475,7 @@ class ExaWebsetItemsSummaryBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
webset = aexa.websets.get(id=input_data.webset_id)
|
||||
webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
|
||||
entity_type = "unknown"
|
||||
if webset.searches:
|
||||
@@ -495,7 +495,7 @@ class ExaWebsetItemsSummaryBlock(Block):
|
||||
# Get sample items if requested
|
||||
sample_items: List[WebsetItemModel] = []
|
||||
if input_data.sample_size > 0:
|
||||
items_response = aexa.websets.items.list(
|
||||
items_response = await aexa.websets.items.list(
|
||||
webset_id=input_data.webset_id, limit=input_data.sample_size
|
||||
)
|
||||
# Convert to our stable models
|
||||
@@ -569,7 +569,7 @@ class ExaGetNewItemsBlock(Block):
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
# Get items starting from cursor
|
||||
response = aexa.websets.items.list(
|
||||
response = await aexa.websets.items.list(
|
||||
webset_id=input_data.webset_id,
|
||||
cursor=input_data.since_cursor,
|
||||
limit=input_data.max_items,
|
||||
|
||||
@@ -233,7 +233,7 @@ class ExaCreateMonitorBlock(Block):
|
||||
def _create_test_mock():
|
||||
"""Create test mocks for the AsyncExa SDK."""
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
# Create mock SDK monitor object
|
||||
mock_monitor = MagicMock()
|
||||
@@ -263,7 +263,7 @@ class ExaCreateMonitorBlock(Block):
|
||||
return {
|
||||
"_get_client": lambda *args, **kwargs: MagicMock(
|
||||
websets=MagicMock(
|
||||
monitors=MagicMock(create=lambda *args, **kwargs: mock_monitor)
|
||||
monitors=MagicMock(create=AsyncMock(return_value=mock_monitor))
|
||||
)
|
||||
)
|
||||
}
|
||||
@@ -320,7 +320,7 @@ class ExaCreateMonitorBlock(Block):
|
||||
if input_data.metadata:
|
||||
payload["metadata"] = input_data.metadata
|
||||
|
||||
sdk_monitor = aexa.websets.monitors.create(params=payload)
|
||||
sdk_monitor = await aexa.websets.monitors.create(params=payload)
|
||||
|
||||
monitor = MonitorModel.from_sdk(sdk_monitor)
|
||||
|
||||
@@ -384,7 +384,7 @@ class ExaGetMonitorBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
sdk_monitor = aexa.websets.monitors.get(monitor_id=input_data.monitor_id)
|
||||
sdk_monitor = await aexa.websets.monitors.get(monitor_id=input_data.monitor_id)
|
||||
|
||||
monitor = MonitorModel.from_sdk(sdk_monitor)
|
||||
|
||||
@@ -476,7 +476,7 @@ class ExaUpdateMonitorBlock(Block):
|
||||
if input_data.metadata is not None:
|
||||
payload["metadata"] = input_data.metadata
|
||||
|
||||
sdk_monitor = aexa.websets.monitors.update(
|
||||
sdk_monitor = await aexa.websets.monitors.update(
|
||||
monitor_id=input_data.monitor_id, params=payload
|
||||
)
|
||||
|
||||
@@ -522,7 +522,9 @@ class ExaDeleteMonitorBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
deleted_monitor = aexa.websets.monitors.delete(monitor_id=input_data.monitor_id)
|
||||
deleted_monitor = await aexa.websets.monitors.delete(
|
||||
monitor_id=input_data.monitor_id
|
||||
)
|
||||
|
||||
yield "monitor_id", deleted_monitor.id
|
||||
yield "success", "true"
|
||||
@@ -579,7 +581,7 @@ class ExaListMonitorsBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
response = aexa.websets.monitors.list(
|
||||
response = await aexa.websets.monitors.list(
|
||||
cursor=input_data.cursor,
|
||||
limit=input_data.limit,
|
||||
webset_id=input_data.webset_id,
|
||||
|
||||
@@ -121,7 +121,7 @@ class ExaWaitForWebsetBlock(Block):
|
||||
WebsetTargetStatus.IDLE,
|
||||
WebsetTargetStatus.ANY_COMPLETE,
|
||||
]:
|
||||
final_webset = aexa.websets.wait_until_idle(
|
||||
final_webset = await aexa.websets.wait_until_idle(
|
||||
id=input_data.webset_id,
|
||||
timeout=input_data.timeout,
|
||||
poll_interval=input_data.check_interval,
|
||||
@@ -164,7 +164,7 @@ class ExaWaitForWebsetBlock(Block):
|
||||
interval = input_data.check_interval
|
||||
while time.time() - start_time < input_data.timeout:
|
||||
# Get current webset status
|
||||
webset = aexa.websets.get(id=input_data.webset_id)
|
||||
webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
current_status = (
|
||||
webset.status.value
|
||||
if hasattr(webset.status, "value")
|
||||
@@ -209,7 +209,7 @@ class ExaWaitForWebsetBlock(Block):
|
||||
|
||||
# Timeout reached
|
||||
elapsed = time.time() - start_time
|
||||
webset = aexa.websets.get(id=input_data.webset_id)
|
||||
webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
final_status = (
|
||||
webset.status.value
|
||||
if hasattr(webset.status, "value")
|
||||
@@ -345,7 +345,7 @@ class ExaWaitForSearchBlock(Block):
|
||||
try:
|
||||
while time.time() - start_time < input_data.timeout:
|
||||
# Get current search status using SDK
|
||||
search = aexa.websets.searches.get(
|
||||
search = await aexa.websets.searches.get(
|
||||
webset_id=input_data.webset_id, id=input_data.search_id
|
||||
)
|
||||
|
||||
@@ -401,7 +401,7 @@ class ExaWaitForSearchBlock(Block):
|
||||
elapsed = time.time() - start_time
|
||||
|
||||
# Get last known status
|
||||
search = aexa.websets.searches.get(
|
||||
search = await aexa.websets.searches.get(
|
||||
webset_id=input_data.webset_id, id=input_data.search_id
|
||||
)
|
||||
final_status = (
|
||||
@@ -503,7 +503,7 @@ class ExaWaitForEnrichmentBlock(Block):
|
||||
try:
|
||||
while time.time() - start_time < input_data.timeout:
|
||||
# Get current enrichment status using SDK
|
||||
enrichment = aexa.websets.enrichments.get(
|
||||
enrichment = await aexa.websets.enrichments.get(
|
||||
webset_id=input_data.webset_id, id=input_data.enrichment_id
|
||||
)
|
||||
|
||||
@@ -548,7 +548,7 @@ class ExaWaitForEnrichmentBlock(Block):
|
||||
elapsed = time.time() - start_time
|
||||
|
||||
# Get last known status
|
||||
enrichment = aexa.websets.enrichments.get(
|
||||
enrichment = await aexa.websets.enrichments.get(
|
||||
webset_id=input_data.webset_id, id=input_data.enrichment_id
|
||||
)
|
||||
final_status = (
|
||||
@@ -575,7 +575,7 @@ class ExaWaitForEnrichmentBlock(Block):
|
||||
) -> tuple[list[SampleEnrichmentModel], int]:
|
||||
"""Get sample enriched data and count."""
|
||||
# Get a few items to see enrichment results using SDK
|
||||
response = aexa.websets.items.list(webset_id=webset_id, limit=5)
|
||||
response = await aexa.websets.items.list(webset_id=webset_id, limit=5)
|
||||
|
||||
sample_data: list[SampleEnrichmentModel] = []
|
||||
enriched_count = 0
|
||||
|
||||
@@ -317,7 +317,7 @@ class ExaCreateWebsetSearchBlock(Block):
|
||||
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
sdk_search = aexa.websets.searches.create(
|
||||
sdk_search = await aexa.websets.searches.create(
|
||||
webset_id=input_data.webset_id, params=payload
|
||||
)
|
||||
|
||||
@@ -350,7 +350,7 @@ class ExaCreateWebsetSearchBlock(Block):
|
||||
poll_start = time.time()
|
||||
|
||||
while time.time() - poll_start < input_data.polling_timeout:
|
||||
current_search = aexa.websets.searches.get(
|
||||
current_search = await aexa.websets.searches.get(
|
||||
webset_id=input_data.webset_id, id=search_id
|
||||
)
|
||||
current_status = (
|
||||
@@ -442,7 +442,7 @@ class ExaGetWebsetSearchBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
sdk_search = aexa.websets.searches.get(
|
||||
sdk_search = await aexa.websets.searches.get(
|
||||
webset_id=input_data.webset_id, id=input_data.search_id
|
||||
)
|
||||
|
||||
@@ -523,7 +523,7 @@ class ExaCancelWebsetSearchBlock(Block):
|
||||
# Use AsyncExa SDK
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
canceled_search = aexa.websets.searches.cancel(
|
||||
canceled_search = await aexa.websets.searches.cancel(
|
||||
webset_id=input_data.webset_id, id=input_data.search_id
|
||||
)
|
||||
|
||||
@@ -604,7 +604,7 @@ class ExaFindOrCreateSearchBlock(Block):
|
||||
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
|
||||
|
||||
# Get webset to check existing searches
|
||||
webset = aexa.websets.get(id=input_data.webset_id)
|
||||
webset = await aexa.websets.get(id=input_data.webset_id)
|
||||
|
||||
# Look for existing search with same query
|
||||
existing_search = None
|
||||
@@ -636,7 +636,7 @@ class ExaFindOrCreateSearchBlock(Block):
|
||||
if input_data.entity_type != SearchEntityType.AUTO:
|
||||
payload["entity"] = {"type": input_data.entity_type.value}
|
||||
|
||||
sdk_search = aexa.websets.searches.create(
|
||||
sdk_search = await aexa.websets.searches.create(
|
||||
webset_id=input_data.webset_id, params=payload
|
||||
)
|
||||
|
||||
|
||||
@@ -596,10 +596,10 @@ def extract_openai_tool_calls(response) -> list[ToolContentBlock] | None:
|
||||
|
||||
def get_parallel_tool_calls_param(
|
||||
llm_model: LlmModel, parallel_tool_calls: bool | None
|
||||
):
|
||||
) -> bool | openai.Omit:
|
||||
"""Get the appropriate parallel_tool_calls parameter for OpenAI-compatible APIs."""
|
||||
if llm_model.startswith("o") or parallel_tool_calls is None:
|
||||
return openai.NOT_GIVEN
|
||||
return openai.omit
|
||||
return parallel_tool_calls
|
||||
|
||||
|
||||
|
||||
254
autogpt_platform/backend/backend/blocks/mcp/block.py
Normal file
254
autogpt_platform/backend/backend/blocks/mcp/block.py
Normal file
@@ -0,0 +1,254 @@
|
||||
"""
|
||||
MCP (Model Context Protocol) Tool Block.
|
||||
|
||||
A single dynamic block that can connect to any MCP server, discover available tools,
|
||||
and execute them. Works like AgentExecutorBlock — the user selects a tool from a
|
||||
dropdown and the input/output schema adapts dynamically.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.blocks.mcp.client import MCPClient, MCPClientError
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockInput,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
BlockType,
|
||||
)
|
||||
from backend.data.model import (
|
||||
APIKeyCredentials,
|
||||
CredentialsField,
|
||||
CredentialsMetaInput,
|
||||
SchemaField,
|
||||
)
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.util.json import validate_with_jsonschema
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MCPCredentialsInput = CredentialsMetaInput[
|
||||
Literal[ProviderName.MCP], Literal["api_key"]
|
||||
]
|
||||
|
||||
TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="01234567-89ab-cdef-0123-456789abcdef",
|
||||
provider="mcp",
|
||||
api_key=SecretStr("test-mcp-token"),
|
||||
title="Mock MCP Credentials",
|
||||
)
|
||||
TEST_CREDENTIALS_INPUT = {
|
||||
"provider": TEST_CREDENTIALS.provider,
|
||||
"id": TEST_CREDENTIALS.id,
|
||||
"type": TEST_CREDENTIALS.type,
|
||||
"title": TEST_CREDENTIALS.title,
|
||||
}
|
||||
|
||||
|
||||
class MCPToolBlock(Block):
|
||||
"""
|
||||
A block that connects to an MCP server, lets the user pick a tool,
|
||||
and executes it with dynamic input/output schema.
|
||||
|
||||
The flow:
|
||||
1. User provides an MCP server URL (and optional credentials)
|
||||
2. Frontend calls the backend to get tool list from that URL
|
||||
3. User selects a tool from a dropdown (available_tools)
|
||||
4. The block's input schema updates to reflect the selected tool's parameters
|
||||
5. On execution, the block calls the MCP server to run the tool
|
||||
"""
|
||||
|
||||
class Input(BlockSchemaInput):
|
||||
# -- Static fields (always shown) --
|
||||
credentials: MCPCredentialsInput = CredentialsField(
|
||||
description="API key / Bearer token for the MCP server (optional for "
|
||||
"public servers — create a credential with any placeholder value).",
|
||||
)
|
||||
server_url: str = SchemaField(
|
||||
description="URL of the MCP server (Streamable HTTP endpoint)",
|
||||
placeholder="https://mcp.example.com/mcp",
|
||||
)
|
||||
available_tools: dict[str, Any] = SchemaField(
|
||||
description="Available tools on the MCP server. "
|
||||
"This is populated automatically when a server URL is provided.",
|
||||
default={},
|
||||
hidden=True,
|
||||
)
|
||||
selected_tool: str = SchemaField(
|
||||
description="The MCP tool to execute",
|
||||
placeholder="Select a tool",
|
||||
default="",
|
||||
)
|
||||
tool_input_schema: dict[str, Any] = SchemaField(
|
||||
description="JSON Schema for the selected tool's input parameters. "
|
||||
"Populated automatically when a tool is selected.",
|
||||
default={},
|
||||
hidden=True,
|
||||
)
|
||||
|
||||
# -- Dynamic field: actual arguments for the selected tool --
|
||||
tool_arguments: dict[str, Any] = SchemaField(
|
||||
description="Arguments to pass to the selected MCP tool. "
|
||||
"The fields here are defined by the tool's input schema.",
|
||||
default={},
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_input_schema(cls, data: BlockInput) -> dict[str, Any]:
|
||||
"""Return the tool's input schema so the builder UI renders dynamic fields."""
|
||||
return data.get("tool_input_schema", {})
|
||||
|
||||
@classmethod
|
||||
def get_input_defaults(cls, data: BlockInput) -> BlockInput:
|
||||
"""Return the current tool_arguments as defaults for the dynamic fields."""
|
||||
return data.get("tool_arguments", {})
|
||||
|
||||
@classmethod
|
||||
def get_missing_input(cls, data: BlockInput) -> set[str]:
|
||||
"""Check which required tool arguments are missing."""
|
||||
required_fields = cls.get_input_schema(data).get("required", [])
|
||||
return set(required_fields) - set(data)
|
||||
|
||||
@classmethod
|
||||
def get_mismatch_error(cls, data: BlockInput) -> str | None:
|
||||
"""Validate tool_arguments against the tool's input schema."""
|
||||
tool_schema = cls.get_input_schema(data)
|
||||
if not tool_schema:
|
||||
return None
|
||||
return validate_with_jsonschema(tool_schema, data)
|
||||
|
||||
class Output(BlockSchemaOutput):
|
||||
result: Any = SchemaField(
|
||||
description="The result returned by the MCP tool"
|
||||
)
|
||||
error: str = SchemaField(description="Error message if the tool call failed")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="a0a4b1c2-d3e4-4f56-a7b8-c9d0e1f2a3b4",
|
||||
description="Connect to any MCP server and execute its tools. "
|
||||
"Provide a server URL, select a tool, and pass arguments dynamically.",
|
||||
categories={BlockCategory.DEVELOPER_TOOLS},
|
||||
input_schema=MCPToolBlock.Input,
|
||||
output_schema=MCPToolBlock.Output,
|
||||
block_type=BlockType.STANDARD,
|
||||
test_input={
|
||||
"server_url": "https://mcp.example.com/mcp",
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
"selected_tool": "get_weather",
|
||||
"tool_input_schema": {
|
||||
"type": "object",
|
||||
"properties": {"city": {"type": "string"}},
|
||||
"required": ["city"],
|
||||
},
|
||||
"tool_arguments": {"city": "London"},
|
||||
},
|
||||
test_output=[
|
||||
(
|
||||
"result",
|
||||
{"weather": "sunny", "temperature": 20},
|
||||
),
|
||||
],
|
||||
test_mock={
|
||||
"_call_mcp_tool": lambda *a, **kw: {
|
||||
"weather": "sunny",
|
||||
"temperature": 20,
|
||||
},
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
)
|
||||
|
||||
async def _call_mcp_tool(
|
||||
self,
|
||||
server_url: str,
|
||||
tool_name: str,
|
||||
arguments: dict[str, Any],
|
||||
auth_token: str | None = None,
|
||||
) -> Any:
|
||||
"""Call a tool on the MCP server. Extracted for easy mocking in tests."""
|
||||
# Trust the user-configured server URL to allow internal/localhost servers
|
||||
client = MCPClient(
|
||||
server_url,
|
||||
auth_token=auth_token,
|
||||
trusted_origins=[server_url],
|
||||
)
|
||||
await client.initialize()
|
||||
result = await client.call_tool(tool_name, arguments)
|
||||
|
||||
if result.is_error:
|
||||
error_text = ""
|
||||
for item in result.content:
|
||||
if item.get("type") == "text":
|
||||
error_text += item.get("text", "")
|
||||
raise MCPClientError(
|
||||
f"MCP tool '{tool_name}' returned an error: "
|
||||
f"{error_text or 'Unknown error'}"
|
||||
)
|
||||
|
||||
# Extract text content from the result
|
||||
output_parts = []
|
||||
for item in result.content:
|
||||
if item.get("type") == "text":
|
||||
text = item.get("text", "")
|
||||
# Try to parse as JSON for structured output
|
||||
try:
|
||||
output_parts.append(json.loads(text))
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
output_parts.append(text)
|
||||
elif item.get("type") == "image":
|
||||
output_parts.append(
|
||||
{
|
||||
"type": "image",
|
||||
"data": item.get("data"),
|
||||
"mimeType": item.get("mimeType"),
|
||||
}
|
||||
)
|
||||
elif item.get("type") == "resource":
|
||||
output_parts.append(item.get("resource", {}))
|
||||
|
||||
# If single result, unwrap
|
||||
if len(output_parts) == 1:
|
||||
return output_parts[0]
|
||||
return output_parts if output_parts else None
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: APIKeyCredentials,
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
if not input_data.server_url:
|
||||
yield "error", "MCP server URL is required"
|
||||
return
|
||||
|
||||
if not input_data.selected_tool:
|
||||
yield "error", "No tool selected. Please select a tool from the dropdown."
|
||||
return
|
||||
|
||||
auth_token: str | None = None
|
||||
if credentials and credentials.api_key:
|
||||
token_value = credentials.api_key.get_secret_value()
|
||||
# Skip placeholder/fake tokens
|
||||
if token_value and token_value not in ("", "FAKE_API_KEY", "placeholder"):
|
||||
auth_token = token_value
|
||||
|
||||
try:
|
||||
result = await self._call_mcp_tool(
|
||||
server_url=input_data.server_url,
|
||||
tool_name=input_data.selected_tool,
|
||||
arguments=input_data.tool_arguments,
|
||||
auth_token=auth_token,
|
||||
)
|
||||
yield "result", result
|
||||
except MCPClientError as e:
|
||||
yield "error", str(e)
|
||||
except Exception as e:
|
||||
logger.exception(f"MCP tool call failed: {e}")
|
||||
yield "error", f"MCP tool call failed: {str(e)}"
|
||||
186
autogpt_platform/backend/backend/blocks/mcp/client.py
Normal file
186
autogpt_platform/backend/backend/blocks/mcp/client.py
Normal file
@@ -0,0 +1,186 @@
|
||||
"""
|
||||
MCP (Model Context Protocol) HTTP client.
|
||||
|
||||
Implements the MCP Streamable HTTP transport for listing tools and calling tools
|
||||
on remote MCP servers. Uses JSON-RPC 2.0 over HTTP POST.
|
||||
|
||||
Reference: https://modelcontextprotocol.io/docs/concepts/transports
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from backend.util.request import Requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MCPTool:
|
||||
"""Represents an MCP tool discovered from a server."""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
input_schema: dict[str, Any]
|
||||
|
||||
|
||||
@dataclass
|
||||
class MCPCallResult:
|
||||
"""Result from calling an MCP tool."""
|
||||
|
||||
content: list[dict[str, Any]] = field(default_factory=list)
|
||||
is_error: bool = False
|
||||
|
||||
|
||||
class MCPClientError(Exception):
|
||||
"""Raised when an MCP protocol error occurs."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class MCPClient:
|
||||
"""
|
||||
Async HTTP client for the MCP Streamable HTTP transport.
|
||||
|
||||
Communicates with MCP servers using JSON-RPC 2.0 over HTTP POST.
|
||||
Supports optional Bearer token authentication.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
server_url: str,
|
||||
auth_token: str | None = None,
|
||||
trusted_origins: list[str] | None = None,
|
||||
):
|
||||
self.server_url = server_url.rstrip("/")
|
||||
self.auth_token = auth_token
|
||||
self.trusted_origins = trusted_origins or []
|
||||
self._request_id = 0
|
||||
|
||||
def _next_id(self) -> int:
|
||||
self._request_id += 1
|
||||
return self._request_id
|
||||
|
||||
def _build_headers(self) -> dict[str, str]:
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json, text/event-stream",
|
||||
}
|
||||
if self.auth_token:
|
||||
headers["Authorization"] = f"Bearer {self.auth_token}"
|
||||
return headers
|
||||
|
||||
def _build_jsonrpc_request(
|
||||
self, method: str, params: dict[str, Any] | None = None
|
||||
) -> dict[str, Any]:
|
||||
req: dict[str, Any] = {
|
||||
"jsonrpc": "2.0",
|
||||
"method": method,
|
||||
"id": self._next_id(),
|
||||
}
|
||||
if params is not None:
|
||||
req["params"] = params
|
||||
return req
|
||||
|
||||
async def _send_request(
|
||||
self, method: str, params: dict[str, Any] | None = None
|
||||
) -> Any:
|
||||
"""Send a JSON-RPC request to the MCP server and return the result."""
|
||||
payload = self._build_jsonrpc_request(method, params)
|
||||
headers = self._build_headers()
|
||||
|
||||
requests = Requests(
|
||||
raise_for_status=True,
|
||||
extra_headers=headers,
|
||||
trusted_origins=self.trusted_origins,
|
||||
)
|
||||
response = await requests.post(self.server_url, json=payload)
|
||||
body = response.json()
|
||||
|
||||
# Handle JSON-RPC error
|
||||
if "error" in body:
|
||||
error = body["error"]
|
||||
raise MCPClientError(
|
||||
f"MCP server error [{error.get('code', '?')}]: "
|
||||
f"{error.get('message', 'Unknown error')}"
|
||||
)
|
||||
|
||||
return body.get("result")
|
||||
|
||||
async def _send_notification(self, method: str) -> None:
|
||||
"""Send a JSON-RPC notification (no id, no response expected)."""
|
||||
headers = self._build_headers()
|
||||
notification = {"jsonrpc": "2.0", "method": method}
|
||||
requests = Requests(
|
||||
raise_for_status=False,
|
||||
extra_headers=headers,
|
||||
trusted_origins=self.trusted_origins,
|
||||
)
|
||||
await requests.post(self.server_url, json=notification)
|
||||
|
||||
async def initialize(self) -> dict[str, Any]:
|
||||
"""
|
||||
Send the MCP initialize request.
|
||||
|
||||
This is required by the MCP protocol before any other requests.
|
||||
Returns the server's capabilities.
|
||||
"""
|
||||
result = await self._send_request(
|
||||
"initialize",
|
||||
{
|
||||
"protocolVersion": "2025-03-26",
|
||||
"capabilities": {},
|
||||
"clientInfo": {"name": "AutoGPT-Platform", "version": "1.0.0"},
|
||||
},
|
||||
)
|
||||
# Send initialized notification (no response expected)
|
||||
await self._send_notification("notifications/initialized")
|
||||
|
||||
return result or {}
|
||||
|
||||
async def list_tools(self) -> list[MCPTool]:
|
||||
"""
|
||||
Discover available tools from the MCP server.
|
||||
|
||||
Returns a list of MCPTool objects with name, description, and input schema.
|
||||
"""
|
||||
result = await self._send_request("tools/list")
|
||||
if not result or "tools" not in result:
|
||||
return []
|
||||
|
||||
tools = []
|
||||
for tool_data in result["tools"]:
|
||||
tools.append(
|
||||
MCPTool(
|
||||
name=tool_data.get("name", ""),
|
||||
description=tool_data.get("description", ""),
|
||||
input_schema=tool_data.get("inputSchema", {}),
|
||||
)
|
||||
)
|
||||
return tools
|
||||
|
||||
async def call_tool(
|
||||
self, tool_name: str, arguments: dict[str, Any]
|
||||
) -> MCPCallResult:
|
||||
"""
|
||||
Call a tool on the MCP server.
|
||||
|
||||
Args:
|
||||
tool_name: The name of the tool to call.
|
||||
arguments: The arguments to pass to the tool.
|
||||
|
||||
Returns:
|
||||
MCPCallResult with the tool's response content.
|
||||
"""
|
||||
result = await self._send_request(
|
||||
"tools/call",
|
||||
{"name": tool_name, "arguments": arguments},
|
||||
)
|
||||
if not result:
|
||||
return MCPCallResult(is_error=True)
|
||||
|
||||
return MCPCallResult(
|
||||
content=result.get("content", []),
|
||||
is_error=result.get("isError", False),
|
||||
)
|
||||
21
autogpt_platform/backend/backend/blocks/mcp/conftest.py
Normal file
21
autogpt_platform/backend/backend/blocks/mcp/conftest.py
Normal file
@@ -0,0 +1,21 @@
|
||||
"""
|
||||
Conftest for MCP block tests.
|
||||
|
||||
Override the session-scoped server and graph_cleanup fixtures from
|
||||
backend/conftest.py so that MCP integration tests don't spin up the
|
||||
full SpinTestServer infrastructure.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def server():
|
||||
"""No-op override — MCP tests don't need the full platform server."""
|
||||
yield None
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def graph_cleanup(server):
|
||||
"""No-op override — MCP tests don't create graphs."""
|
||||
yield
|
||||
374
autogpt_platform/backend/backend/blocks/mcp/test_integration.py
Normal file
374
autogpt_platform/backend/backend/blocks/mcp/test_integration.py
Normal file
@@ -0,0 +1,374 @@
|
||||
"""
|
||||
Integration tests for MCP client and MCPToolBlock against a real HTTP server.
|
||||
|
||||
These tests spin up a local MCP test server and run the full client/block flow
|
||||
against it — no mocking, real HTTP requests.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import threading
|
||||
|
||||
import pytest
|
||||
from aiohttp import web
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.blocks.mcp.block import MCPToolBlock
|
||||
from backend.blocks.mcp.client import MCPClient
|
||||
from backend.blocks.mcp.test_server import create_test_mcp_app
|
||||
from backend.data.model import APIKeyCredentials
|
||||
|
||||
|
||||
class _MCPTestServer:
|
||||
"""
|
||||
Run an MCP test server in a background thread with its own event loop.
|
||||
This avoids event loop conflicts with pytest-asyncio.
|
||||
"""
|
||||
|
||||
def __init__(self, auth_token: str | None = None):
|
||||
self.auth_token = auth_token
|
||||
self.url: str = ""
|
||||
self._runner: web.AppRunner | None = None
|
||||
self._loop: asyncio.AbstractEventLoop | None = None
|
||||
self._thread: threading.Thread | None = None
|
||||
self._started = threading.Event()
|
||||
|
||||
def _run(self):
|
||||
self._loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self._loop)
|
||||
self._loop.run_until_complete(self._start())
|
||||
self._started.set()
|
||||
self._loop.run_forever()
|
||||
|
||||
async def _start(self):
|
||||
app = create_test_mcp_app(auth_token=self.auth_token)
|
||||
self._runner = web.AppRunner(app)
|
||||
await self._runner.setup()
|
||||
site = web.TCPSite(self._runner, "127.0.0.1", 0)
|
||||
await site.start()
|
||||
port = site._server.sockets[0].getsockname()[1]
|
||||
self.url = f"http://127.0.0.1:{port}/mcp"
|
||||
|
||||
def start(self):
|
||||
self._thread = threading.Thread(target=self._run, daemon=True)
|
||||
self._thread.start()
|
||||
self._started.wait(timeout=5)
|
||||
return self
|
||||
|
||||
def stop(self):
|
||||
if self._loop and self._runner:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._runner.cleanup(), self._loop
|
||||
).result(timeout=5)
|
||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
||||
if self._thread:
|
||||
self._thread.join(timeout=5)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def mcp_server():
|
||||
"""Start a local MCP test server in a background thread."""
|
||||
server = _MCPTestServer()
|
||||
server.start()
|
||||
yield server.url
|
||||
server.stop()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def mcp_server_with_auth():
|
||||
"""Start a local MCP test server with auth in a background thread."""
|
||||
server = _MCPTestServer(auth_token="test-secret-token")
|
||||
server.start()
|
||||
yield server.url, "test-secret-token"
|
||||
server.stop()
|
||||
|
||||
|
||||
def _make_client(url: str, auth_token: str | None = None) -> MCPClient:
|
||||
"""Create an MCPClient with localhost trusted for integration tests."""
|
||||
return MCPClient(url, auth_token=auth_token, trusted_origins=[url])
|
||||
|
||||
|
||||
def _make_fake_creds(api_key: str = "FAKE_API_KEY") -> APIKeyCredentials:
|
||||
return APIKeyCredentials(
|
||||
id="test-integration",
|
||||
provider="mcp",
|
||||
api_key=SecretStr(api_key),
|
||||
title="test",
|
||||
)
|
||||
|
||||
|
||||
# ── MCPClient integration tests ──────────────────────────────────────
|
||||
|
||||
|
||||
class TestMCPClientIntegration:
|
||||
"""Test MCPClient against a real local MCP server."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_initialize(self, mcp_server):
|
||||
client = _make_client(mcp_server)
|
||||
result = await client.initialize()
|
||||
|
||||
assert result["protocolVersion"] == "2025-03-26"
|
||||
assert result["serverInfo"]["name"] == "test-mcp-server"
|
||||
assert "tools" in result["capabilities"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_tools(self, mcp_server):
|
||||
client = _make_client(mcp_server)
|
||||
await client.initialize()
|
||||
tools = await client.list_tools()
|
||||
|
||||
assert len(tools) == 3
|
||||
|
||||
tool_names = {t.name for t in tools}
|
||||
assert tool_names == {"get_weather", "add_numbers", "echo"}
|
||||
|
||||
# Check get_weather schema
|
||||
weather = next(t for t in tools if t.name == "get_weather")
|
||||
assert weather.description == "Get current weather for a city"
|
||||
assert "city" in weather.input_schema["properties"]
|
||||
assert weather.input_schema["required"] == ["city"]
|
||||
|
||||
# Check add_numbers schema
|
||||
add = next(t for t in tools if t.name == "add_numbers")
|
||||
assert "a" in add.input_schema["properties"]
|
||||
assert "b" in add.input_schema["properties"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_get_weather(self, mcp_server):
|
||||
client = _make_client(mcp_server)
|
||||
await client.initialize()
|
||||
result = await client.call_tool("get_weather", {"city": "London"})
|
||||
|
||||
assert not result.is_error
|
||||
assert len(result.content) == 1
|
||||
assert result.content[0]["type"] == "text"
|
||||
|
||||
data = json.loads(result.content[0]["text"])
|
||||
assert data["city"] == "London"
|
||||
assert data["temperature"] == 22
|
||||
assert data["condition"] == "sunny"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_add_numbers(self, mcp_server):
|
||||
client = _make_client(mcp_server)
|
||||
await client.initialize()
|
||||
result = await client.call_tool("add_numbers", {"a": 3, "b": 7})
|
||||
|
||||
assert not result.is_error
|
||||
data = json.loads(result.content[0]["text"])
|
||||
assert data["result"] == 10
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_echo(self, mcp_server):
|
||||
client = _make_client(mcp_server)
|
||||
await client.initialize()
|
||||
result = await client.call_tool("echo", {"message": "Hello MCP!"})
|
||||
|
||||
assert not result.is_error
|
||||
assert result.content[0]["text"] == "Hello MCP!"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_unknown_tool(self, mcp_server):
|
||||
client = _make_client(mcp_server)
|
||||
await client.initialize()
|
||||
result = await client.call_tool("nonexistent_tool", {})
|
||||
|
||||
assert result.is_error
|
||||
assert "Unknown tool" in result.content[0]["text"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_success(self, mcp_server_with_auth):
|
||||
url, token = mcp_server_with_auth
|
||||
client = _make_client(url, auth_token=token)
|
||||
result = await client.initialize()
|
||||
|
||||
assert result["protocolVersion"] == "2025-03-26"
|
||||
|
||||
tools = await client.list_tools()
|
||||
assert len(tools) == 3
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_failure(self, mcp_server_with_auth):
|
||||
url, _ = mcp_server_with_auth
|
||||
client = _make_client(url, auth_token="wrong-token")
|
||||
|
||||
with pytest.raises(Exception):
|
||||
await client.initialize()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_missing(self, mcp_server_with_auth):
|
||||
url, _ = mcp_server_with_auth
|
||||
client = _make_client(url)
|
||||
|
||||
with pytest.raises(Exception):
|
||||
await client.initialize()
|
||||
|
||||
|
||||
# ── MCPToolBlock integration tests ───────────────────────────────────
|
||||
|
||||
|
||||
class TestMCPToolBlockIntegration:
|
||||
"""Test MCPToolBlock end-to-end against a real local MCP server."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_full_flow_get_weather(self, mcp_server):
|
||||
"""Full flow: discover tools, select one, execute it."""
|
||||
# Step 1: Discover tools (simulating what the frontend/API would do)
|
||||
client = _make_client(mcp_server)
|
||||
await client.initialize()
|
||||
tools = await client.list_tools()
|
||||
assert len(tools) == 3
|
||||
|
||||
# Step 2: User selects "get_weather" and we get its schema
|
||||
weather_tool = next(t for t in tools if t.name == "get_weather")
|
||||
|
||||
# Step 3: Execute the block with the selected tool
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url=mcp_server,
|
||||
selected_tool="get_weather",
|
||||
tool_input_schema=weather_tool.input_schema,
|
||||
tool_arguments={"city": "Paris"},
|
||||
credentials={ # type: ignore
|
||||
"provider": "mcp",
|
||||
"id": "test",
|
||||
"type": "api_key",
|
||||
"title": "test",
|
||||
},
|
||||
)
|
||||
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=_make_fake_creds()
|
||||
):
|
||||
outputs.append((name, data))
|
||||
|
||||
assert len(outputs) == 1
|
||||
assert outputs[0][0] == "result"
|
||||
result = outputs[0][1]
|
||||
assert result["city"] == "Paris"
|
||||
assert result["temperature"] == 22
|
||||
assert result["condition"] == "sunny"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_full_flow_add_numbers(self, mcp_server):
|
||||
"""Full flow for add_numbers tool."""
|
||||
client = _make_client(mcp_server)
|
||||
await client.initialize()
|
||||
tools = await client.list_tools()
|
||||
add_tool = next(t for t in tools if t.name == "add_numbers")
|
||||
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url=mcp_server,
|
||||
selected_tool="add_numbers",
|
||||
tool_input_schema=add_tool.input_schema,
|
||||
tool_arguments={"a": 42, "b": 58},
|
||||
credentials={ # type: ignore
|
||||
"provider": "mcp",
|
||||
"id": "test",
|
||||
"type": "api_key",
|
||||
"title": "test",
|
||||
},
|
||||
)
|
||||
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=_make_fake_creds()
|
||||
):
|
||||
outputs.append((name, data))
|
||||
|
||||
assert len(outputs) == 1
|
||||
assert outputs[0][0] == "result"
|
||||
assert outputs[0][1]["result"] == 100
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_full_flow_echo_plain_text(self, mcp_server):
|
||||
"""Verify plain text (non-JSON) responses work."""
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url=mcp_server,
|
||||
selected_tool="echo",
|
||||
tool_input_schema={
|
||||
"type": "object",
|
||||
"properties": {"message": {"type": "string"}},
|
||||
"required": ["message"],
|
||||
},
|
||||
tool_arguments={"message": "Hello from AutoGPT!"},
|
||||
credentials={ # type: ignore
|
||||
"provider": "mcp",
|
||||
"id": "test",
|
||||
"type": "api_key",
|
||||
"title": "test",
|
||||
},
|
||||
)
|
||||
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=_make_fake_creds()
|
||||
):
|
||||
outputs.append((name, data))
|
||||
|
||||
assert len(outputs) == 1
|
||||
assert outputs[0][0] == "result"
|
||||
assert outputs[0][1] == "Hello from AutoGPT!"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_full_flow_unknown_tool_yields_error(self, mcp_server):
|
||||
"""Calling an unknown tool should yield an error output."""
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url=mcp_server,
|
||||
selected_tool="nonexistent_tool",
|
||||
tool_arguments={},
|
||||
credentials={ # type: ignore
|
||||
"provider": "mcp",
|
||||
"id": "test",
|
||||
"type": "api_key",
|
||||
"title": "test",
|
||||
},
|
||||
)
|
||||
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=_make_fake_creds()
|
||||
):
|
||||
outputs.append((name, data))
|
||||
|
||||
assert len(outputs) == 1
|
||||
assert outputs[0][0] == "error"
|
||||
assert "returned an error" in outputs[0][1]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_full_flow_with_auth(self, mcp_server_with_auth):
|
||||
"""Full flow with authentication."""
|
||||
url, token = mcp_server_with_auth
|
||||
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url=url,
|
||||
selected_tool="echo",
|
||||
tool_input_schema={
|
||||
"type": "object",
|
||||
"properties": {"message": {"type": "string"}},
|
||||
"required": ["message"],
|
||||
},
|
||||
tool_arguments={"message": "Authenticated!"},
|
||||
credentials={ # type: ignore
|
||||
"provider": "mcp",
|
||||
"id": "test",
|
||||
"type": "api_key",
|
||||
"title": "test",
|
||||
},
|
||||
)
|
||||
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=_make_fake_creds(api_key=token)
|
||||
):
|
||||
outputs.append((name, data))
|
||||
|
||||
assert len(outputs) == 1
|
||||
assert outputs[0][0] == "result"
|
||||
assert outputs[0][1] == "Authenticated!"
|
||||
536
autogpt_platform/backend/backend/blocks/mcp/test_mcp.py
Normal file
536
autogpt_platform/backend/backend/blocks/mcp/test_mcp.py
Normal file
@@ -0,0 +1,536 @@
|
||||
"""
|
||||
Tests for MCP client and MCPToolBlock.
|
||||
"""
|
||||
|
||||
import json
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.blocks.mcp.block import MCPToolBlock, TEST_CREDENTIALS, TEST_CREDENTIALS_INPUT
|
||||
from backend.blocks.mcp.client import MCPCallResult, MCPClient, MCPClientError, MCPTool
|
||||
from backend.util.test import execute_block_test
|
||||
|
||||
|
||||
# ── MCPClient unit tests ─────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMCPClient:
|
||||
"""Tests for the MCP HTTP client."""
|
||||
|
||||
def test_build_headers_without_auth(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
headers = client._build_headers()
|
||||
assert "Authorization" not in headers
|
||||
assert headers["Content-Type"] == "application/json"
|
||||
|
||||
def test_build_headers_with_auth(self):
|
||||
client = MCPClient("https://mcp.example.com", auth_token="my-token")
|
||||
headers = client._build_headers()
|
||||
assert headers["Authorization"] == "Bearer my-token"
|
||||
|
||||
def test_build_jsonrpc_request(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
req = client._build_jsonrpc_request("tools/list")
|
||||
assert req["jsonrpc"] == "2.0"
|
||||
assert req["method"] == "tools/list"
|
||||
assert "id" in req
|
||||
assert "params" not in req
|
||||
|
||||
def test_build_jsonrpc_request_with_params(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
req = client._build_jsonrpc_request(
|
||||
"tools/call", {"name": "test", "arguments": {"x": 1}}
|
||||
)
|
||||
assert req["params"] == {"name": "test", "arguments": {"x": 1}}
|
||||
|
||||
def test_request_id_increments(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
req1 = client._build_jsonrpc_request("tools/list")
|
||||
req2 = client._build_jsonrpc_request("tools/list")
|
||||
assert req2["id"] > req1["id"]
|
||||
|
||||
def test_server_url_trailing_slash_stripped(self):
|
||||
client = MCPClient("https://mcp.example.com/mcp/")
|
||||
assert client.server_url == "https://mcp.example.com/mcp"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_request_success(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
mock_response = AsyncMock()
|
||||
mock_response.json.return_value = {
|
||||
"jsonrpc": "2.0",
|
||||
"result": {"tools": []},
|
||||
"id": 1,
|
||||
}
|
||||
|
||||
with patch.object(client, "_send_request", return_value={"tools": []}):
|
||||
result = await client._send_request("tools/list")
|
||||
assert result == {"tools": []}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_request_error(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
async def mock_send(*args, **kwargs):
|
||||
raise MCPClientError("MCP server error [-32600]: Invalid Request")
|
||||
|
||||
with patch.object(client, "_send_request", side_effect=mock_send):
|
||||
with pytest.raises(MCPClientError, match="Invalid Request"):
|
||||
await client._send_request("tools/list")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_tools(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
mock_result = {
|
||||
"tools": [
|
||||
{
|
||||
"name": "get_weather",
|
||||
"description": "Get current weather for a city",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {"city": {"type": "string"}},
|
||||
"required": ["city"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "search",
|
||||
"description": "Search the web",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {"query": {"type": "string"}},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
with patch.object(client, "_send_request", return_value=mock_result):
|
||||
tools = await client.list_tools()
|
||||
|
||||
assert len(tools) == 2
|
||||
assert tools[0].name == "get_weather"
|
||||
assert tools[0].description == "Get current weather for a city"
|
||||
assert tools[0].input_schema["properties"]["city"]["type"] == "string"
|
||||
assert tools[1].name == "search"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_tools_empty(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
with patch.object(client, "_send_request", return_value={"tools": []}):
|
||||
tools = await client.list_tools()
|
||||
|
||||
assert tools == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_tools_none_result(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
with patch.object(client, "_send_request", return_value=None):
|
||||
tools = await client.list_tools()
|
||||
|
||||
assert tools == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_success(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
mock_result = {
|
||||
"content": [
|
||||
{"type": "text", "text": json.dumps({"temp": 20, "city": "London"})}
|
||||
],
|
||||
"isError": False,
|
||||
}
|
||||
|
||||
with patch.object(client, "_send_request", return_value=mock_result):
|
||||
result = await client.call_tool("get_weather", {"city": "London"})
|
||||
|
||||
assert not result.is_error
|
||||
assert len(result.content) == 1
|
||||
assert result.content[0]["type"] == "text"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_error(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
mock_result = {
|
||||
"content": [{"type": "text", "text": "City not found"}],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
with patch.object(client, "_send_request", return_value=mock_result):
|
||||
result = await client.call_tool("get_weather", {"city": "???"})
|
||||
|
||||
assert result.is_error
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_none_result(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
with patch.object(client, "_send_request", return_value=None):
|
||||
result = await client.call_tool("get_weather", {"city": "London"})
|
||||
|
||||
assert result.is_error
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_initialize(self):
|
||||
client = MCPClient("https://mcp.example.com")
|
||||
|
||||
mock_result = {
|
||||
"protocolVersion": "2025-03-26",
|
||||
"capabilities": {"tools": {}},
|
||||
"serverInfo": {"name": "test-server", "version": "1.0.0"},
|
||||
}
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
client, "_send_request", return_value=mock_result
|
||||
) as mock_req,
|
||||
patch.object(client, "_send_notification") as mock_notif,
|
||||
):
|
||||
result = await client.initialize()
|
||||
|
||||
mock_req.assert_called_once()
|
||||
mock_notif.assert_called_once_with("notifications/initialized")
|
||||
assert result["protocolVersion"] == "2025-03-26"
|
||||
|
||||
|
||||
# ── MCPToolBlock unit tests ──────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMCPToolBlock:
|
||||
"""Tests for the MCPToolBlock."""
|
||||
|
||||
def test_block_instantiation(self):
|
||||
block = MCPToolBlock()
|
||||
assert block.id == "a0a4b1c2-d3e4-4f56-a7b8-c9d0e1f2a3b4"
|
||||
assert block.name == "MCPToolBlock"
|
||||
|
||||
def test_input_schema_has_required_fields(self):
|
||||
block = MCPToolBlock()
|
||||
schema = block.input_schema.jsonschema()
|
||||
props = schema.get("properties", {})
|
||||
assert "server_url" in props
|
||||
assert "selected_tool" in props
|
||||
assert "tool_arguments" in props
|
||||
assert "credentials" in props
|
||||
|
||||
def test_output_schema(self):
|
||||
block = MCPToolBlock()
|
||||
schema = block.output_schema.jsonschema()
|
||||
props = schema.get("properties", {})
|
||||
assert "result" in props
|
||||
assert "error" in props
|
||||
|
||||
def test_get_input_schema_with_tool_schema(self):
|
||||
tool_schema = {
|
||||
"type": "object",
|
||||
"properties": {"query": {"type": "string"}},
|
||||
"required": ["query"],
|
||||
}
|
||||
data = {"tool_input_schema": tool_schema}
|
||||
result = MCPToolBlock.Input.get_input_schema(data)
|
||||
assert result == tool_schema
|
||||
|
||||
def test_get_input_schema_without_tool_schema(self):
|
||||
result = MCPToolBlock.Input.get_input_schema({})
|
||||
assert result == {}
|
||||
|
||||
def test_get_input_defaults(self):
|
||||
data = {"tool_arguments": {"city": "London"}}
|
||||
result = MCPToolBlock.Input.get_input_defaults(data)
|
||||
assert result == {"city": "London"}
|
||||
|
||||
def test_get_missing_input(self):
|
||||
data = {
|
||||
"tool_input_schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"city": {"type": "string"},
|
||||
"units": {"type": "string"},
|
||||
},
|
||||
"required": ["city", "units"],
|
||||
},
|
||||
"city": "London",
|
||||
}
|
||||
missing = MCPToolBlock.Input.get_missing_input(data)
|
||||
assert missing == {"units"}
|
||||
|
||||
def test_get_missing_input_all_present(self):
|
||||
data = {
|
||||
"tool_input_schema": {
|
||||
"type": "object",
|
||||
"properties": {"city": {"type": "string"}},
|
||||
"required": ["city"],
|
||||
},
|
||||
"city": "London",
|
||||
}
|
||||
missing = MCPToolBlock.Input.get_missing_input(data)
|
||||
assert missing == set()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_with_mock(self):
|
||||
"""Test the block using the built-in test infrastructure."""
|
||||
block = MCPToolBlock()
|
||||
await execute_block_test(block)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_missing_server_url(self):
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url="",
|
||||
selected_tool="test",
|
||||
credentials=TEST_CREDENTIALS_INPUT, # type: ignore
|
||||
)
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=TEST_CREDENTIALS
|
||||
):
|
||||
outputs.append((name, data))
|
||||
assert outputs == [("error", "MCP server URL is required")]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_missing_tool(self):
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url="https://mcp.example.com/mcp",
|
||||
selected_tool="",
|
||||
credentials=TEST_CREDENTIALS_INPUT, # type: ignore
|
||||
)
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=TEST_CREDENTIALS
|
||||
):
|
||||
outputs.append((name, data))
|
||||
assert outputs == [
|
||||
("error", "No tool selected. Please select a tool from the dropdown.")
|
||||
]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_success(self):
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url="https://mcp.example.com/mcp",
|
||||
selected_tool="get_weather",
|
||||
tool_input_schema={
|
||||
"type": "object",
|
||||
"properties": {"city": {"type": "string"}},
|
||||
},
|
||||
tool_arguments={"city": "London"},
|
||||
credentials=TEST_CREDENTIALS_INPUT, # type: ignore
|
||||
)
|
||||
|
||||
async def mock_call(*args, **kwargs):
|
||||
return {"temp": 20, "city": "London"}
|
||||
|
||||
block._call_mcp_tool = mock_call # type: ignore
|
||||
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=TEST_CREDENTIALS
|
||||
):
|
||||
outputs.append((name, data))
|
||||
|
||||
assert len(outputs) == 1
|
||||
assert outputs[0][0] == "result"
|
||||
assert outputs[0][1] == {"temp": 20, "city": "London"}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_mcp_error(self):
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url="https://mcp.example.com/mcp",
|
||||
selected_tool="bad_tool",
|
||||
credentials=TEST_CREDENTIALS_INPUT, # type: ignore
|
||||
)
|
||||
|
||||
async def mock_call(*args, **kwargs):
|
||||
raise MCPClientError("Tool not found")
|
||||
|
||||
block._call_mcp_tool = mock_call # type: ignore
|
||||
|
||||
outputs = []
|
||||
async for name, data in block.run(
|
||||
input_data, credentials=TEST_CREDENTIALS
|
||||
):
|
||||
outputs.append((name, data))
|
||||
|
||||
assert outputs[0][0] == "error"
|
||||
assert "Tool not found" in outputs[0][1]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_mcp_tool_parses_json_text(self):
|
||||
block = MCPToolBlock()
|
||||
|
||||
mock_result = MCPCallResult(
|
||||
content=[
|
||||
{"type": "text", "text": '{"temp": 20}'},
|
||||
],
|
||||
is_error=False,
|
||||
)
|
||||
|
||||
async def mock_init(self):
|
||||
return {}
|
||||
|
||||
async def mock_call(self, name, args):
|
||||
return mock_result
|
||||
|
||||
with (
|
||||
patch.object(MCPClient, "initialize", mock_init),
|
||||
patch.object(MCPClient, "call_tool", mock_call),
|
||||
):
|
||||
result = await block._call_mcp_tool(
|
||||
"https://mcp.example.com", "test_tool", {}
|
||||
)
|
||||
|
||||
assert result == {"temp": 20}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_mcp_tool_plain_text(self):
|
||||
block = MCPToolBlock()
|
||||
|
||||
mock_result = MCPCallResult(
|
||||
content=[
|
||||
{"type": "text", "text": "Hello, world!"},
|
||||
],
|
||||
is_error=False,
|
||||
)
|
||||
|
||||
async def mock_init(self):
|
||||
return {}
|
||||
|
||||
async def mock_call(self, name, args):
|
||||
return mock_result
|
||||
|
||||
with (
|
||||
patch.object(MCPClient, "initialize", mock_init),
|
||||
patch.object(MCPClient, "call_tool", mock_call),
|
||||
):
|
||||
result = await block._call_mcp_tool(
|
||||
"https://mcp.example.com", "test_tool", {}
|
||||
)
|
||||
|
||||
assert result == "Hello, world!"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_mcp_tool_multiple_content(self):
|
||||
block = MCPToolBlock()
|
||||
|
||||
mock_result = MCPCallResult(
|
||||
content=[
|
||||
{"type": "text", "text": "Part 1"},
|
||||
{"type": "text", "text": '{"part": 2}'},
|
||||
],
|
||||
is_error=False,
|
||||
)
|
||||
|
||||
async def mock_init(self):
|
||||
return {}
|
||||
|
||||
async def mock_call(self, name, args):
|
||||
return mock_result
|
||||
|
||||
with (
|
||||
patch.object(MCPClient, "initialize", mock_init),
|
||||
patch.object(MCPClient, "call_tool", mock_call),
|
||||
):
|
||||
result = await block._call_mcp_tool(
|
||||
"https://mcp.example.com", "test_tool", {}
|
||||
)
|
||||
|
||||
assert result == ["Part 1", {"part": 2}]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_mcp_tool_error_result(self):
|
||||
block = MCPToolBlock()
|
||||
|
||||
mock_result = MCPCallResult(
|
||||
content=[{"type": "text", "text": "Something went wrong"}],
|
||||
is_error=True,
|
||||
)
|
||||
|
||||
async def mock_init(self):
|
||||
return {}
|
||||
|
||||
async def mock_call(self, name, args):
|
||||
return mock_result
|
||||
|
||||
with (
|
||||
patch.object(MCPClient, "initialize", mock_init),
|
||||
patch.object(MCPClient, "call_tool", mock_call),
|
||||
):
|
||||
with pytest.raises(MCPClientError, match="returned an error"):
|
||||
await block._call_mcp_tool(
|
||||
"https://mcp.example.com", "test_tool", {}
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_mcp_tool_image_content(self):
|
||||
block = MCPToolBlock()
|
||||
|
||||
mock_result = MCPCallResult(
|
||||
content=[
|
||||
{
|
||||
"type": "image",
|
||||
"data": "base64data==",
|
||||
"mimeType": "image/png",
|
||||
}
|
||||
],
|
||||
is_error=False,
|
||||
)
|
||||
|
||||
async def mock_init(self):
|
||||
return {}
|
||||
|
||||
async def mock_call(self, name, args):
|
||||
return mock_result
|
||||
|
||||
with (
|
||||
patch.object(MCPClient, "initialize", mock_init),
|
||||
patch.object(MCPClient, "call_tool", mock_call),
|
||||
):
|
||||
result = await block._call_mcp_tool(
|
||||
"https://mcp.example.com", "test_tool", {}
|
||||
)
|
||||
|
||||
assert result == {
|
||||
"type": "image",
|
||||
"data": "base64data==",
|
||||
"mimeType": "image/png",
|
||||
}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_skips_placeholder_credentials(self):
|
||||
"""Ensure placeholder API keys are not sent to the MCP server."""
|
||||
from backend.data.model import APIKeyCredentials
|
||||
from pydantic import SecretStr
|
||||
|
||||
block = MCPToolBlock()
|
||||
input_data = MCPToolBlock.Input(
|
||||
server_url="https://mcp.example.com/mcp",
|
||||
selected_tool="test_tool",
|
||||
credentials=TEST_CREDENTIALS_INPUT, # type: ignore
|
||||
)
|
||||
|
||||
placeholder_creds = APIKeyCredentials(
|
||||
id="test-id",
|
||||
provider="mcp",
|
||||
api_key=SecretStr("FAKE_API_KEY"),
|
||||
title="Placeholder",
|
||||
)
|
||||
|
||||
captured_tokens = []
|
||||
|
||||
async def mock_call(server_url, tool_name, arguments, auth_token=None):
|
||||
captured_tokens.append(auth_token)
|
||||
return "ok"
|
||||
|
||||
block._call_mcp_tool = mock_call # type: ignore
|
||||
|
||||
async for _ in block.run(input_data, credentials=placeholder_creds):
|
||||
pass
|
||||
|
||||
assert captured_tokens == [None]
|
||||
163
autogpt_platform/backend/backend/blocks/mcp/test_server.py
Normal file
163
autogpt_platform/backend/backend/blocks/mcp/test_server.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""
|
||||
Minimal MCP server for integration testing.
|
||||
|
||||
Implements the MCP Streamable HTTP transport (JSON-RPC 2.0 over HTTP POST)
|
||||
with a few sample tools. Runs on localhost with a random available port.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from aiohttp import web
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Sample tools this test server exposes
|
||||
TEST_TOOLS = [
|
||||
{
|
||||
"name": "get_weather",
|
||||
"description": "Get current weather for a city",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"city": {
|
||||
"type": "string",
|
||||
"description": "City name",
|
||||
},
|
||||
},
|
||||
"required": ["city"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "add_numbers",
|
||||
"description": "Add two numbers together",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"a": {"type": "number", "description": "First number"},
|
||||
"b": {"type": "number", "description": "Second number"},
|
||||
},
|
||||
"required": ["a", "b"],
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "echo",
|
||||
"description": "Echo back the input message",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"message": {"type": "string", "description": "Message to echo"},
|
||||
},
|
||||
"required": ["message"],
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def _handle_initialize(params: dict) -> dict:
|
||||
return {
|
||||
"protocolVersion": "2025-03-26",
|
||||
"capabilities": {"tools": {"listChanged": False}},
|
||||
"serverInfo": {"name": "test-mcp-server", "version": "1.0.0"},
|
||||
}
|
||||
|
||||
|
||||
def _handle_tools_list(params: dict) -> dict:
|
||||
return {"tools": TEST_TOOLS}
|
||||
|
||||
|
||||
def _handle_tools_call(params: dict) -> dict:
|
||||
tool_name = params.get("name", "")
|
||||
arguments = params.get("arguments", {})
|
||||
|
||||
if tool_name == "get_weather":
|
||||
city = arguments.get("city", "Unknown")
|
||||
return {
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": json.dumps(
|
||||
{"city": city, "temperature": 22, "condition": "sunny"}
|
||||
),
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
elif tool_name == "add_numbers":
|
||||
a = arguments.get("a", 0)
|
||||
b = arguments.get("b", 0)
|
||||
return {
|
||||
"content": [{"type": "text", "text": json.dumps({"result": a + b})}],
|
||||
}
|
||||
|
||||
elif tool_name == "echo":
|
||||
message = arguments.get("message", "")
|
||||
return {
|
||||
"content": [{"type": "text", "text": message}],
|
||||
}
|
||||
|
||||
else:
|
||||
return {
|
||||
"content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}],
|
||||
"isError": True,
|
||||
}
|
||||
|
||||
|
||||
HANDLERS = {
|
||||
"initialize": _handle_initialize,
|
||||
"tools/list": _handle_tools_list,
|
||||
"tools/call": _handle_tools_call,
|
||||
}
|
||||
|
||||
|
||||
async def handle_mcp_request(request: web.Request) -> web.Response:
|
||||
"""Handle incoming MCP JSON-RPC 2.0 requests."""
|
||||
# Check auth if configured
|
||||
expected_token = request.app.get("auth_token")
|
||||
if expected_token:
|
||||
auth_header = request.headers.get("Authorization", "")
|
||||
if auth_header != f"Bearer {expected_token}":
|
||||
return web.json_response(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"error": {"code": -32001, "message": "Unauthorized"},
|
||||
"id": None,
|
||||
},
|
||||
status=401,
|
||||
)
|
||||
|
||||
body = await request.json()
|
||||
|
||||
# Handle notifications (no id field) — just acknowledge
|
||||
if "id" not in body:
|
||||
return web.Response(status=202)
|
||||
|
||||
method = body.get("method", "")
|
||||
params = body.get("params", {})
|
||||
request_id = body.get("id")
|
||||
|
||||
handler = HANDLERS.get(method)
|
||||
if not handler:
|
||||
return web.json_response(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"error": {
|
||||
"code": -32601,
|
||||
"message": f"Method not found: {method}",
|
||||
},
|
||||
"id": request_id,
|
||||
}
|
||||
)
|
||||
|
||||
result = handler(params)
|
||||
return web.json_response(
|
||||
{"jsonrpc": "2.0", "result": result, "id": request_id}
|
||||
)
|
||||
|
||||
|
||||
def create_test_mcp_app(auth_token: str | None = None) -> web.Application:
|
||||
"""Create an aiohttp app that acts as an MCP server."""
|
||||
app = web.Application()
|
||||
app.router.add_post("/mcp", handle_mcp_request)
|
||||
if auth_token:
|
||||
app["auth_token"] = auth_token
|
||||
return app
|
||||
@@ -246,7 +246,9 @@ class BlockSchema(BaseModel):
|
||||
f"is not of type {CredentialsMetaInput.__name__}"
|
||||
)
|
||||
|
||||
credentials_fields[field_name].validate_credentials_field_schema(cls)
|
||||
CredentialsMetaInput.validate_credentials_field_schema(
|
||||
cls.get_field_schema(field_name), field_name
|
||||
)
|
||||
|
||||
elif field_name in credentials_fields:
|
||||
raise KeyError(
|
||||
|
||||
@@ -3,7 +3,7 @@ import logging
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from typing import TYPE_CHECKING, Annotated, Any, Literal, Optional, cast
|
||||
from typing import TYPE_CHECKING, Annotated, Any, Literal, Optional, Self, cast
|
||||
|
||||
from prisma.enums import SubmissionStatus
|
||||
from prisma.models import (
|
||||
@@ -20,7 +20,7 @@ from prisma.types import (
|
||||
AgentNodeLinkCreateInput,
|
||||
StoreListingVersionWhereInput,
|
||||
)
|
||||
from pydantic import BaseModel, BeforeValidator, Field, create_model
|
||||
from pydantic import BaseModel, BeforeValidator, Field
|
||||
from pydantic.fields import computed_field
|
||||
|
||||
from backend.blocks.agent import AgentExecutorBlock
|
||||
@@ -30,7 +30,6 @@ from backend.data.db import prisma as db
|
||||
from backend.data.dynamic_fields import is_tool_pin, sanitize_pin_name
|
||||
from backend.data.includes import MAX_GRAPH_VERSIONS_FETCH
|
||||
from backend.data.model import (
|
||||
CredentialsField,
|
||||
CredentialsFieldInfo,
|
||||
CredentialsMetaInput,
|
||||
is_credentials_field_name,
|
||||
@@ -45,7 +44,6 @@ from .block import (
|
||||
AnyBlockSchema,
|
||||
Block,
|
||||
BlockInput,
|
||||
BlockSchema,
|
||||
BlockType,
|
||||
EmptySchema,
|
||||
get_block,
|
||||
@@ -113,10 +111,12 @@ class Link(BaseDbModel):
|
||||
|
||||
class Node(BaseDbModel):
|
||||
block_id: str
|
||||
input_default: BlockInput = {} # dict[input_name, default_value]
|
||||
metadata: dict[str, Any] = {}
|
||||
input_links: list[Link] = []
|
||||
output_links: list[Link] = []
|
||||
input_default: BlockInput = Field( # dict[input_name, default_value]
|
||||
default_factory=dict
|
||||
)
|
||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
input_links: list[Link] = Field(default_factory=list)
|
||||
output_links: list[Link] = Field(default_factory=list)
|
||||
|
||||
@property
|
||||
def credentials_optional(self) -> bool:
|
||||
@@ -221,18 +221,33 @@ class NodeModel(Node):
|
||||
return result
|
||||
|
||||
|
||||
class BaseGraph(BaseDbModel):
|
||||
class GraphBaseMeta(BaseDbModel):
|
||||
"""
|
||||
Shared base for `GraphMeta` and `BaseGraph`, with core graph metadata fields.
|
||||
"""
|
||||
|
||||
version: int = 1
|
||||
is_active: bool = True
|
||||
name: str
|
||||
description: str
|
||||
instructions: str | None = None
|
||||
recommended_schedule_cron: str | None = None
|
||||
nodes: list[Node] = []
|
||||
links: list[Link] = []
|
||||
forked_from_id: str | None = None
|
||||
forked_from_version: int | None = None
|
||||
|
||||
|
||||
class BaseGraph(GraphBaseMeta):
|
||||
"""
|
||||
Graph with nodes, links, and computed I/O schema fields.
|
||||
|
||||
Used to represent sub-graphs within a `Graph`. Contains the full graph
|
||||
structure including nodes and links, plus computed fields for schemas
|
||||
and trigger info. Does NOT include user_id or created_at (see GraphModel).
|
||||
"""
|
||||
|
||||
nodes: list[Node] = Field(default_factory=list)
|
||||
links: list[Link] = Field(default_factory=list)
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def input_schema(self) -> dict[str, Any]:
|
||||
@@ -361,44 +376,79 @@ class GraphTriggerInfo(BaseModel):
|
||||
|
||||
|
||||
class Graph(BaseGraph):
|
||||
sub_graphs: list[BaseGraph] = [] # Flattened sub-graphs
|
||||
"""Creatable graph model used in API create/update endpoints."""
|
||||
|
||||
sub_graphs: list[BaseGraph] = Field(default_factory=list) # Flattened sub-graphs
|
||||
|
||||
|
||||
class GraphMeta(GraphBaseMeta):
|
||||
"""
|
||||
Lightweight graph metadata model representing an existing graph from the database,
|
||||
for use in listings and summaries.
|
||||
|
||||
Lacks `GraphModel`'s nodes, links, and expensive computed fields.
|
||||
Use for list endpoints where full graph data is not needed and performance matters.
|
||||
"""
|
||||
|
||||
id: str # type: ignore
|
||||
version: int # type: ignore
|
||||
user_id: str
|
||||
created_at: datetime
|
||||
|
||||
@classmethod
|
||||
def from_db(cls, graph: "AgentGraph") -> Self:
|
||||
return cls(
|
||||
id=graph.id,
|
||||
version=graph.version,
|
||||
is_active=graph.isActive,
|
||||
name=graph.name or "",
|
||||
description=graph.description or "",
|
||||
instructions=graph.instructions,
|
||||
recommended_schedule_cron=graph.recommendedScheduleCron,
|
||||
forked_from_id=graph.forkedFromId,
|
||||
forked_from_version=graph.forkedFromVersion,
|
||||
user_id=graph.userId,
|
||||
created_at=graph.createdAt,
|
||||
)
|
||||
|
||||
|
||||
class GraphModel(Graph, GraphMeta):
|
||||
"""
|
||||
Full graph model representing an existing graph from the database.
|
||||
|
||||
This is the primary model for working with persisted graphs. Includes all
|
||||
graph data (nodes, links, sub_graphs) plus user ownership and timestamps.
|
||||
Provides computed fields (input_schema, output_schema, etc.) used during
|
||||
set-up (frontend) and execution (backend).
|
||||
|
||||
Inherits from:
|
||||
- `Graph`: provides structure (nodes, links, sub_graphs) and computed schemas
|
||||
- `GraphMeta`: provides user_id, created_at for database records
|
||||
"""
|
||||
|
||||
nodes: list[NodeModel] = Field(default_factory=list) # type: ignore
|
||||
|
||||
@property
|
||||
def starting_nodes(self) -> list[NodeModel]:
|
||||
outbound_nodes = {link.sink_id for link in self.links}
|
||||
input_nodes = {
|
||||
node.id for node in self.nodes if node.block.block_type == BlockType.INPUT
|
||||
}
|
||||
return [
|
||||
node
|
||||
for node in self.nodes
|
||||
if node.id not in outbound_nodes or node.id in input_nodes
|
||||
]
|
||||
|
||||
@property
|
||||
def webhook_input_node(self) -> NodeModel | None: # type: ignore
|
||||
return cast(NodeModel, super().webhook_input_node)
|
||||
|
||||
@computed_field
|
||||
@property
|
||||
def credentials_input_schema(self) -> dict[str, Any]:
|
||||
schema = self._credentials_input_schema.jsonschema()
|
||||
|
||||
# Determine which credential fields are required based on credentials_optional metadata
|
||||
graph_credentials_inputs = self.aggregate_credentials_inputs()
|
||||
required_fields = []
|
||||
|
||||
# Build a map of node_id -> node for quick lookup
|
||||
all_nodes = {node.id: node for node in self.nodes}
|
||||
for sub_graph in self.sub_graphs:
|
||||
for node in sub_graph.nodes:
|
||||
all_nodes[node.id] = node
|
||||
|
||||
for field_key, (
|
||||
_field_info,
|
||||
node_field_pairs,
|
||||
) in graph_credentials_inputs.items():
|
||||
# A field is required if ANY node using it has credentials_optional=False
|
||||
is_required = False
|
||||
for node_id, _field_name in node_field_pairs:
|
||||
node = all_nodes.get(node_id)
|
||||
if node and not node.credentials_optional:
|
||||
is_required = True
|
||||
break
|
||||
|
||||
if is_required:
|
||||
required_fields.append(field_key)
|
||||
|
||||
schema["required"] = required_fields
|
||||
return schema
|
||||
|
||||
@property
|
||||
def _credentials_input_schema(self) -> type[BlockSchema]:
|
||||
graph_credentials_inputs = self.aggregate_credentials_inputs()
|
||||
logger.debug(
|
||||
f"Combined credentials input fields for graph #{self.id} ({self.name}): "
|
||||
f"{graph_credentials_inputs}"
|
||||
@@ -406,8 +456,8 @@ class Graph(BaseGraph):
|
||||
|
||||
# Warn if same-provider credentials inputs can't be combined (= bad UX)
|
||||
graph_cred_fields = list(graph_credentials_inputs.values())
|
||||
for i, (field, keys) in enumerate(graph_cred_fields):
|
||||
for other_field, other_keys in list(graph_cred_fields)[i + 1 :]:
|
||||
for i, (field, keys, _) in enumerate(graph_cred_fields):
|
||||
for other_field, other_keys, _ in list(graph_cred_fields)[i + 1 :]:
|
||||
if field.provider != other_field.provider:
|
||||
continue
|
||||
if ProviderName.HTTP in field.provider:
|
||||
@@ -423,31 +473,78 @@ class Graph(BaseGraph):
|
||||
f"keys: {keys} <> {other_keys}."
|
||||
)
|
||||
|
||||
fields: dict[str, tuple[type[CredentialsMetaInput], CredentialsMetaInput]] = {
|
||||
agg_field_key: (
|
||||
CredentialsMetaInput[
|
||||
Literal[tuple(field_info.provider)], # type: ignore
|
||||
Literal[tuple(field_info.supported_types)], # type: ignore
|
||||
],
|
||||
CredentialsField(
|
||||
required_scopes=set(field_info.required_scopes or []),
|
||||
discriminator=field_info.discriminator,
|
||||
discriminator_mapping=field_info.discriminator_mapping,
|
||||
discriminator_values=field_info.discriminator_values,
|
||||
),
|
||||
)
|
||||
for agg_field_key, (field_info, _) in graph_credentials_inputs.items()
|
||||
}
|
||||
# Build JSON schema directly to avoid expensive create_model + validation overhead
|
||||
properties = {}
|
||||
required_fields = []
|
||||
|
||||
return create_model(
|
||||
self.name.replace(" ", "") + "CredentialsInputSchema",
|
||||
__base__=BlockSchema,
|
||||
**fields, # type: ignore
|
||||
)
|
||||
for agg_field_key, (
|
||||
field_info,
|
||||
_,
|
||||
is_required,
|
||||
) in graph_credentials_inputs.items():
|
||||
providers = list(field_info.provider)
|
||||
cred_types = list(field_info.supported_types)
|
||||
|
||||
field_schema: dict[str, Any] = {
|
||||
"credentials_provider": providers,
|
||||
"credentials_types": cred_types,
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {"title": "Id", "type": "string"},
|
||||
"title": {
|
||||
"anyOf": [{"type": "string"}, {"type": "null"}],
|
||||
"default": None,
|
||||
"title": "Title",
|
||||
},
|
||||
"provider": {
|
||||
"title": "Provider",
|
||||
"type": "string",
|
||||
**(
|
||||
{"enum": providers}
|
||||
if len(providers) > 1
|
||||
else {"const": providers[0]}
|
||||
),
|
||||
},
|
||||
"type": {
|
||||
"title": "Type",
|
||||
"type": "string",
|
||||
**(
|
||||
{"enum": cred_types}
|
||||
if len(cred_types) > 1
|
||||
else {"const": cred_types[0]}
|
||||
),
|
||||
},
|
||||
},
|
||||
"required": ["id", "provider", "type"],
|
||||
}
|
||||
|
||||
# Add other (optional) field info items
|
||||
field_schema.update(
|
||||
field_info.model_dump(
|
||||
by_alias=True,
|
||||
exclude_defaults=True,
|
||||
exclude={"provider", "supported_types"}, # already included above
|
||||
)
|
||||
)
|
||||
|
||||
# Ensure field schema is well-formed
|
||||
CredentialsMetaInput.validate_credentials_field_schema(
|
||||
field_schema, agg_field_key
|
||||
)
|
||||
|
||||
properties[agg_field_key] = field_schema
|
||||
if is_required:
|
||||
required_fields.append(agg_field_key)
|
||||
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": properties,
|
||||
"required": required_fields,
|
||||
}
|
||||
|
||||
def aggregate_credentials_inputs(
|
||||
self,
|
||||
) -> dict[str, tuple[CredentialsFieldInfo, set[tuple[str, str]]]]:
|
||||
) -> dict[str, tuple[CredentialsFieldInfo, set[tuple[str, str]], bool]]:
|
||||
"""
|
||||
Returns:
|
||||
dict[aggregated_field_key, tuple(
|
||||
@@ -455,13 +552,19 @@ class Graph(BaseGraph):
|
||||
(now includes discriminator_values from matching nodes)
|
||||
set[(node_id, field_name)]: Node credentials fields that are
|
||||
compatible with this aggregated field spec
|
||||
bool: True if the field is required (any node has credentials_optional=False)
|
||||
)]
|
||||
"""
|
||||
# First collect all credential field data with input defaults
|
||||
node_credential_data = []
|
||||
# Track (field_info, (node_id, field_name), is_required) for each credential field
|
||||
node_credential_data: list[tuple[CredentialsFieldInfo, tuple[str, str]]] = []
|
||||
node_required_map: dict[str, bool] = {} # node_id -> is_required
|
||||
|
||||
for graph in [self] + self.sub_graphs:
|
||||
for node in graph.nodes:
|
||||
# Track if this node requires credentials (credentials_optional=False means required)
|
||||
node_required_map[node.id] = not node.credentials_optional
|
||||
|
||||
for (
|
||||
field_name,
|
||||
field_info,
|
||||
@@ -485,37 +588,21 @@ class Graph(BaseGraph):
|
||||
)
|
||||
|
||||
# Combine credential field info (this will merge discriminator_values automatically)
|
||||
return CredentialsFieldInfo.combine(*node_credential_data)
|
||||
combined = CredentialsFieldInfo.combine(*node_credential_data)
|
||||
|
||||
|
||||
class GraphModel(Graph):
|
||||
user_id: str
|
||||
nodes: list[NodeModel] = [] # type: ignore
|
||||
|
||||
created_at: datetime
|
||||
|
||||
@property
|
||||
def starting_nodes(self) -> list[NodeModel]:
|
||||
outbound_nodes = {link.sink_id for link in self.links}
|
||||
input_nodes = {
|
||||
node.id for node in self.nodes if node.block.block_type == BlockType.INPUT
|
||||
# Add is_required flag to each aggregated field
|
||||
# A field is required if ANY node using it has credentials_optional=False
|
||||
return {
|
||||
key: (
|
||||
field_info,
|
||||
node_field_pairs,
|
||||
any(
|
||||
node_required_map.get(node_id, True)
|
||||
for node_id, _ in node_field_pairs
|
||||
),
|
||||
)
|
||||
for key, (field_info, node_field_pairs) in combined.items()
|
||||
}
|
||||
return [
|
||||
node
|
||||
for node in self.nodes
|
||||
if node.id not in outbound_nodes or node.id in input_nodes
|
||||
]
|
||||
|
||||
@property
|
||||
def webhook_input_node(self) -> NodeModel | None: # type: ignore
|
||||
return cast(NodeModel, super().webhook_input_node)
|
||||
|
||||
def meta(self) -> "GraphMeta":
|
||||
"""
|
||||
Returns a GraphMeta object with metadata about the graph.
|
||||
This is used to return metadata about the graph without exposing nodes and links.
|
||||
"""
|
||||
return GraphMeta.from_graph(self)
|
||||
|
||||
def reassign_ids(self, user_id: str, reassign_graph_id: bool = False):
|
||||
"""
|
||||
@@ -799,13 +886,14 @@ class GraphModel(Graph):
|
||||
if is_static_output_block(link.source_id):
|
||||
link.is_static = True # Each value block output should be static.
|
||||
|
||||
@staticmethod
|
||||
def from_db(
|
||||
@classmethod
|
||||
def from_db( # type: ignore[reportIncompatibleMethodOverride]
|
||||
cls,
|
||||
graph: AgentGraph,
|
||||
for_export: bool = False,
|
||||
sub_graphs: list[AgentGraph] | None = None,
|
||||
) -> "GraphModel":
|
||||
return GraphModel(
|
||||
) -> Self:
|
||||
return cls(
|
||||
id=graph.id,
|
||||
user_id=graph.userId if not for_export else "",
|
||||
version=graph.version,
|
||||
@@ -831,17 +919,28 @@ class GraphModel(Graph):
|
||||
],
|
||||
)
|
||||
|
||||
def hide_nodes(self) -> "GraphModelWithoutNodes":
|
||||
"""
|
||||
Returns a copy of the `GraphModel` with nodes, links, and sub-graphs hidden
|
||||
(excluded from serialization). They are still present in the model instance
|
||||
so all computed fields (e.g. `credentials_input_schema`) still work.
|
||||
"""
|
||||
return GraphModelWithoutNodes.model_validate(self, from_attributes=True)
|
||||
|
||||
class GraphMeta(Graph):
|
||||
user_id: str
|
||||
|
||||
# Easy work-around to prevent exposing nodes and links in the API response
|
||||
nodes: list[NodeModel] = Field(default=[], exclude=True) # type: ignore
|
||||
links: list[Link] = Field(default=[], exclude=True)
|
||||
class GraphModelWithoutNodes(GraphModel):
|
||||
"""
|
||||
GraphModel variant that excludes nodes, links, and sub-graphs from serialization.
|
||||
|
||||
@staticmethod
|
||||
def from_graph(graph: GraphModel) -> "GraphMeta":
|
||||
return GraphMeta(**graph.model_dump())
|
||||
Used in contexts like the store where exposing internal graph structure
|
||||
is not desired. Inherits all computed fields from GraphModel but marks
|
||||
nodes and links as excluded from JSON output.
|
||||
"""
|
||||
|
||||
nodes: list[NodeModel] = Field(default_factory=list, exclude=True)
|
||||
links: list[Link] = Field(default_factory=list, exclude=True)
|
||||
|
||||
sub_graphs: list[BaseGraph] = Field(default_factory=list, exclude=True)
|
||||
|
||||
|
||||
class GraphsPaginated(BaseModel):
|
||||
@@ -912,21 +1011,11 @@ async def list_graphs_paginated(
|
||||
where=where_clause,
|
||||
distinct=["id"],
|
||||
order={"version": "desc"},
|
||||
include=AGENT_GRAPH_INCLUDE,
|
||||
skip=offset,
|
||||
take=page_size,
|
||||
)
|
||||
|
||||
graph_models: list[GraphMeta] = []
|
||||
for graph in graphs:
|
||||
try:
|
||||
graph_meta = GraphModel.from_db(graph).meta()
|
||||
# Trigger serialization to validate that the graph is well formed
|
||||
graph_meta.model_dump()
|
||||
graph_models.append(graph_meta)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing graph {graph.id}: {e}")
|
||||
continue
|
||||
graph_models = [GraphMeta.from_db(graph) for graph in graphs]
|
||||
|
||||
return GraphsPaginated(
|
||||
graphs=graph_models,
|
||||
|
||||
@@ -163,7 +163,6 @@ class User(BaseModel):
|
||||
if TYPE_CHECKING:
|
||||
from prisma.models import User as PrismaUser
|
||||
|
||||
from backend.data.block import BlockSchema
|
||||
|
||||
T = TypeVar("T")
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -508,15 +507,13 @@ class CredentialsMetaInput(BaseModel, Generic[CP, CT]):
|
||||
def allowed_cred_types(cls) -> tuple[CredentialsType, ...]:
|
||||
return get_args(cls.model_fields["type"].annotation)
|
||||
|
||||
@classmethod
|
||||
def validate_credentials_field_schema(cls, model: type["BlockSchema"]):
|
||||
@staticmethod
|
||||
def validate_credentials_field_schema(
|
||||
field_schema: dict[str, Any], field_name: str
|
||||
):
|
||||
"""Validates the schema of a credentials input field"""
|
||||
field_name = next(
|
||||
name for name, type in model.get_credentials_fields().items() if type is cls
|
||||
)
|
||||
field_schema = model.jsonschema()["properties"][field_name]
|
||||
try:
|
||||
schema_extra = CredentialsFieldInfo[CP, CT].model_validate(field_schema)
|
||||
field_info = CredentialsFieldInfo[CP, CT].model_validate(field_schema)
|
||||
except ValidationError as e:
|
||||
if "Field required [type=missing" not in str(e):
|
||||
raise
|
||||
@@ -526,11 +523,11 @@ class CredentialsMetaInput(BaseModel, Generic[CP, CT]):
|
||||
f"{field_schema}"
|
||||
) from e
|
||||
|
||||
providers = cls.allowed_providers()
|
||||
providers = field_info.provider
|
||||
if (
|
||||
providers is not None
|
||||
and len(providers) > 1
|
||||
and not schema_extra.discriminator
|
||||
and not field_info.discriminator
|
||||
):
|
||||
raise TypeError(
|
||||
f"Multi-provider CredentialsField '{field_name}' "
|
||||
|
||||
@@ -373,7 +373,7 @@ def make_node_credentials_input_map(
|
||||
# Get aggregated credentials fields for the graph
|
||||
graph_cred_inputs = graph.aggregate_credentials_inputs()
|
||||
|
||||
for graph_input_name, (_, compatible_node_fields) in graph_cred_inputs.items():
|
||||
for graph_input_name, (_, compatible_node_fields, _) in graph_cred_inputs.items():
|
||||
# Best-effort map: skip missing items
|
||||
if graph_input_name not in graph_credentials_input:
|
||||
continue
|
||||
|
||||
@@ -30,6 +30,7 @@ class ProviderName(str, Enum):
|
||||
IDEOGRAM = "ideogram"
|
||||
JINA = "jina"
|
||||
LLAMA_API = "llama_api"
|
||||
MCP = "mcp"
|
||||
MEDIUM = "medium"
|
||||
MEM0 = "mem0"
|
||||
NOTION = "notion"
|
||||
|
||||
6936
autogpt_platform/backend/poetry.lock
generated
6936
autogpt_platform/backend/poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -13,7 +13,6 @@ aio-pika = "^9.5.5"
|
||||
aiohttp = "^3.10.0"
|
||||
aiodns = "^3.5.0"
|
||||
anthropic = "^0.59.0"
|
||||
claude-agent-sdk = "^0.1.0"
|
||||
apscheduler = "^3.11.1"
|
||||
autogpt-libs = { path = "../autogpt_libs", develop = true }
|
||||
bleach = { extras = ["css"], version = "^6.2.0" }
|
||||
@@ -22,7 +21,7 @@ cryptography = "^45.0"
|
||||
discord-py = "^2.5.2"
|
||||
e2b-code-interpreter = "^1.5.2"
|
||||
elevenlabs = "^1.50.0"
|
||||
fastapi = "^0.116.1"
|
||||
fastapi = "^0.128.0"
|
||||
feedparser = "^6.0.11"
|
||||
flake8 = "^7.3.0"
|
||||
google-api-python-client = "^2.177.0"
|
||||
@@ -36,7 +35,7 @@ jinja2 = "^3.1.6"
|
||||
jsonref = "^1.1.0"
|
||||
jsonschema = "^4.25.0"
|
||||
langfuse = "^3.11.0"
|
||||
launchdarkly-server-sdk = "^9.12.0"
|
||||
launchdarkly-server-sdk = "^9.14.1"
|
||||
mem0ai = "^0.1.115"
|
||||
moviepy = "^2.1.2"
|
||||
ollama = "^0.5.1"
|
||||
@@ -53,8 +52,8 @@ prometheus-client = "^0.22.1"
|
||||
prometheus-fastapi-instrumentator = "^7.0.0"
|
||||
psutil = "^7.0.0"
|
||||
psycopg2-binary = "^2.9.10"
|
||||
pydantic = { extras = ["email"], version = "^2.11.7" }
|
||||
pydantic-settings = "^2.10.1"
|
||||
pydantic = { extras = ["email"], version = "^2.12.5" }
|
||||
pydantic-settings = "^2.12.0"
|
||||
pytest = "^8.4.1"
|
||||
pytest-asyncio = "^1.1.0"
|
||||
python-dotenv = "^1.1.1"
|
||||
@@ -66,11 +65,11 @@ sentry-sdk = {extras = ["anthropic", "fastapi", "launchdarkly", "openai", "sqlal
|
||||
sqlalchemy = "^2.0.40"
|
||||
strenum = "^0.4.9"
|
||||
stripe = "^11.5.0"
|
||||
supabase = "2.17.0"
|
||||
supabase = "2.27.2"
|
||||
tenacity = "^9.1.2"
|
||||
todoist-api-python = "^2.1.7"
|
||||
tweepy = "^4.16.0"
|
||||
uvicorn = { extras = ["standard"], version = "^0.35.0" }
|
||||
uvicorn = { extras = ["standard"], version = "^0.40.0" }
|
||||
websockets = "^15.0"
|
||||
youtube-transcript-api = "^1.2.1"
|
||||
yt-dlp = "2025.12.08"
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
"credentials_input_schema": {
|
||||
"properties": {},
|
||||
"required": [],
|
||||
"title": "TestGraphCredentialsInputSchema",
|
||||
"type": "object"
|
||||
},
|
||||
"description": "A test graph",
|
||||
|
||||
@@ -1,34 +1,14 @@
|
||||
[
|
||||
{
|
||||
"credentials_input_schema": {
|
||||
"properties": {},
|
||||
"required": [],
|
||||
"title": "TestGraphCredentialsInputSchema",
|
||||
"type": "object"
|
||||
},
|
||||
"created_at": "2025-09-04T13:37:00",
|
||||
"description": "A test graph",
|
||||
"forked_from_id": null,
|
||||
"forked_from_version": null,
|
||||
"has_external_trigger": false,
|
||||
"has_human_in_the_loop": false,
|
||||
"has_sensitive_action": false,
|
||||
"id": "graph-123",
|
||||
"input_schema": {
|
||||
"properties": {},
|
||||
"required": [],
|
||||
"type": "object"
|
||||
},
|
||||
"instructions": null,
|
||||
"is_active": true,
|
||||
"name": "Test Graph",
|
||||
"output_schema": {
|
||||
"properties": {},
|
||||
"required": [],
|
||||
"type": "object"
|
||||
},
|
||||
"recommended_schedule_cron": null,
|
||||
"sub_graphs": [],
|
||||
"trigger_setup_info": null,
|
||||
"user_id": "3e53486c-cf57-477e-ba2a-cb02dc828e1a",
|
||||
"version": 1
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { CredentialsMetaInput } from "@/app/api/__generated__/models/credentialsMetaInput";
|
||||
import { GraphMeta } from "@/app/api/__generated__/models/graphMeta";
|
||||
import { GraphModel } from "@/app/api/__generated__/models/graphModel";
|
||||
import { CredentialsInput } from "@/components/contextual/CredentialsInput/CredentialsInput";
|
||||
import { useState } from "react";
|
||||
import { getSchemaDefaultCredentials } from "../../helpers";
|
||||
@@ -9,7 +9,7 @@ type Credential = CredentialsMetaInput | undefined;
|
||||
type Credentials = Record<string, Credential>;
|
||||
|
||||
type Props = {
|
||||
agent: GraphMeta | null;
|
||||
agent: GraphModel | null;
|
||||
siblingInputs?: Record<string, any>;
|
||||
onCredentialsChange: (
|
||||
credentials: Record<string, CredentialsMetaInput>,
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { CredentialsMetaInput } from "@/app/api/__generated__/models/credentialsMetaInput";
|
||||
import { GraphMeta } from "@/app/api/__generated__/models/graphMeta";
|
||||
import { GraphModel } from "@/app/api/__generated__/models/graphModel";
|
||||
import { BlockIOCredentialsSubSchema } from "@/lib/autogpt-server-api/types";
|
||||
|
||||
export function getCredentialFields(
|
||||
agent: GraphMeta | null,
|
||||
agent: GraphModel | null,
|
||||
): AgentCredentialsFields {
|
||||
if (!agent) return {};
|
||||
|
||||
|
||||
@@ -3,10 +3,10 @@ import type {
|
||||
CredentialsMetaInput,
|
||||
} from "@/lib/autogpt-server-api/types";
|
||||
import type { InputValues } from "./types";
|
||||
import { GraphMeta } from "@/app/api/__generated__/models/graphMeta";
|
||||
import { GraphModel } from "@/app/api/__generated__/models/graphModel";
|
||||
|
||||
export function computeInitialAgentInputs(
|
||||
agent: GraphMeta | null,
|
||||
agent: GraphModel | null,
|
||||
existingInputs?: InputValues | null,
|
||||
): InputValues {
|
||||
const properties = agent?.input_schema?.properties || {};
|
||||
@@ -29,7 +29,7 @@ export function computeInitialAgentInputs(
|
||||
}
|
||||
|
||||
type IsRunDisabledParams = {
|
||||
agent: GraphMeta | null;
|
||||
agent: GraphModel | null;
|
||||
isRunning: boolean;
|
||||
agentInputs: InputValues | null | undefined;
|
||||
};
|
||||
|
||||
@@ -30,6 +30,8 @@ import {
|
||||
} from "@/components/atoms/Tooltip/BaseTooltip";
|
||||
import { GraphMeta } from "@/lib/autogpt-server-api";
|
||||
import jaro from "jaro-winkler";
|
||||
import { getV1GetSpecificGraph } from "@/app/api/__generated__/endpoints/graphs/graphs";
|
||||
import { okData } from "@/app/api/helpers";
|
||||
|
||||
type _Block = Omit<Block, "inputSchema" | "outputSchema"> & {
|
||||
uiKey?: string;
|
||||
@@ -107,6 +109,8 @@ export function BlocksControl({
|
||||
.filter((b) => b.uiType !== BlockUIType.AGENT)
|
||||
.sort((a, b) => a.name.localeCompare(b.name));
|
||||
|
||||
// Agent blocks are created from GraphMeta which doesn't include schemas.
|
||||
// Schemas will be fetched on-demand when the block is actually added.
|
||||
const agentBlockList = flows
|
||||
.map((flow): _Block => {
|
||||
return {
|
||||
@@ -116,8 +120,9 @@ export function BlocksControl({
|
||||
`Ver.${flow.version}` +
|
||||
(flow.description ? ` | ${flow.description}` : ""),
|
||||
categories: [{ category: "AGENT", description: "" }],
|
||||
inputSchema: flow.input_schema,
|
||||
outputSchema: flow.output_schema,
|
||||
// Empty schemas - will be populated when block is added
|
||||
inputSchema: { type: "object", properties: {} },
|
||||
outputSchema: { type: "object", properties: {} },
|
||||
staticOutput: false,
|
||||
uiType: BlockUIType.AGENT,
|
||||
costs: [],
|
||||
@@ -125,8 +130,7 @@ export function BlocksControl({
|
||||
hardcodedValues: {
|
||||
graph_id: flow.id,
|
||||
graph_version: flow.version,
|
||||
input_schema: flow.input_schema,
|
||||
output_schema: flow.output_schema,
|
||||
// Schemas will be fetched on-demand when block is added
|
||||
},
|
||||
};
|
||||
})
|
||||
@@ -182,6 +186,37 @@ export function BlocksControl({
|
||||
setSelectedCategory(null);
|
||||
}, []);
|
||||
|
||||
// Handler to add a block, fetching graph data on-demand for agent blocks
|
||||
const handleAddBlock = useCallback(
|
||||
async (block: _Block & { notAvailable: string | null }) => {
|
||||
if (block.notAvailable) return;
|
||||
|
||||
// For agent blocks, fetch the full graph to get schemas
|
||||
if (block.uiType === BlockUIType.AGENT && block.hardcodedValues) {
|
||||
const graphID = block.hardcodedValues.graph_id as string;
|
||||
const graphVersion = block.hardcodedValues.graph_version as number;
|
||||
const graphData = okData(
|
||||
await getV1GetSpecificGraph(graphID, { version: graphVersion }),
|
||||
);
|
||||
|
||||
if (graphData) {
|
||||
addBlock(block.id, block.name, {
|
||||
...block.hardcodedValues,
|
||||
input_schema: graphData.input_schema,
|
||||
output_schema: graphData.output_schema,
|
||||
});
|
||||
} else {
|
||||
// Fallback: add without schemas (will be incomplete)
|
||||
console.error("Failed to fetch graph data for agent block");
|
||||
addBlock(block.id, block.name, block.hardcodedValues || {});
|
||||
}
|
||||
} else {
|
||||
addBlock(block.id, block.name, block.hardcodedValues || {});
|
||||
}
|
||||
},
|
||||
[addBlock],
|
||||
);
|
||||
|
||||
// Extract unique categories from blocks
|
||||
const categories = useMemo(() => {
|
||||
return Array.from(
|
||||
@@ -303,10 +338,7 @@ export function BlocksControl({
|
||||
}),
|
||||
);
|
||||
}}
|
||||
onClick={() =>
|
||||
!block.notAvailable &&
|
||||
addBlock(block.id, block.name, block?.hardcodedValues || {})
|
||||
}
|
||||
onClick={() => handleAddBlock(block)}
|
||||
title={block.notAvailable ?? undefined}
|
||||
>
|
||||
<div
|
||||
|
||||
@@ -29,13 +29,17 @@ import "@xyflow/react/dist/style.css";
|
||||
import { ConnectedEdge, CustomNode } from "../CustomNode/CustomNode";
|
||||
import "./flow.css";
|
||||
import {
|
||||
BlockIORootSchema,
|
||||
BlockUIType,
|
||||
formatEdgeID,
|
||||
GraphExecutionID,
|
||||
GraphID,
|
||||
GraphMeta,
|
||||
LibraryAgent,
|
||||
SpecialBlockID,
|
||||
} from "@/lib/autogpt-server-api";
|
||||
import { getV1GetSpecificGraph } from "@/app/api/__generated__/endpoints/graphs/graphs";
|
||||
import { okData } from "@/app/api/helpers";
|
||||
import { IncompatibilityInfo } from "../../../hooks/useSubAgentUpdate/types";
|
||||
import { Key, storage } from "@/services/storage/local-storage";
|
||||
import { findNewlyAddedBlockCoordinates, getTypeColor } from "@/lib/utils";
|
||||
@@ -687,8 +691,94 @@ const FlowEditor: React.FC<{
|
||||
[getNode, updateNode, nodes],
|
||||
);
|
||||
|
||||
/* Shared helper to create and add a node */
|
||||
const createAndAddNode = useCallback(
|
||||
async (
|
||||
blockID: string,
|
||||
blockName: string,
|
||||
hardcodedValues: Record<string, any>,
|
||||
position: { x: number; y: number },
|
||||
): Promise<CustomNode | null> => {
|
||||
const nodeSchema = availableBlocks.find((node) => node.id === blockID);
|
||||
if (!nodeSchema) {
|
||||
console.error(`Schema not found for block ID: ${blockID}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
// For agent blocks, fetch the full graph to get schemas
|
||||
let inputSchema: BlockIORootSchema = nodeSchema.inputSchema;
|
||||
let outputSchema: BlockIORootSchema = nodeSchema.outputSchema;
|
||||
let finalHardcodedValues = hardcodedValues;
|
||||
|
||||
if (blockID === SpecialBlockID.AGENT) {
|
||||
const graphID = hardcodedValues.graph_id as string;
|
||||
const graphVersion = hardcodedValues.graph_version as number;
|
||||
const graphData = okData(
|
||||
await getV1GetSpecificGraph(graphID, { version: graphVersion }),
|
||||
);
|
||||
|
||||
if (graphData) {
|
||||
inputSchema = graphData.input_schema as BlockIORootSchema;
|
||||
outputSchema = graphData.output_schema as BlockIORootSchema;
|
||||
finalHardcodedValues = {
|
||||
...hardcodedValues,
|
||||
input_schema: graphData.input_schema,
|
||||
output_schema: graphData.output_schema,
|
||||
};
|
||||
} else {
|
||||
console.error("Failed to fetch graph data for agent block");
|
||||
}
|
||||
}
|
||||
|
||||
const newNode: CustomNode = {
|
||||
id: nodeId.toString(),
|
||||
type: "custom",
|
||||
position,
|
||||
data: {
|
||||
blockType: blockName,
|
||||
blockCosts: nodeSchema.costs || [],
|
||||
title: `${blockName} ${nodeId}`,
|
||||
description: nodeSchema.description,
|
||||
categories: nodeSchema.categories,
|
||||
inputSchema: inputSchema,
|
||||
outputSchema: outputSchema,
|
||||
hardcodedValues: finalHardcodedValues,
|
||||
connections: [],
|
||||
isOutputOpen: false,
|
||||
block_id: blockID,
|
||||
isOutputStatic: nodeSchema.staticOutput,
|
||||
uiType: nodeSchema.uiType,
|
||||
},
|
||||
};
|
||||
|
||||
addNodes(newNode);
|
||||
setNodeId((prevId) => prevId + 1);
|
||||
clearNodesStatusAndOutput();
|
||||
|
||||
history.push({
|
||||
type: "ADD_NODE",
|
||||
payload: { node: { ...newNode, ...newNode.data } },
|
||||
undo: () => deleteElements({ nodes: [{ id: newNode.id }] }),
|
||||
redo: () => addNodes(newNode),
|
||||
});
|
||||
|
||||
return newNode;
|
||||
},
|
||||
[
|
||||
availableBlocks,
|
||||
nodeId,
|
||||
addNodes,
|
||||
deleteElements,
|
||||
clearNodesStatusAndOutput,
|
||||
],
|
||||
);
|
||||
|
||||
const addNode = useCallback(
|
||||
(blockId: string, nodeType: string, hardcodedValues: any = {}) => {
|
||||
async (
|
||||
blockId: string,
|
||||
nodeType: string,
|
||||
hardcodedValues: Record<string, any> = {},
|
||||
) => {
|
||||
const nodeSchema = availableBlocks.find((node) => node.id === blockId);
|
||||
if (!nodeSchema) {
|
||||
console.error(`Schema not found for block ID: ${blockId}`);
|
||||
@@ -707,73 +797,42 @@ const FlowEditor: React.FC<{
|
||||
// Alternative: We could also use D3 force, Intersection for this (React flow Pro examples)
|
||||
|
||||
const { x, y } = getViewport();
|
||||
const viewportCoordinates =
|
||||
const position =
|
||||
nodeDimensions && Object.keys(nodeDimensions).length > 0
|
||||
? // we will get all the dimension of nodes, then store
|
||||
findNewlyAddedBlockCoordinates(
|
||||
? findNewlyAddedBlockCoordinates(
|
||||
nodeDimensions,
|
||||
nodeSchema.uiType == BlockUIType.NOTE ? 300 : 500,
|
||||
60,
|
||||
1.0,
|
||||
)
|
||||
: // we will get all the dimension of nodes, then store
|
||||
{
|
||||
: {
|
||||
x: window.innerWidth / 2 - x,
|
||||
y: window.innerHeight / 2 - y,
|
||||
};
|
||||
|
||||
const newNode: CustomNode = {
|
||||
id: nodeId.toString(),
|
||||
type: "custom",
|
||||
position: viewportCoordinates, // Set the position to the calculated viewport center
|
||||
data: {
|
||||
blockType: nodeType,
|
||||
blockCosts: nodeSchema.costs,
|
||||
title: `${nodeType} ${nodeId}`,
|
||||
description: nodeSchema.description,
|
||||
categories: nodeSchema.categories,
|
||||
inputSchema: nodeSchema.inputSchema,
|
||||
outputSchema: nodeSchema.outputSchema,
|
||||
hardcodedValues: hardcodedValues,
|
||||
connections: [],
|
||||
isOutputOpen: false,
|
||||
block_id: blockId,
|
||||
isOutputStatic: nodeSchema.staticOutput,
|
||||
uiType: nodeSchema.uiType,
|
||||
},
|
||||
};
|
||||
|
||||
addNodes(newNode);
|
||||
setNodeId((prevId) => prevId + 1);
|
||||
clearNodesStatusAndOutput(); // Clear status and output when a new node is added
|
||||
const newNode = await createAndAddNode(
|
||||
blockId,
|
||||
nodeType,
|
||||
hardcodedValues,
|
||||
position,
|
||||
);
|
||||
if (!newNode) return;
|
||||
|
||||
setViewport(
|
||||
{
|
||||
// Rough estimate of the dimension of the node is: 500x400px.
|
||||
// Though we skip shifting the X, considering the block menu side-bar.
|
||||
x: -viewportCoordinates.x * 0.8 + (window.innerWidth - 0.0) / 2,
|
||||
y: -viewportCoordinates.y * 0.8 + (window.innerHeight - 400) / 2,
|
||||
x: -position.x * 0.8 + (window.innerWidth - 0.0) / 2,
|
||||
y: -position.y * 0.8 + (window.innerHeight - 400) / 2,
|
||||
zoom: 0.8,
|
||||
},
|
||||
{ duration: 500 },
|
||||
);
|
||||
|
||||
history.push({
|
||||
type: "ADD_NODE",
|
||||
payload: { node: { ...newNode, ...newNode.data } },
|
||||
undo: () => deleteElements({ nodes: [{ id: newNode.id }] }),
|
||||
redo: () => addNodes(newNode),
|
||||
});
|
||||
},
|
||||
[
|
||||
nodeId,
|
||||
getViewport,
|
||||
setViewport,
|
||||
availableBlocks,
|
||||
addNodes,
|
||||
nodeDimensions,
|
||||
deleteElements,
|
||||
clearNodesStatusAndOutput,
|
||||
createAndAddNode,
|
||||
],
|
||||
);
|
||||
|
||||
@@ -920,7 +979,7 @@ const FlowEditor: React.FC<{
|
||||
}, []);
|
||||
|
||||
const onDrop = useCallback(
|
||||
(event: React.DragEvent) => {
|
||||
async (event: React.DragEvent) => {
|
||||
event.preventDefault();
|
||||
|
||||
const blockData = event.dataTransfer.getData("application/reactflow");
|
||||
@@ -935,62 +994,17 @@ const FlowEditor: React.FC<{
|
||||
y: event.clientY,
|
||||
});
|
||||
|
||||
// Find the block schema
|
||||
const nodeSchema = availableBlocks.find((node) => node.id === blockId);
|
||||
if (!nodeSchema) {
|
||||
console.error(`Schema not found for block ID: ${blockId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create the new node at the drop position
|
||||
const newNode: CustomNode = {
|
||||
id: nodeId.toString(),
|
||||
type: "custom",
|
||||
await createAndAddNode(
|
||||
blockId,
|
||||
blockName,
|
||||
hardcodedValues || {},
|
||||
position,
|
||||
data: {
|
||||
blockType: blockName,
|
||||
blockCosts: nodeSchema.costs || [],
|
||||
title: `${blockName} ${nodeId}`,
|
||||
description: nodeSchema.description,
|
||||
categories: nodeSchema.categories,
|
||||
inputSchema: nodeSchema.inputSchema,
|
||||
outputSchema: nodeSchema.outputSchema,
|
||||
hardcodedValues: hardcodedValues,
|
||||
connections: [],
|
||||
isOutputOpen: false,
|
||||
block_id: blockId,
|
||||
uiType: nodeSchema.uiType,
|
||||
},
|
||||
};
|
||||
|
||||
history.push({
|
||||
type: "ADD_NODE",
|
||||
payload: { node: { ...newNode, ...newNode.data } },
|
||||
undo: () => {
|
||||
deleteElements({ nodes: [{ id: newNode.id } as any], edges: [] });
|
||||
},
|
||||
redo: () => {
|
||||
addNodes([newNode]);
|
||||
},
|
||||
});
|
||||
addNodes([newNode]);
|
||||
clearNodesStatusAndOutput();
|
||||
|
||||
setNodeId((prevId) => prevId + 1);
|
||||
);
|
||||
} catch (error) {
|
||||
console.error("Failed to drop block:", error);
|
||||
}
|
||||
},
|
||||
[
|
||||
nodeId,
|
||||
availableBlocks,
|
||||
nodes,
|
||||
edges,
|
||||
addNodes,
|
||||
screenToFlowPosition,
|
||||
deleteElements,
|
||||
clearNodesStatusAndOutput,
|
||||
],
|
||||
[screenToFlowPosition, createAndAddNode],
|
||||
);
|
||||
|
||||
const buildContextValue: BuilderContextType = useMemo(
|
||||
|
||||
@@ -4,13 +4,13 @@ import { AgentRunDraftView } from "@/app/(platform)/library/agents/[id]/componen
|
||||
import { Dialog } from "@/components/molecules/Dialog/Dialog";
|
||||
import type {
|
||||
CredentialsMetaInput,
|
||||
GraphMeta,
|
||||
Graph,
|
||||
} from "@/lib/autogpt-server-api/types";
|
||||
|
||||
interface RunInputDialogProps {
|
||||
isOpen: boolean;
|
||||
doClose: () => void;
|
||||
graph: GraphMeta;
|
||||
graph: Graph;
|
||||
doRun?: (
|
||||
inputs: Record<string, any>,
|
||||
credentialsInputs: Record<string, CredentialsMetaInput>,
|
||||
|
||||
@@ -9,13 +9,13 @@ import { CustomNodeData } from "@/app/(platform)/build/components/legacy-builder
|
||||
import {
|
||||
BlockUIType,
|
||||
CredentialsMetaInput,
|
||||
GraphMeta,
|
||||
Graph,
|
||||
} from "@/lib/autogpt-server-api/types";
|
||||
import RunnerOutputUI, { OutputNodeInfo } from "./RunnerOutputUI";
|
||||
import { RunnerInputDialog } from "./RunnerInputUI";
|
||||
|
||||
interface RunnerUIWrapperProps {
|
||||
graph: GraphMeta;
|
||||
graph: Graph;
|
||||
nodes: Node<CustomNodeData>[];
|
||||
graphExecutionError?: string | null;
|
||||
saveAndRun: (
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { GraphInputSchema } from "@/lib/autogpt-server-api";
|
||||
import { GraphMetaLike, IncompatibilityInfo } from "./types";
|
||||
import { GraphLike, IncompatibilityInfo } from "./types";
|
||||
|
||||
// Helper type for schema properties - the generated types are too loose
|
||||
type SchemaProperties = Record<string, GraphInputSchema["properties"][string]>;
|
||||
@@ -36,7 +36,7 @@ export function getSchemaRequired(schema: unknown): SchemaRequired {
|
||||
*/
|
||||
export function createUpdatedAgentNodeInputs(
|
||||
currentInputs: Record<string, unknown>,
|
||||
latestSubGraphVersion: GraphMetaLike,
|
||||
latestSubGraphVersion: GraphLike,
|
||||
): Record<string, unknown> {
|
||||
return {
|
||||
...currentInputs,
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import type { GraphMeta as LegacyGraphMeta } from "@/lib/autogpt-server-api";
|
||||
import type {
|
||||
Graph as LegacyGraph,
|
||||
GraphMeta as LegacyGraphMeta,
|
||||
} from "@/lib/autogpt-server-api";
|
||||
import type { GraphModel as GeneratedGraph } from "@/app/api/__generated__/models/graphModel";
|
||||
import type { GraphMeta as GeneratedGraphMeta } from "@/app/api/__generated__/models/graphMeta";
|
||||
|
||||
export type SubAgentUpdateInfo<T extends GraphMetaLike = GraphMetaLike> = {
|
||||
export type SubAgentUpdateInfo<T extends GraphLike = GraphLike> = {
|
||||
hasUpdate: boolean;
|
||||
currentVersion: number;
|
||||
latestVersion: number;
|
||||
@@ -10,7 +14,10 @@ export type SubAgentUpdateInfo<T extends GraphMetaLike = GraphMetaLike> = {
|
||||
incompatibilities: IncompatibilityInfo | null;
|
||||
};
|
||||
|
||||
// Union type for GraphMeta that works with both legacy and new builder
|
||||
// Union type for Graph (with schemas) that works with both legacy and new builder
|
||||
export type GraphLike = LegacyGraph | GeneratedGraph;
|
||||
|
||||
// Union type for GraphMeta (without schemas) for version detection
|
||||
export type GraphMetaLike = LegacyGraphMeta | GeneratedGraphMeta;
|
||||
|
||||
export type IncompatibilityInfo = {
|
||||
|
||||
@@ -1,5 +1,11 @@
|
||||
import { useMemo } from "react";
|
||||
import { GraphInputSchema, GraphOutputSchema } from "@/lib/autogpt-server-api";
|
||||
import type {
|
||||
GraphInputSchema,
|
||||
GraphOutputSchema,
|
||||
} from "@/lib/autogpt-server-api";
|
||||
import type { GraphModel } from "@/app/api/__generated__/models/graphModel";
|
||||
import { useGetV1GetSpecificGraph } from "@/app/api/__generated__/endpoints/graphs/graphs";
|
||||
import { okData } from "@/app/api/helpers";
|
||||
import { getEffectiveType } from "@/lib/utils";
|
||||
import { EdgeLike, getSchemaProperties, getSchemaRequired } from "./helpers";
|
||||
import {
|
||||
@@ -11,26 +17,38 @@ import {
|
||||
/**
|
||||
* Checks if a newer version of a sub-agent is available and determines compatibility
|
||||
*/
|
||||
export function useSubAgentUpdate<T extends GraphMetaLike>(
|
||||
export function useSubAgentUpdate(
|
||||
nodeID: string,
|
||||
graphID: string | undefined,
|
||||
graphVersion: number | undefined,
|
||||
currentInputSchema: GraphInputSchema | undefined,
|
||||
currentOutputSchema: GraphOutputSchema | undefined,
|
||||
connections: EdgeLike[],
|
||||
availableGraphs: T[],
|
||||
): SubAgentUpdateInfo<T> {
|
||||
availableGraphs: GraphMetaLike[],
|
||||
): SubAgentUpdateInfo<GraphModel> {
|
||||
// Find the latest version of the same graph
|
||||
const latestGraph = useMemo(() => {
|
||||
const latestGraphInfo = useMemo(() => {
|
||||
if (!graphID) return null;
|
||||
return availableGraphs.find((graph) => graph.id === graphID) || null;
|
||||
}, [graphID, availableGraphs]);
|
||||
|
||||
// Check if there's an update available
|
||||
// Check if there's a newer version available
|
||||
const hasUpdate = useMemo(() => {
|
||||
if (!latestGraph || graphVersion === undefined) return false;
|
||||
return latestGraph.version! > graphVersion;
|
||||
}, [latestGraph, graphVersion]);
|
||||
if (!latestGraphInfo || graphVersion === undefined) return false;
|
||||
return latestGraphInfo.version! > graphVersion;
|
||||
}, [latestGraphInfo, graphVersion]);
|
||||
|
||||
// Fetch full graph IF an update is detected
|
||||
const { data: latestGraph } = useGetV1GetSpecificGraph(
|
||||
graphID ?? "",
|
||||
{ version: latestGraphInfo?.version },
|
||||
{
|
||||
query: {
|
||||
enabled: hasUpdate && !!graphID && !!latestGraphInfo?.version,
|
||||
select: okData,
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
// Get connected input and output handles for this specific node
|
||||
const connectedHandles = useMemo(() => {
|
||||
@@ -152,8 +170,8 @@ export function useSubAgentUpdate<T extends GraphMetaLike>(
|
||||
return {
|
||||
hasUpdate,
|
||||
currentVersion: graphVersion || 0,
|
||||
latestVersion: latestGraph?.version || 0,
|
||||
latestGraph,
|
||||
latestVersion: latestGraphInfo?.version || 0,
|
||||
latestGraph: latestGraph || null,
|
||||
isCompatible: compatibilityResult.isCompatible,
|
||||
incompatibilities: compatibilityResult.incompatibilities,
|
||||
};
|
||||
|
||||
@@ -18,7 +18,7 @@ interface GraphStore {
|
||||
outputSchema: Record<string, any> | null,
|
||||
) => void;
|
||||
|
||||
// Available graphs; used for sub-graph updates
|
||||
// Available graphs; used for sub-graph updated version detection
|
||||
availableSubGraphs: GraphMeta[];
|
||||
setAvailableSubGraphs: (graphs: GraphMeta[]) => void;
|
||||
|
||||
|
||||
@@ -10,8 +10,8 @@ import React, {
|
||||
import {
|
||||
CredentialsMetaInput,
|
||||
CredentialsType,
|
||||
Graph,
|
||||
GraphExecutionID,
|
||||
GraphMeta,
|
||||
LibraryAgentPreset,
|
||||
LibraryAgentPresetID,
|
||||
LibraryAgentPresetUpdatable,
|
||||
@@ -69,7 +69,7 @@ export function AgentRunDraftView({
|
||||
className,
|
||||
recommendedScheduleCron,
|
||||
}: {
|
||||
graph: GraphMeta;
|
||||
graph: Graph;
|
||||
agentActions?: ButtonAction[];
|
||||
recommendedScheduleCron?: string | null;
|
||||
doRun?: (
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
import React, { useCallback, useMemo } from "react";
|
||||
|
||||
import {
|
||||
Graph,
|
||||
GraphExecutionID,
|
||||
GraphMeta,
|
||||
Schedule,
|
||||
ScheduleID,
|
||||
} from "@/lib/autogpt-server-api";
|
||||
@@ -35,7 +35,7 @@ export function AgentScheduleDetailsView({
|
||||
onForcedRun,
|
||||
doDeleteSchedule,
|
||||
}: {
|
||||
graph: GraphMeta;
|
||||
graph: Graph;
|
||||
schedule: Schedule;
|
||||
agentActions: ButtonAction[];
|
||||
onForcedRun: (runID: GraphExecutionID) => void;
|
||||
|
||||
@@ -5629,7 +5629,9 @@
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": { "$ref": "#/components/schemas/GraphMeta" }
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/GraphModelWithoutNodes"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -6495,18 +6497,6 @@
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Recommended Schedule Cron"
|
||||
},
|
||||
"nodes": {
|
||||
"items": { "$ref": "#/components/schemas/Node" },
|
||||
"type": "array",
|
||||
"title": "Nodes",
|
||||
"default": []
|
||||
},
|
||||
"links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Links",
|
||||
"default": []
|
||||
},
|
||||
"forked_from_id": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Forked From Id"
|
||||
@@ -6514,11 +6504,22 @@
|
||||
"forked_from_version": {
|
||||
"anyOf": [{ "type": "integer" }, { "type": "null" }],
|
||||
"title": "Forked From Version"
|
||||
},
|
||||
"nodes": {
|
||||
"items": { "$ref": "#/components/schemas/Node" },
|
||||
"type": "array",
|
||||
"title": "Nodes"
|
||||
},
|
||||
"links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Links"
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
"required": ["name", "description"],
|
||||
"title": "BaseGraph"
|
||||
"title": "BaseGraph",
|
||||
"description": "Graph with nodes, links, and computed I/O schema fields.\n\nUsed to represent sub-graphs within a `Graph`. Contains the full graph\nstructure including nodes and links, plus computed fields for schemas\nand trigger info. Does NOT include user_id or created_at (see GraphModel)."
|
||||
},
|
||||
"BaseGraph-Output": {
|
||||
"properties": {
|
||||
@@ -6539,18 +6540,6 @@
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Recommended Schedule Cron"
|
||||
},
|
||||
"nodes": {
|
||||
"items": { "$ref": "#/components/schemas/Node" },
|
||||
"type": "array",
|
||||
"title": "Nodes",
|
||||
"default": []
|
||||
},
|
||||
"links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Links",
|
||||
"default": []
|
||||
},
|
||||
"forked_from_id": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Forked From Id"
|
||||
@@ -6559,6 +6548,16 @@
|
||||
"anyOf": [{ "type": "integer" }, { "type": "null" }],
|
||||
"title": "Forked From Version"
|
||||
},
|
||||
"nodes": {
|
||||
"items": { "$ref": "#/components/schemas/Node" },
|
||||
"type": "array",
|
||||
"title": "Nodes"
|
||||
},
|
||||
"links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Links"
|
||||
},
|
||||
"input_schema": {
|
||||
"additionalProperties": true,
|
||||
"type": "object",
|
||||
@@ -6605,7 +6604,8 @@
|
||||
"has_sensitive_action",
|
||||
"trigger_setup_info"
|
||||
],
|
||||
"title": "BaseGraph"
|
||||
"title": "BaseGraph",
|
||||
"description": "Graph with nodes, links, and computed I/O schema fields.\n\nUsed to represent sub-graphs within a `Graph`. Contains the full graph\nstructure including nodes and links, plus computed fields for schemas\nand trigger info. Does NOT include user_id or created_at (see GraphModel)."
|
||||
},
|
||||
"BlockCategoryResponse": {
|
||||
"properties": {
|
||||
@@ -7399,18 +7399,6 @@
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Recommended Schedule Cron"
|
||||
},
|
||||
"nodes": {
|
||||
"items": { "$ref": "#/components/schemas/Node" },
|
||||
"type": "array",
|
||||
"title": "Nodes",
|
||||
"default": []
|
||||
},
|
||||
"links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Links",
|
||||
"default": []
|
||||
},
|
||||
"forked_from_id": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Forked From Id"
|
||||
@@ -7419,16 +7407,26 @@
|
||||
"anyOf": [{ "type": "integer" }, { "type": "null" }],
|
||||
"title": "Forked From Version"
|
||||
},
|
||||
"nodes": {
|
||||
"items": { "$ref": "#/components/schemas/Node" },
|
||||
"type": "array",
|
||||
"title": "Nodes"
|
||||
},
|
||||
"links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Links"
|
||||
},
|
||||
"sub_graphs": {
|
||||
"items": { "$ref": "#/components/schemas/BaseGraph-Input" },
|
||||
"type": "array",
|
||||
"title": "Sub Graphs",
|
||||
"default": []
|
||||
"title": "Sub Graphs"
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
"required": ["name", "description"],
|
||||
"title": "Graph"
|
||||
"title": "Graph",
|
||||
"description": "Creatable graph model used in API create/update endpoints."
|
||||
},
|
||||
"GraphExecution": {
|
||||
"properties": {
|
||||
@@ -7778,6 +7776,52 @@
|
||||
"description": "Response schema for paginated graph executions."
|
||||
},
|
||||
"GraphMeta": {
|
||||
"properties": {
|
||||
"id": { "type": "string", "title": "Id" },
|
||||
"version": { "type": "integer", "title": "Version" },
|
||||
"is_active": {
|
||||
"type": "boolean",
|
||||
"title": "Is Active",
|
||||
"default": true
|
||||
},
|
||||
"name": { "type": "string", "title": "Name" },
|
||||
"description": { "type": "string", "title": "Description" },
|
||||
"instructions": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Instructions"
|
||||
},
|
||||
"recommended_schedule_cron": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Recommended Schedule Cron"
|
||||
},
|
||||
"forked_from_id": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Forked From Id"
|
||||
},
|
||||
"forked_from_version": {
|
||||
"anyOf": [{ "type": "integer" }, { "type": "null" }],
|
||||
"title": "Forked From Version"
|
||||
},
|
||||
"user_id": { "type": "string", "title": "User Id" },
|
||||
"created_at": {
|
||||
"type": "string",
|
||||
"format": "date-time",
|
||||
"title": "Created At"
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
"required": [
|
||||
"id",
|
||||
"version",
|
||||
"name",
|
||||
"description",
|
||||
"user_id",
|
||||
"created_at"
|
||||
],
|
||||
"title": "GraphMeta",
|
||||
"description": "Lightweight graph metadata model representing an existing graph from the database,\nfor use in listings and summaries.\n\nLacks `GraphModel`'s nodes, links, and expensive computed fields.\nUse for list endpoints where full graph data is not needed and performance matters."
|
||||
},
|
||||
"GraphModel": {
|
||||
"properties": {
|
||||
"id": { "type": "string", "title": "Id" },
|
||||
"version": { "type": "integer", "title": "Version", "default": 1 },
|
||||
@@ -7804,13 +7848,27 @@
|
||||
"anyOf": [{ "type": "integer" }, { "type": "null" }],
|
||||
"title": "Forked From Version"
|
||||
},
|
||||
"user_id": { "type": "string", "title": "User Id" },
|
||||
"created_at": {
|
||||
"type": "string",
|
||||
"format": "date-time",
|
||||
"title": "Created At"
|
||||
},
|
||||
"nodes": {
|
||||
"items": { "$ref": "#/components/schemas/NodeModel" },
|
||||
"type": "array",
|
||||
"title": "Nodes"
|
||||
},
|
||||
"links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Links"
|
||||
},
|
||||
"sub_graphs": {
|
||||
"items": { "$ref": "#/components/schemas/BaseGraph-Output" },
|
||||
"type": "array",
|
||||
"title": "Sub Graphs",
|
||||
"default": []
|
||||
"title": "Sub Graphs"
|
||||
},
|
||||
"user_id": { "type": "string", "title": "User Id" },
|
||||
"input_schema": {
|
||||
"additionalProperties": true,
|
||||
"type": "object",
|
||||
@@ -7857,6 +7915,7 @@
|
||||
"name",
|
||||
"description",
|
||||
"user_id",
|
||||
"created_at",
|
||||
"input_schema",
|
||||
"output_schema",
|
||||
"has_external_trigger",
|
||||
@@ -7865,9 +7924,10 @@
|
||||
"trigger_setup_info",
|
||||
"credentials_input_schema"
|
||||
],
|
||||
"title": "GraphMeta"
|
||||
"title": "GraphModel",
|
||||
"description": "Full graph model representing an existing graph from the database.\n\nThis is the primary model for working with persisted graphs. Includes all\ngraph data (nodes, links, sub_graphs) plus user ownership and timestamps.\nProvides computed fields (input_schema, output_schema, etc.) used during\nset-up (frontend) and execution (backend).\n\nInherits from:\n- `Graph`: provides structure (nodes, links, sub_graphs) and computed schemas\n- `GraphMeta`: provides user_id, created_at for database records"
|
||||
},
|
||||
"GraphModel": {
|
||||
"GraphModelWithoutNodes": {
|
||||
"properties": {
|
||||
"id": { "type": "string", "title": "Id" },
|
||||
"version": { "type": "integer", "title": "Version", "default": 1 },
|
||||
@@ -7886,18 +7946,6 @@
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Recommended Schedule Cron"
|
||||
},
|
||||
"nodes": {
|
||||
"items": { "$ref": "#/components/schemas/NodeModel" },
|
||||
"type": "array",
|
||||
"title": "Nodes",
|
||||
"default": []
|
||||
},
|
||||
"links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Links",
|
||||
"default": []
|
||||
},
|
||||
"forked_from_id": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Forked From Id"
|
||||
@@ -7906,12 +7954,6 @@
|
||||
"anyOf": [{ "type": "integer" }, { "type": "null" }],
|
||||
"title": "Forked From Version"
|
||||
},
|
||||
"sub_graphs": {
|
||||
"items": { "$ref": "#/components/schemas/BaseGraph-Output" },
|
||||
"type": "array",
|
||||
"title": "Sub Graphs",
|
||||
"default": []
|
||||
},
|
||||
"user_id": { "type": "string", "title": "User Id" },
|
||||
"created_at": {
|
||||
"type": "string",
|
||||
@@ -7973,7 +8015,8 @@
|
||||
"trigger_setup_info",
|
||||
"credentials_input_schema"
|
||||
],
|
||||
"title": "GraphModel"
|
||||
"title": "GraphModelWithoutNodes",
|
||||
"description": "GraphModel variant that excludes nodes, links, and sub-graphs from serialization.\n\nUsed in contexts like the store where exposing internal graph structure\nis not desired. Inherits all computed fields from GraphModel but marks\nnodes and links as excluded from JSON output."
|
||||
},
|
||||
"GraphSettings": {
|
||||
"properties": {
|
||||
@@ -8613,26 +8656,22 @@
|
||||
"input_default": {
|
||||
"additionalProperties": true,
|
||||
"type": "object",
|
||||
"title": "Input Default",
|
||||
"default": {}
|
||||
"title": "Input Default"
|
||||
},
|
||||
"metadata": {
|
||||
"additionalProperties": true,
|
||||
"type": "object",
|
||||
"title": "Metadata",
|
||||
"default": {}
|
||||
"title": "Metadata"
|
||||
},
|
||||
"input_links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Input Links",
|
||||
"default": []
|
||||
"title": "Input Links"
|
||||
},
|
||||
"output_links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Output Links",
|
||||
"default": []
|
||||
"title": "Output Links"
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
@@ -8712,26 +8751,22 @@
|
||||
"input_default": {
|
||||
"additionalProperties": true,
|
||||
"type": "object",
|
||||
"title": "Input Default",
|
||||
"default": {}
|
||||
"title": "Input Default"
|
||||
},
|
||||
"metadata": {
|
||||
"additionalProperties": true,
|
||||
"type": "object",
|
||||
"title": "Metadata",
|
||||
"default": {}
|
||||
"title": "Metadata"
|
||||
},
|
||||
"input_links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Input Links",
|
||||
"default": []
|
||||
"title": "Input Links"
|
||||
},
|
||||
"output_links": {
|
||||
"items": { "$ref": "#/components/schemas/Link" },
|
||||
"type": "array",
|
||||
"title": "Output Links",
|
||||
"default": []
|
||||
"title": "Output Links"
|
||||
},
|
||||
"graph_id": { "type": "string", "title": "Graph Id" },
|
||||
"graph_version": { "type": "integer", "title": "Graph Version" },
|
||||
@@ -12272,7 +12307,9 @@
|
||||
"title": "Location"
|
||||
},
|
||||
"msg": { "type": "string", "title": "Message" },
|
||||
"type": { "type": "string", "title": "Error Type" }
|
||||
"type": { "type": "string", "title": "Error Type" },
|
||||
"input": { "title": "Input" },
|
||||
"ctx": { "type": "object", "title": "Context" }
|
||||
},
|
||||
"type": "object",
|
||||
"required": ["loc", "msg", "type"],
|
||||
|
||||
@@ -102,18 +102,6 @@ export function ChatMessage({
|
||||
}
|
||||
}
|
||||
|
||||
function handleClarificationAnswers(answers: Record<string, string>) {
|
||||
if (onSendMessage) {
|
||||
const contextMessage = Object.entries(answers)
|
||||
.map(([keyword, answer]) => `${keyword}: ${answer}`)
|
||||
.join("\n");
|
||||
|
||||
onSendMessage(
|
||||
`I have the answers to your questions:\n\n${contextMessage}\n\nPlease proceed with creating the agent.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const handleCopy = useCallback(
|
||||
async function handleCopy() {
|
||||
if (message.type !== "message") return;
|
||||
@@ -162,6 +150,22 @@ export function ChatMessage({
|
||||
.slice(index + 1)
|
||||
.some((m) => m.type === "message" && m.role === "user");
|
||||
|
||||
const handleClarificationAnswers = (answers: Record<string, string>) => {
|
||||
if (onSendMessage) {
|
||||
// Iterate over questions (preserves original order) instead of answers
|
||||
const contextMessage = message.questions
|
||||
.map((q) => {
|
||||
const answer = answers[q.keyword] || "";
|
||||
return `> ${q.question}\n\n${answer}`;
|
||||
})
|
||||
.join("\n\n");
|
||||
|
||||
onSendMessage(
|
||||
`**Here are my answers:**\n\n${contextMessage}\n\nPlease proceed with creating the agent.`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<ClarificationQuestionsWidget
|
||||
questions={message.questions}
|
||||
|
||||
@@ -19,13 +19,13 @@ export function ThinkingMessage({ className }: ThinkingMessageProps) {
|
||||
if (timerRef.current === null) {
|
||||
timerRef.current = setTimeout(() => {
|
||||
setShowSlowLoader(true);
|
||||
}, 3000);
|
||||
}, 8000);
|
||||
}
|
||||
|
||||
if (coffeeTimerRef.current === null) {
|
||||
coffeeTimerRef.current = setTimeout(() => {
|
||||
setShowCoffeeMessage(true);
|
||||
}, 8000);
|
||||
}, 10000);
|
||||
}
|
||||
|
||||
return () => {
|
||||
|
||||
@@ -362,25 +362,14 @@ export type GraphMeta = {
|
||||
user_id: UserID;
|
||||
version: number;
|
||||
is_active: boolean;
|
||||
created_at: Date;
|
||||
name: string;
|
||||
description: string;
|
||||
instructions?: string | null;
|
||||
recommended_schedule_cron: string | null;
|
||||
forked_from_id?: GraphID | null;
|
||||
forked_from_version?: number | null;
|
||||
input_schema: GraphInputSchema;
|
||||
output_schema: GraphOutputSchema;
|
||||
credentials_input_schema: CredentialsInputSchema;
|
||||
} & (
|
||||
| {
|
||||
has_external_trigger: true;
|
||||
trigger_setup_info: GraphTriggerInfo;
|
||||
}
|
||||
| {
|
||||
has_external_trigger: false;
|
||||
trigger_setup_info: null;
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
export type GraphID = Brand<string, "GraphID">;
|
||||
|
||||
@@ -447,11 +436,22 @@ export type GraphTriggerInfo = {
|
||||
|
||||
/* Mirror of backend/data/graph.py:Graph */
|
||||
export type Graph = GraphMeta & {
|
||||
created_at: Date;
|
||||
nodes: Node[];
|
||||
links: Link[];
|
||||
sub_graphs: Omit<Graph, "sub_graphs">[]; // Flattened sub-graphs
|
||||
};
|
||||
input_schema: GraphInputSchema;
|
||||
output_schema: GraphOutputSchema;
|
||||
credentials_input_schema: CredentialsInputSchema;
|
||||
} & (
|
||||
| {
|
||||
has_external_trigger: true;
|
||||
trigger_setup_info: GraphTriggerInfo;
|
||||
}
|
||||
| {
|
||||
has_external_trigger: false;
|
||||
trigger_setup_info: null;
|
||||
}
|
||||
);
|
||||
|
||||
export type GraphUpdateable = Omit<
|
||||
Graph,
|
||||
|
||||
Reference in New Issue
Block a user