Compare commits

..

4 Commits

Author SHA1 Message Date
Nicholas Tindle
f4f81bc4fc fix(backend): Remove _credentials_id key on fork instead of setting to None
Setting _credentials_id to None on fork was ambiguous — both "forked,
needs re-auth" and "chained data from upstream" were represented as None.
This caused _acquire_auto_credentials to silently skip credential
acquisition for forked agents, leading to confusing TypeErrors at runtime.

Now the key is deleted entirely, making the three states unambiguous:
- Present with value: user-selected credentials
- Present as None: chained data from upstream block
- Absent: forked/needs re-authentication

Also adds pre-run validation for the missing key case and makes error
messages provider-agnostic.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 17:34:16 -06:00
Nicholas Tindle
c5abc01f25 fix(backend): Add error handling for auto-credentials store lookup
Wrap get_creds_by_id call in try/except in the auto-credentials
validation path to match the error handling pattern used for regular
credentials.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 16:53:29 -06:00
Nicholas Tindle
8b7053c1de merge: Resolve conflicts with dev (PR #11986 graph model refactor)
Adapt auto-credentials filtering to dev's refactored graph model:
- aggregate_credentials_inputs() now returns 3-tuples (field_info, node_pairs, is_required)
- credentials_input_schema moved to GraphModel, builds JSON schema directly
- Update regular/auto_credentials_inputs properties for 3-tuple format
- Update test mocks and assertions for new tuple format and class hierarchy

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 16:39:57 -06:00
Nicholas Tindle
e00c1202ad fix(platform): Fix Google Drive auto-credentials handling across the platform
- Tag auto-credentials with `is_auto_credential` and `input_field_name` on `CredentialsFieldInfo` to distinguish them from regular user-provided credentials
- Add `regular_credentials_inputs` and `auto_credentials_inputs` properties to `Graph` so UI schemas, CoPilot, and library presets only surface regular credentials
- Extract `_acquire_auto_credentials()` helper in executor to resolve embedded `_credentials_id` at execution time with proper lock management
- Validate auto-credentials ownership in `_validate_node_input_credentials()` to catch stale/missing credentials before execution
- Clear `_credentials_id` in `_reassign_ids()` on graph fork so cloned agents require re-authentication
- Propagate `is_auto_credential` through `combine()` and `discriminate()` on `CredentialsFieldInfo`
- Add `referrerPolicy: "no-referrer-when-downgrade"` to Google API script loading to fix Firefox API key validation
- Comprehensive test coverage for all new behavior

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 16:08:53 -06:00
35 changed files with 4528 additions and 6891 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -11,15 +11,15 @@ python = ">=3.10,<4.0"
colorama = "^0.4.6" colorama = "^0.4.6"
cryptography = "^45.0" cryptography = "^45.0"
expiringdict = "^1.2.2" expiringdict = "^1.2.2"
fastapi = "^0.128.0" fastapi = "^0.116.1"
google-cloud-logging = "^3.13.0" google-cloud-logging = "^3.12.1"
launchdarkly-server-sdk = "^9.14.1" launchdarkly-server-sdk = "^9.12.0"
pydantic = "^2.12.5" pydantic = "^2.11.7"
pydantic-settings = "^2.12.0" pydantic-settings = "^2.10.1"
pyjwt = { version = "^2.11.0", extras = ["crypto"] } pyjwt = { version = "^2.10.1", extras = ["crypto"] }
redis = "^6.2.0" redis = "^6.2.0"
supabase = "^2.27.2" supabase = "^2.16.0"
uvicorn = "^0.40.0" uvicorn = "^0.35.0"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
pyright = "^1.1.404" pyright = "^1.1.404"

View File

@@ -27,20 +27,12 @@ class ChatConfig(BaseSettings):
session_ttl: int = Field(default=43200, description="Session TTL in seconds") session_ttl: int = Field(default=43200, description="Session TTL in seconds")
# Streaming Configuration # Streaming Configuration
# Note: When using Claude Agent SDK, context management is handled automatically
# via the SDK's built-in compaction. This is mainly used for the fallback path.
max_context_messages: int = Field( max_context_messages: int = Field(
default=100, default=50, ge=1, le=200, description="Maximum context messages"
ge=1,
le=500,
description="Max context messages (SDK handles compaction automatically)",
) )
stream_timeout: int = Field(default=300, description="Stream timeout in seconds") stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
max_retries: int = Field( max_retries: int = Field(default=3, description="Maximum number of retries")
default=3,
description="Max retries for fallback path (SDK handles retries internally)",
)
max_agent_runs: int = Field(default=30, description="Maximum number of agent runs") max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
max_agent_schedules: int = Field( max_agent_schedules: int = Field(
default=30, description="Maximum number of agent schedules" default=30, description="Maximum number of agent schedules"
@@ -101,12 +93,6 @@ class ChatConfig(BaseSettings):
description="Name of the prompt in Langfuse to fetch", description="Name of the prompt in Langfuse to fetch",
) )
# Claude Agent SDK Configuration
use_claude_agent_sdk: bool = Field(
default=True,
description="Use Claude Agent SDK for chat completions",
)
@field_validator("api_key", mode="before") @field_validator("api_key", mode="before")
@classmethod @classmethod
def get_api_key(cls, v): def get_api_key(cls, v):
@@ -146,17 +132,6 @@ class ChatConfig(BaseSettings):
v = os.getenv("CHAT_INTERNAL_API_KEY") v = os.getenv("CHAT_INTERNAL_API_KEY")
return v return v
@field_validator("use_claude_agent_sdk", mode="before")
@classmethod
def get_use_claude_agent_sdk(cls, v):
"""Get use_claude_agent_sdk from environment if not provided."""
# Check environment variable - default to True if not set
env_val = os.getenv("CHAT_USE_CLAUDE_AGENT_SDK", "").lower()
if env_val:
return env_val in ("true", "1", "yes", "on")
# Default to True (SDK enabled by default)
return True if v is None else v
# Prompt paths for different contexts # Prompt paths for different contexts
PROMPT_PATHS: dict[str, str] = { PROMPT_PATHS: dict[str, str] = {
"default": "prompts/chat_system.md", "default": "prompts/chat_system.md",

View File

@@ -273,8 +273,9 @@ async def _get_session_from_cache(session_id: str) -> ChatSession | None:
try: try:
session = ChatSession.model_validate_json(raw_session) session = ChatSession.model_validate_json(raw_session)
logger.info( logger.info(
f"[CACHE] Loaded session {session_id}: {len(session.messages)} messages, " f"Loading session {session_id} from cache: "
f"last_roles={[m.role for m in session.messages[-3:]]}" # Last 3 roles f"message_count={len(session.messages)}, "
f"roles={[m.role for m in session.messages]}"
) )
return session return session
except Exception as e: except Exception as e:
@@ -316,9 +317,11 @@ async def _get_session_from_db(session_id: str) -> ChatSession | None:
return None return None
messages = prisma_session.Messages messages = prisma_session.Messages
logger.debug( logger.info(
f"[DB] Loaded session {session_id}: {len(messages) if messages else 0} messages, " f"Loading session {session_id} from DB: "
f"roles={[m.role for m in messages[-3:]] if messages else []}" # Last 3 roles f"has_messages={messages is not None}, "
f"message_count={len(messages) if messages else 0}, "
f"roles={[m.role for m in messages] if messages else []}"
) )
return ChatSession.from_db(prisma_session, messages) return ChatSession.from_db(prisma_session, messages)
@@ -369,9 +372,10 @@ async def _save_session_to_db(
"function_call": msg.function_call, "function_call": msg.function_call,
} }
) )
logger.debug( logger.info(
f"[DB] Saving {len(new_messages)} messages to session {session.session_id}, " f"Saving {len(new_messages)} new messages to DB for session {session.session_id}: "
f"roles={[m['role'] for m in messages_data]}" f"roles={[m['role'] for m in messages_data]}, "
f"start_sequence={existing_message_count}"
) )
await chat_db.add_chat_messages_batch( await chat_db.add_chat_messages_batch(
session_id=session.session_id, session_id=session.session_id,
@@ -411,7 +415,7 @@ async def get_chat_session(
logger.warning(f"Unexpected cache error for session {session_id}: {e}") logger.warning(f"Unexpected cache error for session {session_id}: {e}")
# Fall back to database # Fall back to database
logger.debug(f"Session {session_id} not in cache, checking database") logger.info(f"Session {session_id} not in cache, checking database")
session = await _get_session_from_db(session_id) session = await _get_session_from_db(session_id)
if session is None: if session is None:
@@ -428,6 +432,7 @@ async def get_chat_session(
# Cache the session from DB # Cache the session from DB
try: try:
await _cache_session(session) await _cache_session(session)
logger.info(f"Cached session {session_id} from database")
except Exception as e: except Exception as e:
logger.warning(f"Failed to cache session {session_id}: {e}") logger.warning(f"Failed to cache session {session_id}: {e}")
@@ -598,19 +603,13 @@ async def update_session_title(session_id: str, title: str) -> bool:
logger.warning(f"Session {session_id} not found for title update") logger.warning(f"Session {session_id} not found for title update")
return False return False
# Update title in cache if it exists (instead of invalidating). # Invalidate cache so next fetch gets updated title
# This prevents race conditions where cache invalidation causes
# the frontend to see stale DB data while streaming is still in progress.
try: try:
cached = await _get_session_from_cache(session_id) redis_key = _get_session_cache_key(session_id)
if cached: async_redis = await get_redis_async()
cached.title = title await async_redis.delete(redis_key)
await _cache_session(cached)
except Exception as e: except Exception as e:
# Not critical - title will be correct on next full cache refresh logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
logger.warning(
f"Failed to update title in cache for session {session_id}: {e}"
)
return True return True
except Exception as e: except Exception as e:

View File

@@ -1,6 +1,5 @@
"""Chat API routes for chat session management and streaming via SSE.""" """Chat API routes for chat session management and streaming via SSE."""
import asyncio
import logging import logging
import uuid as uuid_module import uuid as uuid_module
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
@@ -17,17 +16,8 @@ from . import service as chat_service
from . import stream_registry from . import stream_registry
from .completion_handler import process_operation_failure, process_operation_success from .completion_handler import process_operation_failure, process_operation_success
from .config import ChatConfig from .config import ChatConfig
from .model import ( from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
ChatMessage,
ChatSession,
create_chat_session,
get_chat_session,
get_user_sessions,
upsert_chat_session,
)
from .response_model import StreamFinish, StreamHeartbeat, StreamStart from .response_model import StreamFinish, StreamHeartbeat, StreamStart
from .sdk import service as sdk_service
from .tracking import track_user_message
config = ChatConfig() config = ChatConfig()
@@ -219,10 +209,6 @@ async def get_session(
active_task, last_message_id = await stream_registry.get_active_task_for_session( active_task, last_message_id = await stream_registry.get_active_task_for_session(
session_id, user_id session_id, user_id
) )
logger.info(
f"[GET_SESSION] session={session_id}, active_task={active_task is not None}, "
f"msg_count={len(messages)}, last_role={messages[-1].get('role') if messages else 'none'}"
)
if active_task: if active_task:
# Filter out the in-progress assistant message from the session response. # Filter out the in-progress assistant message from the session response.
# The client will receive the complete assistant response through the SSE # The client will receive the complete assistant response through the SSE
@@ -279,29 +265,9 @@ async def stream_chat_post(
containing the task_id for reconnection. containing the task_id for reconnection.
""" """
session = await _validate_and_get_session(session_id, user_id) import asyncio
# Add user message to session BEFORE creating task to avoid race condition session = await _validate_and_get_session(session_id, user_id)
# where GET_SESSION sees the task as "running" but the message isn't saved yet
if request.message:
session.messages.append(
ChatMessage(
role="user" if request.is_user_message else "assistant",
content=request.message,
)
)
if request.is_user_message:
track_user_message(
user_id=user_id,
session_id=session_id,
message_length=len(request.message),
)
logger.info(
f"[STREAM] Saving user message to session {session_id}, "
f"msg_count={len(session.messages)}"
)
session = await upsert_chat_session(session)
logger.info(f"[STREAM] User message saved for session {session_id}")
# Create a task in the stream registry for reconnection support # Create a task in the stream registry for reconnection support
task_id = str(uuid_module.uuid4()) task_id = str(uuid_module.uuid4())
@@ -317,38 +283,24 @@ async def stream_chat_post(
# Background task that runs the AI generation independently of SSE connection # Background task that runs the AI generation independently of SSE connection
async def run_ai_generation(): async def run_ai_generation():
chunk_count = 0
try: try:
# Emit a start event with task_id for reconnection # Emit a start event with task_id for reconnection
start_chunk = StreamStart(messageId=task_id, taskId=task_id) start_chunk = StreamStart(messageId=task_id, taskId=task_id)
await stream_registry.publish_chunk(task_id, start_chunk) await stream_registry.publish_chunk(task_id, start_chunk)
# Choose service based on configuration async for chunk in chat_service.stream_chat_completion(
use_sdk = config.use_claude_agent_sdk
stream_fn = (
sdk_service.stream_chat_completion_sdk
if use_sdk
else chat_service.stream_chat_completion
)
# Pass message=None since we already added it to the session above
async for chunk in stream_fn(
session_id, session_id,
None, # Message already in session request.message,
is_user_message=request.is_user_message, is_user_message=request.is_user_message,
user_id=user_id, user_id=user_id,
session=session, # Pass session with message already added session=session, # Pass pre-fetched session to avoid double-fetch
context=request.context, context=request.context,
): ):
chunk_count += 1
# Write to Redis (subscribers will receive via XREAD) # Write to Redis (subscribers will receive via XREAD)
await stream_registry.publish_chunk(task_id, chunk) await stream_registry.publish_chunk(task_id, chunk)
logger.info( # Mark task as completed
f"[BG_TASK] AI generation completed for session {session_id}: {chunk_count} chunks, marking task {task_id} as completed" await stream_registry.mark_task_completed(task_id, "completed")
)
# Mark task as completed (also publishes StreamFinish)
completed = await stream_registry.mark_task_completed(task_id, "completed")
logger.info(f"[BG_TASK] mark_task_completed returned: {completed}")
except Exception as e: except Exception as e:
logger.error( logger.error(
f"Error in background AI generation for session {session_id}: {e}" f"Error in background AI generation for session {session_id}: {e}"
@@ -363,7 +315,7 @@ async def stream_chat_post(
async def event_generator() -> AsyncGenerator[str, None]: async def event_generator() -> AsyncGenerator[str, None]:
subscriber_queue = None subscriber_queue = None
try: try:
# Subscribe to the task stream (replays + live updates) # Subscribe to the task stream (this replays existing messages + live updates)
subscriber_queue = await stream_registry.subscribe_to_task( subscriber_queue = await stream_registry.subscribe_to_task(
task_id=task_id, task_id=task_id,
user_id=user_id, user_id=user_id,
@@ -371,7 +323,6 @@ async def stream_chat_post(
) )
if subscriber_queue is None: if subscriber_queue is None:
logger.warning(f"Failed to subscribe to task {task_id}")
yield StreamFinish().to_sse() yield StreamFinish().to_sse()
yield "data: [DONE]\n\n" yield "data: [DONE]\n\n"
return return
@@ -390,11 +341,11 @@ async def stream_chat_post(
yield StreamHeartbeat().to_sse() yield StreamHeartbeat().to_sse()
except GeneratorExit: except GeneratorExit:
pass # Client disconnected - normal behavior pass # Client disconnected - background task continues
except Exception as e: except Exception as e:
logger.error(f"Error in SSE stream for task {task_id}: {e}") logger.error(f"Error in SSE stream for task {task_id}: {e}")
finally: finally:
# Unsubscribe when client disconnects or stream ends # Unsubscribe when client disconnects or stream ends to prevent resource leak
if subscriber_queue is not None: if subscriber_queue is not None:
try: try:
await stream_registry.unsubscribe_from_task( await stream_registry.unsubscribe_from_task(
@@ -449,21 +400,35 @@ async def stream_chat_get(
session = await _validate_and_get_session(session_id, user_id) session = await _validate_and_get_session(session_id, user_id)
async def event_generator() -> AsyncGenerator[str, None]: async def event_generator() -> AsyncGenerator[str, None]:
# Choose service based on configuration chunk_count = 0
use_sdk = config.use_claude_agent_sdk first_chunk_type: str | None = None
stream_fn = ( async for chunk in chat_service.stream_chat_completion(
sdk_service.stream_chat_completion_sdk
if use_sdk
else chat_service.stream_chat_completion
)
async for chunk in stream_fn(
session_id, session_id,
message, message,
is_user_message=is_user_message, is_user_message=is_user_message,
user_id=user_id, user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch session=session, # Pass pre-fetched session to avoid double-fetch
): ):
if chunk_count < 3:
logger.info(
"Chat stream chunk",
extra={
"session_id": session_id,
"chunk_type": str(chunk.type),
},
)
if not first_chunk_type:
first_chunk_type = str(chunk.type)
chunk_count += 1
yield chunk.to_sse() yield chunk.to_sse()
logger.info(
"Chat stream completed",
extra={
"session_id": session_id,
"chunk_count": chunk_count,
"first_chunk_type": first_chunk_type,
},
)
# AI SDK protocol termination # AI SDK protocol termination
yield "data: [DONE]\n\n" yield "data: [DONE]\n\n"
@@ -585,6 +550,8 @@ async def stream_task(
) )
async def event_generator() -> AsyncGenerator[str, None]: async def event_generator() -> AsyncGenerator[str, None]:
import asyncio
heartbeat_interval = 15.0 # Send heartbeat every 15 seconds heartbeat_interval = 15.0 # Send heartbeat every 15 seconds
try: try:
while True: while True:

View File

@@ -1,14 +0,0 @@
"""Claude Agent SDK integration for CoPilot.
This module provides the integration layer between the Claude Agent SDK
and the existing CoPilot tool system, enabling drop-in replacement of
the current LLM orchestration with the battle-tested Claude Agent SDK.
"""
from .service import stream_chat_completion_sdk
from .tool_adapter import create_copilot_mcp_server
__all__ = [
"stream_chat_completion_sdk",
"create_copilot_mcp_server",
]

View File

@@ -1,348 +0,0 @@
"""Anthropic SDK fallback implementation.
This module provides the fallback streaming implementation using the Anthropic SDK
directly when the Claude Agent SDK is not available.
"""
import json
import logging
import os
import uuid
from collections.abc import AsyncGenerator
from typing import Any, cast
from ..model import ChatMessage, ChatSession
from ..response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
StreamUsage,
)
from .tool_adapter import get_tool_definitions, get_tool_handlers
logger = logging.getLogger(__name__)
async def stream_with_anthropic(
session: ChatSession,
system_prompt: str,
text_block_id: str,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Stream using Anthropic SDK directly with tool calling support.
This function accumulates messages into the session for persistence.
The caller should NOT yield an additional StreamFinish - this function handles it.
"""
import anthropic
# Only use ANTHROPIC_API_KEY - don't fall back to OpenRouter keys
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
yield StreamError(
errorText="ANTHROPIC_API_KEY not configured for fallback",
code="config_error",
)
yield StreamFinish()
return
client = anthropic.AsyncAnthropic(api_key=api_key)
tool_definitions = get_tool_definitions()
tool_handlers = get_tool_handlers()
anthropic_tools = [
{
"name": t["name"],
"description": t["description"],
"input_schema": t["inputSchema"],
}
for t in tool_definitions
]
anthropic_messages = _convert_session_to_anthropic(session)
if not anthropic_messages or anthropic_messages[-1]["role"] != "user":
anthropic_messages.append(
{"role": "user", "content": "Continue with the task."}
)
has_started_text = False
max_iterations = 10
accumulated_text = ""
accumulated_tool_calls: list[dict[str, Any]] = []
for _ in range(max_iterations):
try:
async with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
system=system_prompt,
messages=cast(Any, anthropic_messages),
tools=cast(Any, anthropic_tools) if anthropic_tools else [],
) as stream:
async for event in stream:
if event.type == "content_block_start":
block = event.content_block
if hasattr(block, "type"):
if block.type == "text" and not has_started_text:
yield StreamTextStart(id=text_block_id)
has_started_text = True
elif block.type == "tool_use":
yield StreamToolInputStart(
toolCallId=block.id, toolName=block.name
)
elif event.type == "content_block_delta":
delta = event.delta
if hasattr(delta, "type") and delta.type == "text_delta":
accumulated_text += delta.text
yield StreamTextDelta(id=text_block_id, delta=delta.text)
final_message = await stream.get_final_message()
if final_message.stop_reason == "tool_use":
if has_started_text:
yield StreamTextEnd(id=text_block_id)
has_started_text = False
text_block_id = str(uuid.uuid4())
tool_results = []
assistant_content: list[dict[str, Any]] = []
for block in final_message.content:
if block.type == "text":
assistant_content.append(
{"type": "text", "text": block.text}
)
elif block.type == "tool_use":
assistant_content.append(
{
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input,
}
)
# Track tool call for session persistence
accumulated_tool_calls.append(
{
"id": block.id,
"type": "function",
"function": {
"name": block.name,
"arguments": json.dumps(
block.input
if isinstance(block.input, dict)
else {}
),
},
}
)
yield StreamToolInputAvailable(
toolCallId=block.id,
toolName=block.name,
input=(
block.input if isinstance(block.input, dict) else {}
),
)
output, is_error = await _execute_tool(
block.name, block.input, tool_handlers
)
yield StreamToolOutputAvailable(
toolCallId=block.id,
toolName=block.name,
output=output,
success=not is_error,
)
# Save tool result to session
session.messages.append(
ChatMessage(
role="tool",
content=output,
tool_call_id=block.id,
)
)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
# Save assistant message with tool calls to session
session.messages.append(
ChatMessage(
role="assistant",
content=accumulated_text or None,
tool_calls=(
accumulated_tool_calls
if accumulated_tool_calls
else None
),
)
)
# Reset for next iteration
accumulated_text = ""
accumulated_tool_calls = []
anthropic_messages.append(
{"role": "assistant", "content": assistant_content}
)
anthropic_messages.append({"role": "user", "content": tool_results})
continue
else:
if has_started_text:
yield StreamTextEnd(id=text_block_id)
# Save final assistant response to session
if accumulated_text:
session.messages.append(
ChatMessage(role="assistant", content=accumulated_text)
)
yield StreamUsage(
promptTokens=final_message.usage.input_tokens,
completionTokens=final_message.usage.output_tokens,
totalTokens=final_message.usage.input_tokens
+ final_message.usage.output_tokens,
)
yield StreamFinish()
return
except Exception as e:
logger.error(f"[Anthropic Fallback] Error: {e}", exc_info=True)
yield StreamError(
errorText="An error occurred. Please try again.",
code="anthropic_error",
)
yield StreamFinish()
return
yield StreamError(errorText="Max tool iterations reached", code="max_iterations")
yield StreamFinish()
def _convert_session_to_anthropic(session: ChatSession) -> list[dict[str, Any]]:
"""Convert session messages to Anthropic format.
Handles merging consecutive same-role messages (Anthropic requires alternating roles).
"""
messages: list[dict[str, Any]] = []
for msg in session.messages:
if msg.role == "user":
new_msg = {"role": "user", "content": msg.content or ""}
elif msg.role == "assistant":
content: list[dict[str, Any]] = []
if msg.content:
content.append({"type": "text", "text": msg.content})
if msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
args = func.get("arguments", {})
if isinstance(args, str):
try:
args = json.loads(args)
except json.JSONDecodeError:
args = {}
content.append(
{
"type": "tool_use",
"id": tc.get("id", str(uuid.uuid4())),
"name": func.get("name", ""),
"input": args,
}
)
if content:
new_msg = {"role": "assistant", "content": content}
else:
continue # Skip empty assistant messages
elif msg.role == "tool":
new_msg = {
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": msg.tool_call_id or "",
"content": msg.content or "",
}
],
}
else:
continue
messages.append(new_msg)
# Merge consecutive same-role messages (Anthropic requires alternating roles)
return _merge_consecutive_roles(messages)
def _merge_consecutive_roles(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Merge consecutive messages with the same role.
Anthropic API requires alternating user/assistant roles.
"""
if not messages:
return []
merged: list[dict[str, Any]] = []
for msg in messages:
if merged and merged[-1]["role"] == msg["role"]:
# Merge with previous message
prev_content = merged[-1]["content"]
new_content = msg["content"]
# Normalize both to list-of-blocks form
if isinstance(prev_content, str):
prev_content = [{"type": "text", "text": prev_content}]
if isinstance(new_content, str):
new_content = [{"type": "text", "text": new_content}]
# Ensure both are lists
if not isinstance(prev_content, list):
prev_content = [prev_content]
if not isinstance(new_content, list):
new_content = [new_content]
merged[-1]["content"] = prev_content + new_content
else:
merged.append(msg)
return merged
async def _execute_tool(
tool_name: str, tool_input: Any, handlers: dict[str, Any]
) -> tuple[str, bool]:
"""Execute a tool and return (output, is_error)."""
handler = handlers.get(tool_name)
if not handler:
return f"Unknown tool: {tool_name}", True
try:
result = await handler(tool_input)
# Safely extract output - handle empty or missing content
content = result.get("content") or []
if content and isinstance(content, list) and len(content) > 0:
first_item = content[0]
output = first_item.get("text", "") if isinstance(first_item, dict) else ""
else:
output = ""
is_error = result.get("isError", False)
return output, is_error
except Exception as e:
return f"Error: {str(e)}", True

View File

@@ -1,311 +0,0 @@
"""Response adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
This module provides the adapter layer that converts streaming messages from
the Claude Agent SDK into the Vercel AI SDK UI Stream Protocol format that
the frontend expects.
"""
import json
import logging
import uuid
from typing import Any, AsyncGenerator
from backend.api.features.chat.response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamHeartbeat,
StreamStart,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
StreamUsage,
)
logger = logging.getLogger(__name__)
class SDKResponseAdapter:
"""Adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
This class maintains state during a streaming session to properly track
text blocks, tool calls, and message lifecycle.
"""
def __init__(self, message_id: str | None = None):
"""Initialize the adapter.
Args:
message_id: Optional message ID. If not provided, one will be generated.
"""
self.message_id = message_id or str(uuid.uuid4())
self.text_block_id = str(uuid.uuid4())
self.has_started_text = False
self.has_ended_text = False
self.current_tool_calls: dict[str, dict[str, Any]] = {}
self.task_id: str | None = None
def set_task_id(self, task_id: str) -> None:
"""Set the task ID for reconnection support."""
self.task_id = task_id
def convert_message(self, sdk_message: Any) -> list[StreamBaseResponse]:
"""Convert a single SDK message to Vercel AI SDK format.
Args:
sdk_message: A message from the Claude Agent SDK.
Returns:
List of StreamBaseResponse objects (may be empty or multiple).
"""
responses: list[StreamBaseResponse] = []
# Handle different SDK message types - use class name since SDK uses dataclasses
class_name = type(sdk_message).__name__
msg_subtype = getattr(sdk_message, "subtype", None)
if class_name == "SystemMessage":
if msg_subtype == "init":
# Session initialization - emit start
responses.append(
StreamStart(
messageId=self.message_id,
taskId=self.task_id,
)
)
elif class_name == "AssistantMessage":
# Assistant message with content blocks
content = getattr(sdk_message, "content", [])
for block in content:
# Check block type by class name (SDK uses dataclasses) or dict type
block_class = type(block).__name__
block_type = block.get("type") if isinstance(block, dict) else None
if block_class == "TextBlock" or block_type == "text":
# Text content
text = getattr(block, "text", None) or (
block.get("text") if isinstance(block, dict) else ""
)
if text:
# Start text block if needed (or restart after tool calls)
if not self.has_started_text or self.has_ended_text:
# Generate new text block ID for text after tools
if self.has_ended_text:
self.text_block_id = str(uuid.uuid4())
self.has_ended_text = False
responses.append(StreamTextStart(id=self.text_block_id))
self.has_started_text = True
# Emit text delta
responses.append(
StreamTextDelta(
id=self.text_block_id,
delta=text,
)
)
elif block_class == "ToolUseBlock" or block_type == "tool_use":
# Tool call
tool_id_raw = getattr(block, "id", None) or (
block.get("id") if isinstance(block, dict) else None
)
tool_id: str = (
str(tool_id_raw) if tool_id_raw else str(uuid.uuid4())
)
tool_name_raw = getattr(block, "name", None) or (
block.get("name") if isinstance(block, dict) else None
)
tool_name: str = str(tool_name_raw) if tool_name_raw else "unknown"
tool_input = getattr(block, "input", None) or (
block.get("input") if isinstance(block, dict) else {}
)
# End text block if we were streaming text
if self.has_started_text and not self.has_ended_text:
responses.append(StreamTextEnd(id=self.text_block_id))
self.has_ended_text = True
# Emit tool input start
responses.append(
StreamToolInputStart(
toolCallId=tool_id,
toolName=tool_name,
)
)
# Emit tool input available with full input
responses.append(
StreamToolInputAvailable(
toolCallId=tool_id,
toolName=tool_name,
input=tool_input if isinstance(tool_input, dict) else {},
)
)
# Track the tool call
self.current_tool_calls[tool_id] = {
"name": tool_name,
"input": tool_input,
}
elif class_name in ("ToolResultMessage", "UserMessage"):
# Tool result - check for tool_result content
content = getattr(sdk_message, "content", [])
for block in content:
block_class = type(block).__name__
block_type = block.get("type") if isinstance(block, dict) else None
if block_class == "ToolResultBlock" or block_type == "tool_result":
tool_use_id = getattr(block, "tool_use_id", None) or (
block.get("tool_use_id") if isinstance(block, dict) else None
)
result_content = getattr(block, "content", None) or (
block.get("content") if isinstance(block, dict) else ""
)
is_error = getattr(block, "is_error", False) or (
block.get("is_error", False)
if isinstance(block, dict)
else False
)
if tool_use_id:
tool_info = self.current_tool_calls.get(tool_use_id, {})
tool_name = tool_info.get("name", "unknown")
# Format the output
if isinstance(result_content, list):
# Extract text from content blocks
output_text = ""
for item in result_content:
if (
isinstance(item, dict)
and item.get("type") == "text"
):
output_text += item.get("text", "")
elif hasattr(item, "text"):
output_text += getattr(item, "text", "")
output = output_text
elif isinstance(result_content, str):
output = result_content
else:
output = json.dumps(result_content)
responses.append(
StreamToolOutputAvailable(
toolCallId=tool_use_id,
toolName=tool_name,
output=output,
success=not is_error,
)
)
elif class_name == "ResultMessage":
# Final result
if msg_subtype == "success":
# End text block if still open
if self.has_started_text and not self.has_ended_text:
responses.append(StreamTextEnd(id=self.text_block_id))
self.has_ended_text = True
# Emit finish
responses.append(StreamFinish())
elif msg_subtype in ("error", "error_during_execution"):
error_msg = getattr(sdk_message, "error", "Unknown error")
responses.append(
StreamError(
errorText=str(error_msg),
code="sdk_error",
)
)
responses.append(StreamFinish())
elif class_name == "ErrorMessage":
# Error message
error_msg = getattr(sdk_message, "message", None) or getattr(
sdk_message, "error", "Unknown error"
)
responses.append(
StreamError(
errorText=str(error_msg),
code="sdk_error",
)
)
responses.append(StreamFinish())
else:
logger.debug(f"Unhandled SDK message type: {class_name}")
return responses
def create_heartbeat(self, tool_call_id: str | None = None) -> StreamHeartbeat:
"""Create a heartbeat response."""
return StreamHeartbeat(toolCallId=tool_call_id)
def create_usage(
self,
prompt_tokens: int,
completion_tokens: int,
) -> StreamUsage:
"""Create a usage statistics response."""
return StreamUsage(
promptTokens=prompt_tokens,
completionTokens=completion_tokens,
totalTokens=prompt_tokens + completion_tokens,
)
async def adapt_sdk_stream(
sdk_stream: AsyncGenerator[Any, None],
message_id: str | None = None,
task_id: str | None = None,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Adapt a Claude Agent SDK stream to Vercel AI SDK format.
Args:
sdk_stream: The async generator from the Claude Agent SDK.
message_id: Optional message ID for the response.
task_id: Optional task ID for reconnection support.
Yields:
StreamBaseResponse objects in Vercel AI SDK format.
"""
adapter = SDKResponseAdapter(message_id=message_id)
if task_id:
adapter.set_task_id(task_id)
# Emit start immediately
yield StreamStart(messageId=adapter.message_id, taskId=task_id)
finished = False
try:
async for sdk_message in sdk_stream:
responses = adapter.convert_message(sdk_message)
for response in responses:
# Skip duplicate start messages
if isinstance(response, StreamStart):
continue
if isinstance(response, StreamFinish):
finished = True
yield response
except Exception as e:
logger.error(f"Error in SDK stream: {e}", exc_info=True)
yield StreamError(
errorText="An error occurred. Please try again.",
code="stream_error",
)
yield StreamFinish()
return
# Ensure terminal StreamFinish if SDK stream ended without one
if not finished:
yield StreamFinish()

View File

@@ -1,278 +0,0 @@
"""Security hooks for Claude Agent SDK integration.
This module provides security hooks that validate tool calls before execution,
ensuring multi-user isolation and preventing unauthorized operations.
"""
import logging
import re
from typing import Any, cast
logger = logging.getLogger(__name__)
# Tools that are blocked entirely (CLI/system access)
BLOCKED_TOOLS = {
"Bash",
"bash",
"shell",
"exec",
"terminal",
"command",
"Read", # Block raw file read - use workspace tools instead
"Write", # Block raw file write - use workspace tools instead
"Edit", # Block raw file edit - use workspace tools instead
"Glob", # Block raw file glob - use workspace tools instead
"Grep", # Block raw file grep - use workspace tools instead
}
# Dangerous patterns in tool inputs
DANGEROUS_PATTERNS = [
r"sudo",
r"rm\s+-rf",
r"dd\s+if=",
r"/etc/passwd",
r"/etc/shadow",
r"chmod\s+777",
r"curl\s+.*\|.*sh",
r"wget\s+.*\|.*sh",
r"eval\s*\(",
r"exec\s*\(",
r"__import__",
r"os\.system",
r"subprocess",
]
def _validate_tool_access(tool_name: str, tool_input: dict[str, Any]) -> dict[str, Any]:
"""Validate that a tool call is allowed.
Returns:
Empty dict to allow, or dict with hookSpecificOutput to deny
"""
# Block forbidden tools
if tool_name in BLOCKED_TOOLS:
logger.warning(f"Blocked tool access attempt: {tool_name}")
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": (
f"Tool '{tool_name}' is not available. "
"Use the CoPilot-specific tools instead."
),
}
}
# Check for dangerous patterns in tool input
input_str = str(tool_input)
for pattern in DANGEROUS_PATTERNS:
if re.search(pattern, input_str, re.IGNORECASE):
logger.warning(
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
)
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Input contains blocked pattern",
}
}
return {}
def _validate_user_isolation(
tool_name: str, tool_input: dict[str, Any], user_id: str | None
) -> dict[str, Any]:
"""Validate that tool calls respect user isolation."""
# For workspace file tools, ensure path doesn't escape
if "workspace" in tool_name.lower():
path = tool_input.get("path", "") or tool_input.get("file_path", "")
if path:
# Check for path traversal
if ".." in path or path.startswith("/"):
logger.warning(
f"Blocked path traversal attempt: {path} by user {user_id}"
)
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Path traversal not allowed",
}
}
return {}
def create_security_hooks(user_id: str | None) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
Includes security validation and observability hooks:
- PreToolUse: Security validation before tool execution
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
Args:
user_id: Current user ID for isolation validation
Returns:
Hooks configuration dict for ClaudeAgentOptions
"""
try:
from claude_agent_sdk import HookMatcher
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
async def pre_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Combined pre-tool-use validation hook."""
_ = context # unused but required by signature
tool_name = cast(str, input_data.get("tool_name", ""))
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
# Validate basic tool access
result = _validate_tool_access(tool_name, tool_input)
if result:
return cast(SyncHookJSONOutput, result)
# Validate user isolation
result = _validate_user_isolation(tool_name, tool_input, user_id)
if result:
return cast(SyncHookJSONOutput, result)
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
return cast(SyncHookJSONOutput, {})
async def post_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log successful tool executions for observability."""
_ = context
tool_name = cast(str, input_data.get("tool_name", ""))
logger.debug(f"[SDK] Tool success: {tool_name}, tool_use_id={tool_use_id}")
return cast(SyncHookJSONOutput, {})
async def post_tool_failure_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log failed tool executions for debugging."""
_ = context
tool_name = cast(str, input_data.get("tool_name", ""))
error = input_data.get("error", "Unknown error")
logger.warning(
f"[SDK] Tool failed: {tool_name}, error={error}, "
f"user={user_id}, tool_use_id={tool_use_id}"
)
return cast(SyncHookJSONOutput, {})
async def pre_compact_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log when SDK triggers context compaction.
The SDK automatically compacts conversation history when it grows too large.
This hook provides visibility into when compaction happens.
"""
_ = context, tool_use_id
trigger = input_data.get("trigger", "auto")
logger.info(
f"[SDK] Context compaction triggered: {trigger}, user={user_id}"
)
return cast(SyncHookJSONOutput, {})
return {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
"PostToolUseFailure": [
HookMatcher(matcher="*", hooks=[post_tool_failure_hook])
],
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
except ImportError:
# Fallback for when SDK isn't available - return empty hooks
return {}
def create_strict_security_hooks(
user_id: str | None,
allowed_tools: list[str] | None = None,
) -> dict[str, Any]:
"""Create strict security hooks that only allow specific tools.
Args:
user_id: Current user ID
allowed_tools: List of allowed tool names (defaults to CoPilot tools)
Returns:
Hooks configuration dict
"""
try:
from claude_agent_sdk import HookMatcher
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
from .tool_adapter import RAW_TOOL_NAMES
tools_list = allowed_tools if allowed_tools is not None else RAW_TOOL_NAMES
allowed_set = set(tools_list)
async def strict_pre_tool_use(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Strict validation that only allows whitelisted tools."""
_ = context # unused but required by signature
tool_name = cast(str, input_data.get("tool_name", ""))
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
# Remove MCP prefix if present
clean_name = tool_name.removeprefix("mcp__copilot__")
if clean_name not in allowed_set:
logger.warning(f"Blocked non-whitelisted tool: {tool_name}")
return cast(
SyncHookJSONOutput,
{
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": (
f"Tool '{tool_name}' is not in the allowed list"
),
}
},
)
# Run standard validations using clean_name for consistent checks
result = _validate_tool_access(clean_name, tool_input)
if result:
return cast(SyncHookJSONOutput, result)
result = _validate_user_isolation(clean_name, tool_input, user_id)
if result:
return cast(SyncHookJSONOutput, result)
logger.debug(
f"[SDK Audit] Tool call: tool={tool_name}, "
f"user={user_id}, tool_use_id={tool_use_id}"
)
return cast(SyncHookJSONOutput, {})
return {
"PreToolUse": [
HookMatcher(matcher="*", hooks=[strict_pre_tool_use]),
],
}
except ImportError:
return {}

View File

@@ -1,475 +0,0 @@
"""Claude Agent SDK service layer for CoPilot chat completions."""
import asyncio
import json
import logging
import uuid
from collections.abc import AsyncGenerator
from typing import Any
import openai
from backend.data.understanding import (
format_understanding_for_prompt,
get_business_understanding,
)
from backend.util.exceptions import NotFoundError
from ..config import ChatConfig
from ..model import (
ChatMessage,
ChatSession,
get_chat_session,
update_session_title,
upsert_chat_session,
)
from ..response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamStart,
StreamTextDelta,
StreamToolInputAvailable,
StreamToolOutputAvailable,
)
from ..tracking import track_user_message
from .anthropic_fallback import stream_with_anthropic
from .response_adapter import SDKResponseAdapter
from .security_hooks import create_security_hooks
from .tool_adapter import (
COPILOT_TOOL_NAMES,
create_copilot_mcp_server,
set_execution_context,
)
logger = logging.getLogger(__name__)
config = ChatConfig()
# Set to hold background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task[Any]] = set()
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
Here is everything you know about the current user from previous interactions:
<users_information>
{users_information}
</users_information>
## YOUR CORE MANDATE
You are action-oriented. Your success is measured by:
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
- **Time Saved**: Focus on tangible efficiency gains
- **Quality Output**: Deliver results that meet or exceed expectations
## YOUR WORKFLOW
Adapt flexibly to the conversation context. Not every interaction requires all stages:
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
4. **Discover or Create Agents**:
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
- Search the marketplace with `find_agent` for pre-built automations
- Find reusable components with `find_block`
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
6. **Show Results**: Display outputs using `agent_output`.
## BEHAVIORAL GUIDELINES
**Be Concise:**
- Target 2-5 short lines maximum
- Make every word count—no repetition or filler
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
- Avoid jargon (blocks, slugs, cron) unless the user asks
**Be Proactive:**
- Suggest next steps before being asked
- Anticipate needs based on conversation context and user information
- Look for opportunities to expand scope when relevant
- Reveal capabilities through action, not explanation
**Use Tools Effectively:**
- Select the right tool for each task
- **Always check `find_library_agent` before searching the marketplace**
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
async def _build_system_prompt(
user_id: str | None, has_conversation_history: bool = False
) -> tuple[str, Any]:
"""Build the system prompt with user's business understanding context.
Args:
user_id: The user ID to fetch understanding for.
has_conversation_history: Whether there's existing conversation history.
If True, we don't tell the model to greet/introduce (since they're
already in a conversation).
"""
understanding = None
if user_id:
try:
understanding = await get_business_understanding(user_id)
except Exception as e:
logger.warning(f"Failed to fetch business understanding: {e}")
if understanding:
context = format_understanding_for_prompt(understanding)
elif has_conversation_history:
# Don't tell model to greet if there's conversation history
context = "No prior understanding saved yet. Continue the existing conversation naturally."
else:
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
return DEFAULT_SYSTEM_PROMPT.format(users_information=context), understanding
def _format_conversation_history(session: ChatSession) -> str:
"""Format conversation history as a prompt context.
The SDK handles context compaction automatically, but we apply
max_context_messages as a safety guard to limit initial prompt size.
"""
if not session.messages:
return ""
# Get all messages except the last user message (which will be the prompt)
messages = session.messages[:-1] if session.messages else []
if not messages:
return ""
# Apply max_context_messages limit as a safety guard
# (SDK handles compaction, but this prevents excessively large initial prompts)
max_messages = config.max_context_messages
if len(messages) > max_messages:
messages = messages[-max_messages:]
history_parts = ["<conversation_history>"]
for msg in messages:
if msg.role == "user":
history_parts.append(f"User: {msg.content or ''}")
elif msg.role == "assistant":
# Pass full content - SDK handles compaction automatically
history_parts.append(f"Assistant: {msg.content or ''}")
if msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
history_parts.append(
f" [Called tool: {func.get('name', 'unknown')}]"
)
elif msg.role == "tool":
# Truncate large tool results to avoid blowing context window
tool_content = msg.content or ""
if len(tool_content) > 500:
tool_content = tool_content[:500] + "... (truncated)"
history_parts.append(f" [Tool result: {tool_content}]")
history_parts.append("</conversation_history>")
history_parts.append("")
history_parts.append(
"Continue this conversation. Respond to the user's latest message:"
)
history_parts.append("")
return "\n".join(history_parts)
async def _generate_session_title(
message: str,
user_id: str | None = None,
session_id: str | None = None,
) -> str | None:
"""Generate a concise title for a chat session."""
from backend.util.settings import Settings
settings = Settings()
try:
# Build extra_body for OpenRouter tracing
extra_body: dict[str, Any] = {
"posthogProperties": {"environment": settings.config.app_env.value},
}
if user_id:
extra_body["user"] = user_id[:128]
extra_body["posthogDistinctId"] = user_id
if session_id:
extra_body["session_id"] = session_id[:128]
client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
response = await client.chat.completions.create(
model=config.title_model,
messages=[
{
"role": "system",
"content": "Generate a very short title (3-6 words) for a chat conversation based on the user's first message. Return ONLY the title, no quotes or punctuation.",
},
{"role": "user", "content": message[:500]},
],
max_tokens=20,
extra_body=extra_body,
)
title = response.choices[0].message.content
if title:
title = title.strip().strip("\"'")
return title[:47] + "..." if len(title) > 50 else title
return None
except Exception as e:
logger.warning(f"Failed to generate session title: {e}")
return None
async def stream_chat_completion_sdk(
session_id: str,
message: str | None = None,
tool_call_response: str | None = None, # noqa: ARG001
is_user_message: bool = True,
user_id: str | None = None,
retry_count: int = 0, # noqa: ARG001
session: ChatSession | None = None,
context: dict[str, str] | None = None, # noqa: ARG001
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Stream chat completion using Claude Agent SDK.
Drop-in replacement for stream_chat_completion with improved reliability.
"""
if session is None:
session = await get_chat_session(session_id, user_id)
if not session:
raise NotFoundError(
f"Session {session_id} not found. Please create a new session first."
)
if message:
session.messages.append(
ChatMessage(
role="user" if is_user_message else "assistant", content=message
)
)
if is_user_message:
track_user_message(
user_id=user_id, session_id=session_id, message_length=len(message)
)
session = await upsert_chat_session(session)
# Generate title for new sessions (first user message)
if is_user_message and not session.title:
user_messages = [m for m in session.messages if m.role == "user"]
if len(user_messages) == 1:
first_message = user_messages[0].content or message or ""
if first_message:
task = asyncio.create_task(
_update_title_async(session_id, first_message, user_id)
)
# Store reference to prevent garbage collection
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
# Check if there's conversation history (more than just the current message)
has_history = len(session.messages) > 1
system_prompt, _ = await _build_system_prompt(
user_id, has_conversation_history=has_history
)
set_execution_context(user_id, session, None)
message_id = str(uuid.uuid4())
text_block_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
yield StreamStart(messageId=message_id, taskId=task_id)
# Track whether the stream completed normally via ResultMessage
stream_completed = False
try:
try:
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
# Create MCP server with CoPilot tools
mcp_server = create_copilot_mcp_server()
options = ClaudeAgentOptions(
system_prompt=system_prompt,
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
allowed_tools=COPILOT_TOOL_NAMES,
hooks=create_security_hooks(user_id), # type: ignore[arg-type]
continue_conversation=True, # Enable conversation continuation
)
adapter = SDKResponseAdapter(message_id=message_id)
adapter.set_task_id(task_id)
async with ClaudeSDKClient(options=options) as client:
# Build prompt with conversation history for context
# The SDK doesn't support replaying full conversation history,
# so we include it as context in the prompt
current_message = message or ""
if not current_message and session.messages:
last_user = [m for m in session.messages if m.role == "user"]
if last_user:
current_message = last_user[-1].content or ""
# Include conversation history if there are prior messages
if len(session.messages) > 1:
history_context = _format_conversation_history(session)
prompt = f"{history_context}{current_message}"
else:
prompt = current_message
# Guard against empty prompts
if not prompt.strip():
yield StreamError(
errorText="Message cannot be empty.",
code="empty_prompt",
)
yield StreamFinish()
return
await client.query(prompt, session_id=session_id)
# Track assistant response to save to session
# We may need multiple assistant messages if text comes after tool results
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
has_appended_assistant = False
has_tool_results = False # Track if we've received tool results
# Receive messages from the SDK
async for sdk_msg in client.receive_messages():
for response in adapter.convert_message(sdk_msg):
if isinstance(response, StreamStart):
continue
yield response
# Accumulate text deltas into assistant response
if isinstance(response, StreamTextDelta):
delta = response.delta or ""
# After tool results, create new assistant message for post-tool text
if has_tool_results and has_appended_assistant:
assistant_response = ChatMessage(
role="assistant", content=delta
)
accumulated_tool_calls = [] # Reset for new message
session.messages.append(assistant_response)
has_tool_results = False
else:
assistant_response.content = (
assistant_response.content or ""
) + delta
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
# Track tool calls on the assistant message
elif isinstance(response, StreamToolInputAvailable):
accumulated_tool_calls.append(
{
"id": response.toolCallId,
"type": "function",
"function": {
"name": response.toolName,
"arguments": json.dumps(response.input or {}),
},
}
)
# Update assistant message with tool calls
assistant_response.tool_calls = accumulated_tool_calls
# Append assistant message if not already (tool-only response)
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
elif isinstance(response, StreamToolOutputAvailable):
session.messages.append(
ChatMessage(
role="tool",
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
tool_call_id=response.toolCallId,
)
)
has_tool_results = True
elif isinstance(response, StreamFinish):
stream_completed = True
# Break out of the message loop if we received finish signal
if stream_completed:
break
# Ensure assistant response is saved even if no text deltas
# (e.g., only tool calls were made)
if (
assistant_response.content or assistant_response.tool_calls
) and not has_appended_assistant:
session.messages.append(assistant_response)
except ImportError:
logger.warning(
"[SDK] claude-agent-sdk not available, using Anthropic fallback"
)
async for response in stream_with_anthropic(
session, system_prompt, text_block_id
):
if isinstance(response, StreamFinish):
stream_completed = True
yield response
# Save the session with accumulated messages
await upsert_chat_session(session)
logger.debug(
f"[SDK] Session {session_id} saved with {len(session.messages)} messages"
)
# Yield StreamFinish to signal completion to the caller (routes.py)
# Only if one hasn't already been yielded by the stream
if not stream_completed:
yield StreamFinish()
except Exception as e:
logger.error(f"[SDK] Error: {e}", exc_info=True)
# Save session even on error to preserve any partial response
try:
await upsert_chat_session(session)
except Exception as save_err:
logger.error(f"[SDK] Failed to save session on error: {save_err}")
# Sanitize error message to avoid exposing internal details
yield StreamError(
errorText="An error occurred. Please try again.",
code="sdk_error",
)
yield StreamFinish()
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:
"""Background task to update session title."""
try:
title = await _generate_session_title(
message, user_id=user_id, session_id=session_id
)
if title:
await update_session_title(session_id, title)
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
except Exception as e:
logger.warning(f"[SDK] Failed to update session title: {e}")

View File

@@ -1,217 +0,0 @@
"""Tool adapter for wrapping existing CoPilot tools as Claude Agent SDK MCP tools.
This module provides the adapter layer that converts existing BaseTool implementations
into in-process MCP tools that can be used with the Claude Agent SDK.
"""
import json
import logging
import uuid
from contextvars import ContextVar
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools import TOOL_REGISTRY
from backend.api.features.chat.tools.base import BaseTool
logger = logging.getLogger(__name__)
# Context variables to pass user/session info to tool execution
_current_user_id: ContextVar[str | None] = ContextVar("current_user_id", default=None)
_current_session: ContextVar[ChatSession | None] = ContextVar(
"current_session", default=None
)
_current_tool_call_id: ContextVar[str | None] = ContextVar(
"current_tool_call_id", default=None
)
def set_execution_context(
user_id: str | None,
session: ChatSession,
tool_call_id: str | None = None,
) -> None:
"""Set the execution context for tool calls.
This must be called before streaming begins to ensure tools have access
to user_id and session information.
"""
_current_user_id.set(user_id)
_current_session.set(session)
_current_tool_call_id.set(tool_call_id)
def get_execution_context() -> tuple[str | None, ChatSession | None, str | None]:
"""Get the current execution context."""
return (
_current_user_id.get(),
_current_session.get(),
_current_tool_call_id.get(),
)
def create_tool_handler(base_tool: BaseTool):
"""Create an async handler function for a BaseTool.
This wraps the existing BaseTool._execute method to be compatible
with the Claude Agent SDK MCP tool format.
"""
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
"""Execute the wrapped tool and return MCP-formatted response."""
user_id, session, tool_call_id = get_execution_context()
if session is None:
return {
"content": [
{
"type": "text",
"text": json.dumps(
{
"error": "No session context available",
"type": "error",
}
),
}
],
"isError": True,
}
try:
# Call the existing tool's execute method
# Generate unique tool_call_id per invocation for proper correlation
effective_id = tool_call_id or f"sdk-{uuid.uuid4().hex[:12]}"
result = await base_tool.execute(
user_id=user_id,
session=session,
tool_call_id=effective_id,
**args,
)
# The result is a StreamToolOutputAvailable, extract the output
return {
"content": [
{
"type": "text",
"text": (
result.output
if isinstance(result.output, str)
else json.dumps(result.output)
),
}
],
"isError": not result.success,
}
except Exception as e:
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
return {
"content": [
{
"type": "text",
"text": json.dumps(
{
"error": str(e),
"type": "error",
"message": f"Failed to execute {base_tool.name}",
}
),
}
],
"isError": True,
}
return tool_handler
def _build_input_schema(base_tool: BaseTool) -> dict[str, Any]:
"""Build a JSON Schema input schema for a tool."""
return {
"type": "object",
"properties": base_tool.parameters.get("properties", {}),
"required": base_tool.parameters.get("required", []),
}
def get_tool_definitions() -> list[dict[str, Any]]:
"""Get all tool definitions in MCP format.
Returns a list of tool definitions that can be used with
create_sdk_mcp_server or as raw tool definitions.
"""
tool_definitions = []
for tool_name, base_tool in TOOL_REGISTRY.items():
tool_def = {
"name": tool_name,
"description": base_tool.description,
"inputSchema": _build_input_schema(base_tool),
}
tool_definitions.append(tool_def)
return tool_definitions
def get_tool_handlers() -> dict[str, Any]:
"""Get all tool handlers mapped by name.
Returns a dictionary mapping tool names to their handler functions.
"""
handlers = {}
for tool_name, base_tool in TOOL_REGISTRY.items():
handlers[tool_name] = create_tool_handler(base_tool)
return handlers
# Create the MCP server configuration
def create_copilot_mcp_server():
"""Create an in-process MCP server configuration for CoPilot tools.
This can be passed to ClaudeAgentOptions.mcp_servers.
Note: The actual SDK MCP server creation depends on the claude-agent-sdk
package being available. This function returns the configuration that
can be used with the SDK.
"""
try:
from claude_agent_sdk import create_sdk_mcp_server, tool
# Create decorated tool functions
sdk_tools = []
for tool_name, base_tool in TOOL_REGISTRY.items():
# Get the handler
handler = create_tool_handler(base_tool)
# Create the decorated tool
# The @tool decorator expects (name, description, schema)
# Pass full JSON schema with type, properties, and required
decorated = tool(
tool_name,
base_tool.description,
_build_input_schema(base_tool),
)(handler)
sdk_tools.append(decorated)
# Create the MCP server
server = create_sdk_mcp_server(
name="copilot",
version="1.0.0",
tools=sdk_tools,
)
return server
except ImportError:
# Let ImportError propagate so service.py handles the fallback
raise
# List of tool names for allowed_tools configuration
COPILOT_TOOL_NAMES = [f"mcp__copilot__{name}" for name in TOOL_REGISTRY.keys()]
# Also export the raw tool names for flexibility
RAW_TOOL_NAMES = list(TOOL_REGISTRY.keys())

View File

@@ -555,10 +555,6 @@ async def get_active_task_for_session(
if task_user_id and user_id != task_user_id: if task_user_id and user_id != task_user_id:
continue continue
logger.info(
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
)
# Get the last message ID from Redis Stream # Get the last message ID from Redis Stream
stream_key = _get_task_stream_key(task_id) stream_key = _get_task_stream_key(task_id)
last_id = "0-0" last_id = "0-0"

View File

@@ -117,7 +117,7 @@ def build_missing_credentials_from_graph(
preserving all supported credential types for each field. preserving all supported credential types for each field.
""" """
matched_keys = set(matched_credentials.keys()) if matched_credentials else set() matched_keys = set(matched_credentials.keys()) if matched_credentials else set()
aggregated_fields = graph.aggregate_credentials_inputs() aggregated_fields = graph.regular_credentials_inputs
return { return {
field_key: _serialize_missing_credential(field_key, field_info) field_key: _serialize_missing_credential(field_key, field_info)
@@ -244,7 +244,7 @@ async def match_user_credentials_to_graph(
missing_creds: list[str] = [] missing_creds: list[str] = []
# Get aggregated credentials requirements from the graph # Get aggregated credentials requirements from the graph
aggregated_creds = graph.aggregate_credentials_inputs() aggregated_creds = graph.regular_credentials_inputs
logger.debug( logger.debug(
f"Matching credentials for graph {graph.id}: {len(aggregated_creds)} required" f"Matching credentials for graph {graph.id}: {len(aggregated_creds)} required"
) )

View File

@@ -0,0 +1,78 @@
"""Tests for chat tools utility functions."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from backend.data.model import CredentialsFieldInfo
def _make_regular_field() -> CredentialsFieldInfo:
return CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["github"],
"credentials_types": ["api_key"],
"is_auto_credential": False,
},
by_alias=True,
)
def test_build_missing_credentials_excludes_auto_creds():
"""
build_missing_credentials_from_graph() should use regular_credentials_inputs
and thus exclude auto_credentials from the "missing" set.
"""
from backend.api.features.chat.tools.utils import (
build_missing_credentials_from_graph,
)
regular_field = _make_regular_field()
mock_graph = MagicMock()
# regular_credentials_inputs should only return the non-auto field
mock_graph.regular_credentials_inputs = {
"github_api_key": (regular_field, {("node-1", "credentials")}, True),
}
result = build_missing_credentials_from_graph(mock_graph, matched_credentials=None)
# Should include the regular credential
assert "github_api_key" in result
# Should NOT include the auto_credential (not in regular_credentials_inputs)
assert "google_oauth2" not in result
@pytest.mark.asyncio
async def test_match_user_credentials_excludes_auto_creds():
"""
match_user_credentials_to_graph() should use regular_credentials_inputs
and thus exclude auto_credentials from matching.
"""
from backend.api.features.chat.tools.utils import match_user_credentials_to_graph
regular_field = _make_regular_field()
mock_graph = MagicMock()
mock_graph.id = "test-graph"
# regular_credentials_inputs returns only non-auto fields
mock_graph.regular_credentials_inputs = {
"github_api_key": (regular_field, {("node-1", "credentials")}, True),
}
# Mock the credentials manager to return no credentials
with patch(
"backend.api.features.chat.tools.utils.IntegrationCredentialsManager"
) as MockCredsMgr:
mock_store = AsyncMock()
mock_store.get_all_creds.return_value = []
MockCredsMgr.return_value.store = mock_store
matched, missing = await match_user_credentials_to_graph(
user_id="test-user", graph=mock_graph
)
# No credentials available, so github should be missing
assert len(matched) == 0
assert len(missing) == 1
assert "github_api_key" in missing[0]

View File

@@ -1103,7 +1103,7 @@ async def create_preset_from_graph_execution(
raise NotFoundError( raise NotFoundError(
f"Graph #{graph_execution.graph_id} not found or accessible" f"Graph #{graph_execution.graph_id} not found or accessible"
) )
elif len(graph.aggregate_credentials_inputs()) > 0: elif len(graph.regular_credentials_inputs) > 0:
raise ValueError( raise ValueError(
f"Graph execution #{graph_exec_id} can't be turned into a preset " f"Graph execution #{graph_exec_id} can't be turned into a preset "
"because it was run before this feature existed " "because it was run before this feature existed "

View File

@@ -478,7 +478,7 @@ class ExaCreateOrFindWebsetBlock(Block):
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
try: try:
webset = await aexa.websets.get(id=input_data.external_id) webset = aexa.websets.get(id=input_data.external_id)
webset_result = Webset.model_validate(webset.model_dump(by_alias=True)) webset_result = Webset.model_validate(webset.model_dump(by_alias=True))
yield "webset", webset_result yield "webset", webset_result
@@ -494,7 +494,7 @@ class ExaCreateOrFindWebsetBlock(Block):
count=input_data.search_count, count=input_data.search_count,
) )
webset = await aexa.websets.create( webset = aexa.websets.create(
params=CreateWebsetParameters( params=CreateWebsetParameters(
search=search_params, search=search_params,
external_id=input_data.external_id, external_id=input_data.external_id,
@@ -554,7 +554,7 @@ class ExaUpdateWebsetBlock(Block):
if input_data.metadata is not None: if input_data.metadata is not None:
payload["metadata"] = input_data.metadata payload["metadata"] = input_data.metadata
sdk_webset = await aexa.websets.update(id=input_data.webset_id, params=payload) sdk_webset = aexa.websets.update(id=input_data.webset_id, params=payload)
status_str = ( status_str = (
sdk_webset.status.value sdk_webset.status.value
@@ -617,7 +617,7 @@ class ExaListWebsetsBlock(Block):
) -> BlockOutput: ) -> BlockOutput:
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
response = await aexa.websets.list( response = aexa.websets.list(
cursor=input_data.cursor, cursor=input_data.cursor,
limit=input_data.limit, limit=input_data.limit,
) )
@@ -678,7 +678,7 @@ class ExaGetWebsetBlock(Block):
) -> BlockOutput: ) -> BlockOutput:
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
sdk_webset = await aexa.websets.get(id=input_data.webset_id) sdk_webset = aexa.websets.get(id=input_data.webset_id)
status_str = ( status_str = (
sdk_webset.status.value sdk_webset.status.value
@@ -748,7 +748,7 @@ class ExaDeleteWebsetBlock(Block):
) -> BlockOutput: ) -> BlockOutput:
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
deleted_webset = await aexa.websets.delete(id=input_data.webset_id) deleted_webset = aexa.websets.delete(id=input_data.webset_id)
status_str = ( status_str = (
deleted_webset.status.value deleted_webset.status.value
@@ -798,7 +798,7 @@ class ExaCancelWebsetBlock(Block):
) -> BlockOutput: ) -> BlockOutput:
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
canceled_webset = await aexa.websets.cancel(id=input_data.webset_id) canceled_webset = aexa.websets.cancel(id=input_data.webset_id)
status_str = ( status_str = (
canceled_webset.status.value canceled_webset.status.value
@@ -968,7 +968,7 @@ class ExaPreviewWebsetBlock(Block):
entity["description"] = input_data.entity_description entity["description"] = input_data.entity_description
payload["entity"] = entity payload["entity"] = entity
sdk_preview = await aexa.websets.preview(params=payload) sdk_preview = aexa.websets.preview(params=payload)
preview = PreviewWebsetModel.from_sdk(sdk_preview) preview = PreviewWebsetModel.from_sdk(sdk_preview)
@@ -1051,7 +1051,7 @@ class ExaWebsetStatusBlock(Block):
) -> BlockOutput: ) -> BlockOutput:
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
webset = await aexa.websets.get(id=input_data.webset_id) webset = aexa.websets.get(id=input_data.webset_id)
status = ( status = (
webset.status.value webset.status.value
@@ -1185,7 +1185,7 @@ class ExaWebsetSummaryBlock(Block):
) -> BlockOutput: ) -> BlockOutput:
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
webset = await aexa.websets.get(id=input_data.webset_id) webset = aexa.websets.get(id=input_data.webset_id)
# Extract basic info # Extract basic info
webset_id = webset.id webset_id = webset.id
@@ -1211,7 +1211,7 @@ class ExaWebsetSummaryBlock(Block):
total_items = 0 total_items = 0
if input_data.include_sample_items and input_data.sample_size > 0: if input_data.include_sample_items and input_data.sample_size > 0:
items_response = await aexa.websets.items.list( items_response = aexa.websets.items.list(
webset_id=input_data.webset_id, limit=input_data.sample_size webset_id=input_data.webset_id, limit=input_data.sample_size
) )
sample_items_data = [ sample_items_data = [
@@ -1362,7 +1362,7 @@ class ExaWebsetReadyCheckBlock(Block):
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
# Get webset details # Get webset details
webset = await aexa.websets.get(id=input_data.webset_id) webset = aexa.websets.get(id=input_data.webset_id)
status = ( status = (
webset.status.value webset.status.value

View File

@@ -202,7 +202,7 @@ class ExaCreateEnrichmentBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
sdk_enrichment = await aexa.websets.enrichments.create( sdk_enrichment = aexa.websets.enrichments.create(
webset_id=input_data.webset_id, params=payload webset_id=input_data.webset_id, params=payload
) )
@@ -223,7 +223,7 @@ class ExaCreateEnrichmentBlock(Block):
items_enriched = 0 items_enriched = 0
while time.time() - poll_start < input_data.polling_timeout: while time.time() - poll_start < input_data.polling_timeout:
current_enrich = await aexa.websets.enrichments.get( current_enrich = aexa.websets.enrichments.get(
webset_id=input_data.webset_id, id=enrichment_id webset_id=input_data.webset_id, id=enrichment_id
) )
current_status = ( current_status = (
@@ -234,7 +234,7 @@ class ExaCreateEnrichmentBlock(Block):
if current_status in ["completed", "failed", "cancelled"]: if current_status in ["completed", "failed", "cancelled"]:
# Estimate items from webset searches # Estimate items from webset searches
webset = await aexa.websets.get(id=input_data.webset_id) webset = aexa.websets.get(id=input_data.webset_id)
if webset.searches: if webset.searches:
for search in webset.searches: for search in webset.searches:
if search.progress: if search.progress:
@@ -329,7 +329,7 @@ class ExaGetEnrichmentBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
sdk_enrichment = await aexa.websets.enrichments.get( sdk_enrichment = aexa.websets.enrichments.get(
webset_id=input_data.webset_id, id=input_data.enrichment_id webset_id=input_data.webset_id, id=input_data.enrichment_id
) )
@@ -474,7 +474,7 @@ class ExaDeleteEnrichmentBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
deleted_enrichment = await aexa.websets.enrichments.delete( deleted_enrichment = aexa.websets.enrichments.delete(
webset_id=input_data.webset_id, id=input_data.enrichment_id webset_id=input_data.webset_id, id=input_data.enrichment_id
) )
@@ -525,13 +525,13 @@ class ExaCancelEnrichmentBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
canceled_enrichment = await aexa.websets.enrichments.cancel( canceled_enrichment = aexa.websets.enrichments.cancel(
webset_id=input_data.webset_id, id=input_data.enrichment_id webset_id=input_data.webset_id, id=input_data.enrichment_id
) )
# Try to estimate how many items were enriched before cancellation # Try to estimate how many items were enriched before cancellation
items_enriched = 0 items_enriched = 0
items_response = await aexa.websets.items.list( items_response = aexa.websets.items.list(
webset_id=input_data.webset_id, limit=100 webset_id=input_data.webset_id, limit=100
) )

View File

@@ -222,7 +222,7 @@ class ExaCreateImportBlock(Block):
def _create_test_mock(): def _create_test_mock():
"""Create test mocks for the AsyncExa SDK.""" """Create test mocks for the AsyncExa SDK."""
from datetime import datetime from datetime import datetime
from unittest.mock import AsyncMock, MagicMock from unittest.mock import MagicMock
# Create mock SDK import object # Create mock SDK import object
mock_import = MagicMock() mock_import = MagicMock()
@@ -247,7 +247,7 @@ class ExaCreateImportBlock(Block):
return { return {
"_get_client": lambda *args, **kwargs: MagicMock( "_get_client": lambda *args, **kwargs: MagicMock(
websets=MagicMock( websets=MagicMock(
imports=MagicMock(create=AsyncMock(return_value=mock_import)) imports=MagicMock(create=lambda *args, **kwargs: mock_import)
) )
) )
} }
@@ -294,7 +294,7 @@ class ExaCreateImportBlock(Block):
if input_data.metadata: if input_data.metadata:
payload["metadata"] = input_data.metadata payload["metadata"] = input_data.metadata
sdk_import = await aexa.websets.imports.create( sdk_import = aexa.websets.imports.create(
params=payload, csv_data=input_data.csv_data params=payload, csv_data=input_data.csv_data
) )
@@ -360,7 +360,7 @@ class ExaGetImportBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
sdk_import = await aexa.websets.imports.get(import_id=input_data.import_id) sdk_import = aexa.websets.imports.get(import_id=input_data.import_id)
import_obj = ImportModel.from_sdk(sdk_import) import_obj = ImportModel.from_sdk(sdk_import)
@@ -426,7 +426,7 @@ class ExaListImportsBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
response = await aexa.websets.imports.list( response = aexa.websets.imports.list(
cursor=input_data.cursor, cursor=input_data.cursor,
limit=input_data.limit, limit=input_data.limit,
) )
@@ -474,9 +474,7 @@ class ExaDeleteImportBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
deleted_import = await aexa.websets.imports.delete( deleted_import = aexa.websets.imports.delete(import_id=input_data.import_id)
import_id=input_data.import_id
)
yield "import_id", deleted_import.id yield "import_id", deleted_import.id
yield "success", "true" yield "success", "true"
@@ -575,14 +573,14 @@ class ExaExportWebsetBlock(Block):
} }
) )
# Create async iterator for list_all # Create mock iterator
async def async_item_iterator(*args, **kwargs): mock_items = [mock_item1, mock_item2]
for item in [mock_item1, mock_item2]:
yield item
return { return {
"_get_client": lambda *args, **kwargs: MagicMock( "_get_client": lambda *args, **kwargs: MagicMock(
websets=MagicMock(items=MagicMock(list_all=async_item_iterator)) websets=MagicMock(
items=MagicMock(list_all=lambda *args, **kwargs: iter(mock_items))
)
) )
} }
@@ -604,7 +602,7 @@ class ExaExportWebsetBlock(Block):
webset_id=input_data.webset_id, limit=input_data.max_items webset_id=input_data.webset_id, limit=input_data.max_items
) )
async for sdk_item in item_iterator: for sdk_item in item_iterator:
if len(all_items) >= input_data.max_items: if len(all_items) >= input_data.max_items:
break break

View File

@@ -178,7 +178,7 @@ class ExaGetWebsetItemBlock(Block):
) -> BlockOutput: ) -> BlockOutput:
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
sdk_item = await aexa.websets.items.get( sdk_item = aexa.websets.items.get(
webset_id=input_data.webset_id, id=input_data.item_id webset_id=input_data.webset_id, id=input_data.item_id
) )
@@ -269,7 +269,7 @@ class ExaListWebsetItemsBlock(Block):
response = None response = None
while time.time() - start_time < input_data.wait_timeout: while time.time() - start_time < input_data.wait_timeout:
response = await aexa.websets.items.list( response = aexa.websets.items.list(
webset_id=input_data.webset_id, webset_id=input_data.webset_id,
cursor=input_data.cursor, cursor=input_data.cursor,
limit=input_data.limit, limit=input_data.limit,
@@ -282,13 +282,13 @@ class ExaListWebsetItemsBlock(Block):
interval = min(interval * 1.2, 10) interval = min(interval * 1.2, 10)
if not response: if not response:
response = await aexa.websets.items.list( response = aexa.websets.items.list(
webset_id=input_data.webset_id, webset_id=input_data.webset_id,
cursor=input_data.cursor, cursor=input_data.cursor,
limit=input_data.limit, limit=input_data.limit,
) )
else: else:
response = await aexa.websets.items.list( response = aexa.websets.items.list(
webset_id=input_data.webset_id, webset_id=input_data.webset_id,
cursor=input_data.cursor, cursor=input_data.cursor,
limit=input_data.limit, limit=input_data.limit,
@@ -340,7 +340,7 @@ class ExaDeleteWebsetItemBlock(Block):
) -> BlockOutput: ) -> BlockOutput:
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
deleted_item = await aexa.websets.items.delete( deleted_item = aexa.websets.items.delete(
webset_id=input_data.webset_id, id=input_data.item_id webset_id=input_data.webset_id, id=input_data.item_id
) )
@@ -408,7 +408,7 @@ class ExaBulkWebsetItemsBlock(Block):
webset_id=input_data.webset_id, limit=input_data.max_items webset_id=input_data.webset_id, limit=input_data.max_items
) )
async for sdk_item in item_iterator: for sdk_item in item_iterator:
if len(all_items) >= input_data.max_items: if len(all_items) >= input_data.max_items:
break break
@@ -475,7 +475,7 @@ class ExaWebsetItemsSummaryBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
webset = await aexa.websets.get(id=input_data.webset_id) webset = aexa.websets.get(id=input_data.webset_id)
entity_type = "unknown" entity_type = "unknown"
if webset.searches: if webset.searches:
@@ -495,7 +495,7 @@ class ExaWebsetItemsSummaryBlock(Block):
# Get sample items if requested # Get sample items if requested
sample_items: List[WebsetItemModel] = [] sample_items: List[WebsetItemModel] = []
if input_data.sample_size > 0: if input_data.sample_size > 0:
items_response = await aexa.websets.items.list( items_response = aexa.websets.items.list(
webset_id=input_data.webset_id, limit=input_data.sample_size webset_id=input_data.webset_id, limit=input_data.sample_size
) )
# Convert to our stable models # Convert to our stable models
@@ -569,7 +569,7 @@ class ExaGetNewItemsBlock(Block):
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
# Get items starting from cursor # Get items starting from cursor
response = await aexa.websets.items.list( response = aexa.websets.items.list(
webset_id=input_data.webset_id, webset_id=input_data.webset_id,
cursor=input_data.since_cursor, cursor=input_data.since_cursor,
limit=input_data.max_items, limit=input_data.max_items,

View File

@@ -233,7 +233,7 @@ class ExaCreateMonitorBlock(Block):
def _create_test_mock(): def _create_test_mock():
"""Create test mocks for the AsyncExa SDK.""" """Create test mocks for the AsyncExa SDK."""
from datetime import datetime from datetime import datetime
from unittest.mock import AsyncMock, MagicMock from unittest.mock import MagicMock
# Create mock SDK monitor object # Create mock SDK monitor object
mock_monitor = MagicMock() mock_monitor = MagicMock()
@@ -263,7 +263,7 @@ class ExaCreateMonitorBlock(Block):
return { return {
"_get_client": lambda *args, **kwargs: MagicMock( "_get_client": lambda *args, **kwargs: MagicMock(
websets=MagicMock( websets=MagicMock(
monitors=MagicMock(create=AsyncMock(return_value=mock_monitor)) monitors=MagicMock(create=lambda *args, **kwargs: mock_monitor)
) )
) )
} }
@@ -320,7 +320,7 @@ class ExaCreateMonitorBlock(Block):
if input_data.metadata: if input_data.metadata:
payload["metadata"] = input_data.metadata payload["metadata"] = input_data.metadata
sdk_monitor = await aexa.websets.monitors.create(params=payload) sdk_monitor = aexa.websets.monitors.create(params=payload)
monitor = MonitorModel.from_sdk(sdk_monitor) monitor = MonitorModel.from_sdk(sdk_monitor)
@@ -384,7 +384,7 @@ class ExaGetMonitorBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
sdk_monitor = await aexa.websets.monitors.get(monitor_id=input_data.monitor_id) sdk_monitor = aexa.websets.monitors.get(monitor_id=input_data.monitor_id)
monitor = MonitorModel.from_sdk(sdk_monitor) monitor = MonitorModel.from_sdk(sdk_monitor)
@@ -476,7 +476,7 @@ class ExaUpdateMonitorBlock(Block):
if input_data.metadata is not None: if input_data.metadata is not None:
payload["metadata"] = input_data.metadata payload["metadata"] = input_data.metadata
sdk_monitor = await aexa.websets.monitors.update( sdk_monitor = aexa.websets.monitors.update(
monitor_id=input_data.monitor_id, params=payload monitor_id=input_data.monitor_id, params=payload
) )
@@ -522,9 +522,7 @@ class ExaDeleteMonitorBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
deleted_monitor = await aexa.websets.monitors.delete( deleted_monitor = aexa.websets.monitors.delete(monitor_id=input_data.monitor_id)
monitor_id=input_data.monitor_id
)
yield "monitor_id", deleted_monitor.id yield "monitor_id", deleted_monitor.id
yield "success", "true" yield "success", "true"
@@ -581,7 +579,7 @@ class ExaListMonitorsBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
response = await aexa.websets.monitors.list( response = aexa.websets.monitors.list(
cursor=input_data.cursor, cursor=input_data.cursor,
limit=input_data.limit, limit=input_data.limit,
webset_id=input_data.webset_id, webset_id=input_data.webset_id,

View File

@@ -121,7 +121,7 @@ class ExaWaitForWebsetBlock(Block):
WebsetTargetStatus.IDLE, WebsetTargetStatus.IDLE,
WebsetTargetStatus.ANY_COMPLETE, WebsetTargetStatus.ANY_COMPLETE,
]: ]:
final_webset = await aexa.websets.wait_until_idle( final_webset = aexa.websets.wait_until_idle(
id=input_data.webset_id, id=input_data.webset_id,
timeout=input_data.timeout, timeout=input_data.timeout,
poll_interval=input_data.check_interval, poll_interval=input_data.check_interval,
@@ -164,7 +164,7 @@ class ExaWaitForWebsetBlock(Block):
interval = input_data.check_interval interval = input_data.check_interval
while time.time() - start_time < input_data.timeout: while time.time() - start_time < input_data.timeout:
# Get current webset status # Get current webset status
webset = await aexa.websets.get(id=input_data.webset_id) webset = aexa.websets.get(id=input_data.webset_id)
current_status = ( current_status = (
webset.status.value webset.status.value
if hasattr(webset.status, "value") if hasattr(webset.status, "value")
@@ -209,7 +209,7 @@ class ExaWaitForWebsetBlock(Block):
# Timeout reached # Timeout reached
elapsed = time.time() - start_time elapsed = time.time() - start_time
webset = await aexa.websets.get(id=input_data.webset_id) webset = aexa.websets.get(id=input_data.webset_id)
final_status = ( final_status = (
webset.status.value webset.status.value
if hasattr(webset.status, "value") if hasattr(webset.status, "value")
@@ -345,7 +345,7 @@ class ExaWaitForSearchBlock(Block):
try: try:
while time.time() - start_time < input_data.timeout: while time.time() - start_time < input_data.timeout:
# Get current search status using SDK # Get current search status using SDK
search = await aexa.websets.searches.get( search = aexa.websets.searches.get(
webset_id=input_data.webset_id, id=input_data.search_id webset_id=input_data.webset_id, id=input_data.search_id
) )
@@ -401,7 +401,7 @@ class ExaWaitForSearchBlock(Block):
elapsed = time.time() - start_time elapsed = time.time() - start_time
# Get last known status # Get last known status
search = await aexa.websets.searches.get( search = aexa.websets.searches.get(
webset_id=input_data.webset_id, id=input_data.search_id webset_id=input_data.webset_id, id=input_data.search_id
) )
final_status = ( final_status = (
@@ -503,7 +503,7 @@ class ExaWaitForEnrichmentBlock(Block):
try: try:
while time.time() - start_time < input_data.timeout: while time.time() - start_time < input_data.timeout:
# Get current enrichment status using SDK # Get current enrichment status using SDK
enrichment = await aexa.websets.enrichments.get( enrichment = aexa.websets.enrichments.get(
webset_id=input_data.webset_id, id=input_data.enrichment_id webset_id=input_data.webset_id, id=input_data.enrichment_id
) )
@@ -548,7 +548,7 @@ class ExaWaitForEnrichmentBlock(Block):
elapsed = time.time() - start_time elapsed = time.time() - start_time
# Get last known status # Get last known status
enrichment = await aexa.websets.enrichments.get( enrichment = aexa.websets.enrichments.get(
webset_id=input_data.webset_id, id=input_data.enrichment_id webset_id=input_data.webset_id, id=input_data.enrichment_id
) )
final_status = ( final_status = (
@@ -575,7 +575,7 @@ class ExaWaitForEnrichmentBlock(Block):
) -> tuple[list[SampleEnrichmentModel], int]: ) -> tuple[list[SampleEnrichmentModel], int]:
"""Get sample enriched data and count.""" """Get sample enriched data and count."""
# Get a few items to see enrichment results using SDK # Get a few items to see enrichment results using SDK
response = await aexa.websets.items.list(webset_id=webset_id, limit=5) response = aexa.websets.items.list(webset_id=webset_id, limit=5)
sample_data: list[SampleEnrichmentModel] = [] sample_data: list[SampleEnrichmentModel] = []
enriched_count = 0 enriched_count = 0

View File

@@ -317,7 +317,7 @@ class ExaCreateWebsetSearchBlock(Block):
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
sdk_search = await aexa.websets.searches.create( sdk_search = aexa.websets.searches.create(
webset_id=input_data.webset_id, params=payload webset_id=input_data.webset_id, params=payload
) )
@@ -350,7 +350,7 @@ class ExaCreateWebsetSearchBlock(Block):
poll_start = time.time() poll_start = time.time()
while time.time() - poll_start < input_data.polling_timeout: while time.time() - poll_start < input_data.polling_timeout:
current_search = await aexa.websets.searches.get( current_search = aexa.websets.searches.get(
webset_id=input_data.webset_id, id=search_id webset_id=input_data.webset_id, id=search_id
) )
current_status = ( current_status = (
@@ -442,7 +442,7 @@ class ExaGetWebsetSearchBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
sdk_search = await aexa.websets.searches.get( sdk_search = aexa.websets.searches.get(
webset_id=input_data.webset_id, id=input_data.search_id webset_id=input_data.webset_id, id=input_data.search_id
) )
@@ -523,7 +523,7 @@ class ExaCancelWebsetSearchBlock(Block):
# Use AsyncExa SDK # Use AsyncExa SDK
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
canceled_search = await aexa.websets.searches.cancel( canceled_search = aexa.websets.searches.cancel(
webset_id=input_data.webset_id, id=input_data.search_id webset_id=input_data.webset_id, id=input_data.search_id
) )
@@ -604,7 +604,7 @@ class ExaFindOrCreateSearchBlock(Block):
aexa = AsyncExa(api_key=credentials.api_key.get_secret_value()) aexa = AsyncExa(api_key=credentials.api_key.get_secret_value())
# Get webset to check existing searches # Get webset to check existing searches
webset = await aexa.websets.get(id=input_data.webset_id) webset = aexa.websets.get(id=input_data.webset_id)
# Look for existing search with same query # Look for existing search with same query
existing_search = None existing_search = None
@@ -636,7 +636,7 @@ class ExaFindOrCreateSearchBlock(Block):
if input_data.entity_type != SearchEntityType.AUTO: if input_data.entity_type != SearchEntityType.AUTO:
payload["entity"] = {"type": input_data.entity_type.value} payload["entity"] = {"type": input_data.entity_type.value}
sdk_search = await aexa.websets.searches.create( sdk_search = aexa.websets.searches.create(
webset_id=input_data.webset_id, params=payload webset_id=input_data.webset_id, params=payload
) )

View File

@@ -596,10 +596,10 @@ def extract_openai_tool_calls(response) -> list[ToolContentBlock] | None:
def get_parallel_tool_calls_param( def get_parallel_tool_calls_param(
llm_model: LlmModel, parallel_tool_calls: bool | None llm_model: LlmModel, parallel_tool_calls: bool | None
) -> bool | openai.Omit: ):
"""Get the appropriate parallel_tool_calls parameter for OpenAI-compatible APIs.""" """Get the appropriate parallel_tool_calls parameter for OpenAI-compatible APIs."""
if llm_model.startswith("o") or parallel_tool_calls is None: if llm_model.startswith("o") or parallel_tool_calls is None:
return openai.omit return openai.NOT_GIVEN
return parallel_tool_calls return parallel_tool_calls

View File

@@ -319,6 +319,8 @@ class BlockSchema(BaseModel):
"credentials_provider": [config.get("provider", "google")], "credentials_provider": [config.get("provider", "google")],
"credentials_types": [config.get("type", "oauth2")], "credentials_types": [config.get("type", "oauth2")],
"credentials_scopes": config.get("scopes"), "credentials_scopes": config.get("scopes"),
"is_auto_credential": True,
"input_field_name": info["field_name"],
} }
result[kwarg_name] = CredentialsFieldInfo.model_validate( result[kwarg_name] = CredentialsFieldInfo.model_validate(
auto_schema, by_alias=True auto_schema, by_alias=True

View File

@@ -447,8 +447,7 @@ class GraphModel(Graph, GraphMeta):
@computed_field @computed_field
@property @property
def credentials_input_schema(self) -> dict[str, Any]: def credentials_input_schema(self) -> dict[str, Any]:
graph_credentials_inputs = self.aggregate_credentials_inputs() graph_credentials_inputs = self.regular_credentials_inputs
logger.debug( logger.debug(
f"Combined credentials input fields for graph #{self.id} ({self.name}): " f"Combined credentials input fields for graph #{self.id} ({self.name}): "
f"{graph_credentials_inputs}" f"{graph_credentials_inputs}"
@@ -604,6 +603,28 @@ class GraphModel(Graph, GraphMeta):
for key, (field_info, node_field_pairs) in combined.items() for key, (field_info, node_field_pairs) in combined.items()
} }
@property
def regular_credentials_inputs(
self,
) -> dict[str, tuple[CredentialsFieldInfo, set[tuple[str, str]], bool]]:
"""Credentials that need explicit user mapping (CredentialsMetaInput fields)."""
return {
k: v
for k, v in self.aggregate_credentials_inputs().items()
if not v[0].is_auto_credential
}
@property
def auto_credentials_inputs(
self,
) -> dict[str, tuple[CredentialsFieldInfo, set[tuple[str, str]], bool]]:
"""Credentials embedded in file fields (_credentials_id), resolved at execution time."""
return {
k: v
for k, v in self.aggregate_credentials_inputs().items()
if v[0].is_auto_credential
}
def reassign_ids(self, user_id: str, reassign_graph_id: bool = False): def reassign_ids(self, user_id: str, reassign_graph_id: bool = False):
""" """
Reassigns all IDs in the graph to new UUIDs. Reassigns all IDs in the graph to new UUIDs.
@@ -654,6 +675,16 @@ class GraphModel(Graph, GraphMeta):
) and graph_id in graph_id_map: ) and graph_id in graph_id_map:
node.input_default["graph_id"] = graph_id_map[graph_id] node.input_default["graph_id"] = graph_id_map[graph_id]
# Clear auto-credentials references (e.g., _credentials_id in
# GoogleDriveFile fields) so the new user must re-authenticate
# with their own account
for node in graph.nodes:
if not node.input_default:
continue
for key, value in node.input_default.items():
if isinstance(value, dict) and "_credentials_id" in value:
del value["_credentials_id"]
def validate_graph( def validate_graph(
self, self,
for_run: bool = False, for_run: bool = False,

View File

@@ -463,3 +463,328 @@ def test_node_credentials_optional_with_other_metadata():
assert node.credentials_optional is True assert node.credentials_optional is True
assert node.metadata["position"] == {"x": 100, "y": 200} assert node.metadata["position"] == {"x": 100, "y": 200}
assert node.metadata["customized_name"] == "My Custom Node" assert node.metadata["customized_name"] == "My Custom Node"
# ============================================================================
# Tests for _reassign_ids credential clearing (Fix 3: SECRT-1772)
def test_combine_preserves_is_auto_credential_flag():
"""
CredentialsFieldInfo.combine() must propagate is_auto_credential and
input_field_name to the combined result. Regression test for reviewer
finding that combine() dropped these fields.
"""
from backend.data.model import CredentialsFieldInfo
auto_field = CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["google"],
"credentials_types": ["oauth2"],
"credentials_scopes": ["drive.readonly"],
"is_auto_credential": True,
"input_field_name": "spreadsheet",
},
by_alias=True,
)
# combine() takes *args of (field_info, key) tuples
combined = CredentialsFieldInfo.combine(
(auto_field, ("node-1", "credentials")),
(auto_field, ("node-2", "credentials")),
)
assert len(combined) == 1
group_key = next(iter(combined))
combined_info, combined_keys = combined[group_key]
assert combined_info.is_auto_credential is True
assert combined_info.input_field_name == "spreadsheet"
assert combined_keys == {("node-1", "credentials"), ("node-2", "credentials")}
def test_combine_preserves_regular_credential_defaults():
"""Regular credentials should have is_auto_credential=False after combine()."""
from backend.data.model import CredentialsFieldInfo
regular_field = CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["github"],
"credentials_types": ["api_key"],
"is_auto_credential": False,
},
by_alias=True,
)
combined = CredentialsFieldInfo.combine(
(regular_field, ("node-1", "credentials")),
)
group_key = next(iter(combined))
combined_info, _ = combined[group_key]
assert combined_info.is_auto_credential is False
assert combined_info.input_field_name is None
# ============================================================================
def test_reassign_ids_clears_credentials_id():
"""
[SECRT-1772] _reassign_ids should clear _credentials_id from
GoogleDriveFile-style input_default fields so forked agents
don't retain the original creator's credential references.
"""
from backend.data.graph import GraphModel
node = Node(
id="node-1",
block_id=StoreValueBlock().id,
input_default={
"spreadsheet": {
"_credentials_id": "original-cred-id",
"id": "file-123",
"name": "test.xlsx",
"mimeType": "application/vnd.google-apps.spreadsheet",
"url": "https://docs.google.com/spreadsheets/d/file-123",
},
},
)
graph = Graph(
id="test-graph",
name="Test",
description="Test",
nodes=[node],
links=[],
)
GraphModel._reassign_ids(graph, user_id="new-user", graph_id_map={})
# _credentials_id key should be removed (not set to None) so that
# _acquire_auto_credentials correctly errors instead of treating it as chained data
assert "_credentials_id" not in graph.nodes[0].input_default["spreadsheet"]
def test_reassign_ids_preserves_non_credential_fields():
"""
Regression guard: _reassign_ids should NOT modify non-credential fields
like name, mimeType, id, url.
"""
from backend.data.graph import GraphModel
node = Node(
id="node-1",
block_id=StoreValueBlock().id,
input_default={
"spreadsheet": {
"_credentials_id": "cred-abc",
"id": "file-123",
"name": "test.xlsx",
"mimeType": "application/vnd.google-apps.spreadsheet",
"url": "https://docs.google.com/spreadsheets/d/file-123",
},
},
)
graph = Graph(
id="test-graph",
name="Test",
description="Test",
nodes=[node],
links=[],
)
GraphModel._reassign_ids(graph, user_id="new-user", graph_id_map={})
field = graph.nodes[0].input_default["spreadsheet"]
assert field["id"] == "file-123"
assert field["name"] == "test.xlsx"
assert field["mimeType"] == "application/vnd.google-apps.spreadsheet"
assert field["url"] == "https://docs.google.com/spreadsheets/d/file-123"
def test_reassign_ids_handles_no_credentials():
"""
Regression guard: _reassign_ids should not error when input_default
has no dict fields with _credentials_id.
"""
from backend.data.graph import GraphModel
node = Node(
id="node-1",
block_id=StoreValueBlock().id,
input_default={
"input": "some value",
"another_input": 42,
},
)
graph = Graph(
id="test-graph",
name="Test",
description="Test",
nodes=[node],
links=[],
)
GraphModel._reassign_ids(graph, user_id="new-user", graph_id_map={})
# Should not error, fields unchanged
assert graph.nodes[0].input_default["input"] == "some value"
assert graph.nodes[0].input_default["another_input"] == 42
def test_reassign_ids_handles_multiple_credential_fields():
"""
[SECRT-1772] When a node has multiple dict fields with _credentials_id,
ALL of them should be cleared.
"""
from backend.data.graph import GraphModel
node = Node(
id="node-1",
block_id=StoreValueBlock().id,
input_default={
"spreadsheet": {
"_credentials_id": "cred-1",
"id": "file-1",
"name": "file1.xlsx",
},
"doc_file": {
"_credentials_id": "cred-2",
"id": "file-2",
"name": "file2.docx",
},
"plain_input": "not a dict",
},
)
graph = Graph(
id="test-graph",
name="Test",
description="Test",
nodes=[node],
links=[],
)
GraphModel._reassign_ids(graph, user_id="new-user", graph_id_map={})
assert "_credentials_id" not in graph.nodes[0].input_default["spreadsheet"]
assert "_credentials_id" not in graph.nodes[0].input_default["doc_file"]
assert graph.nodes[0].input_default["plain_input"] == "not a dict"
# ============================================================================
# Tests for discriminate() field propagation
def test_discriminate_preserves_is_auto_credential_flag():
"""
CredentialsFieldInfo.discriminate() must propagate is_auto_credential and
input_field_name to the discriminated result. Regression test for
discriminate() dropping these fields (same class of bug as combine()).
"""
from backend.data.model import CredentialsFieldInfo
auto_field = CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["google", "openai"],
"credentials_types": ["oauth2"],
"credentials_scopes": ["drive.readonly"],
"is_auto_credential": True,
"input_field_name": "spreadsheet",
"discriminator": "model",
"discriminator_mapping": {"gpt-4": "openai", "gemini": "google"},
},
by_alias=True,
)
discriminated = auto_field.discriminate("gemini")
assert discriminated.is_auto_credential is True
assert discriminated.input_field_name == "spreadsheet"
assert discriminated.provider == frozenset(["google"])
def test_discriminate_preserves_regular_credential_defaults():
"""Regular credentials should have is_auto_credential=False after discriminate()."""
from backend.data.model import CredentialsFieldInfo
regular_field = CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["google", "openai"],
"credentials_types": ["api_key"],
"is_auto_credential": False,
"discriminator": "model",
"discriminator_mapping": {"gpt-4": "openai", "gemini": "google"},
},
by_alias=True,
)
discriminated = regular_field.discriminate("gpt-4")
assert discriminated.is_auto_credential is False
assert discriminated.input_field_name is None
assert discriminated.provider == frozenset(["openai"])
# ============================================================================
# Tests for credentials_input_schema excluding auto_credentials
def test_credentials_input_schema_excludes_auto_creds():
"""
GraphModel.credentials_input_schema should exclude auto_credentials
(is_auto_credential=True) from the schema. Auto_credentials are
transparently resolved at execution time via file picker data.
"""
from datetime import datetime, timezone
from unittest.mock import PropertyMock, patch
from backend.data.graph import GraphModel, NodeModel
from backend.data.model import CredentialsFieldInfo
regular_field_info = CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["github"],
"credentials_types": ["api_key"],
"is_auto_credential": False,
},
by_alias=True,
)
graph = GraphModel(
id="test-graph",
version=1,
name="Test",
description="Test",
user_id="test-user",
created_at=datetime.now(timezone.utc),
nodes=[
NodeModel(
id="node-1",
block_id=StoreValueBlock().id,
input_default={},
graph_id="test-graph",
graph_version=1,
),
],
links=[],
)
# Mock regular_credentials_inputs to return only the non-auto field (3-tuple)
regular_only = {
"github_credentials": (
regular_field_info,
{("node-1", "credentials")},
True,
),
}
with patch.object(
type(graph),
"regular_credentials_inputs",
new_callable=PropertyMock,
return_value=regular_only,
):
schema = graph.credentials_input_schema
field_names = set(schema.get("properties", {}).keys())
# Should include regular credential but NOT auto_credential
assert "github_credentials" in field_names
assert "google_credentials" not in field_names

View File

@@ -571,6 +571,8 @@ class CredentialsFieldInfo(BaseModel, Generic[CP, CT]):
discriminator: Optional[str] = None discriminator: Optional[str] = None
discriminator_mapping: Optional[dict[str, CP]] = None discriminator_mapping: Optional[dict[str, CP]] = None
discriminator_values: set[Any] = Field(default_factory=set) discriminator_values: set[Any] = Field(default_factory=set)
is_auto_credential: bool = False
input_field_name: Optional[str] = None
@classmethod @classmethod
def combine( def combine(
@@ -651,6 +653,9 @@ class CredentialsFieldInfo(BaseModel, Generic[CP, CT]):
+ "_credentials" + "_credentials"
) )
# Propagate is_auto_credential from the combined field.
# All fields in a group should share the same is_auto_credential
# value since auto and regular credentials serve different purposes.
result[group_key] = ( result[group_key] = (
CredentialsFieldInfo[CP, CT]( CredentialsFieldInfo[CP, CT](
credentials_provider=combined.provider, credentials_provider=combined.provider,
@@ -659,6 +664,8 @@ class CredentialsFieldInfo(BaseModel, Generic[CP, CT]):
discriminator=combined.discriminator, discriminator=combined.discriminator,
discriminator_mapping=combined.discriminator_mapping, discriminator_mapping=combined.discriminator_mapping,
discriminator_values=set(all_discriminator_values), discriminator_values=set(all_discriminator_values),
is_auto_credential=combined.is_auto_credential,
input_field_name=combined.input_field_name,
), ),
combined_keys, combined_keys,
) )
@@ -684,6 +691,8 @@ class CredentialsFieldInfo(BaseModel, Generic[CP, CT]):
discriminator=self.discriminator, discriminator=self.discriminator,
discriminator_mapping=self.discriminator_mapping, discriminator_mapping=self.discriminator_mapping,
discriminator_values=self.discriminator_values, discriminator_values=self.discriminator_values,
is_auto_credential=self.is_auto_credential,
input_field_name=self.input_field_name,
) )

View File

@@ -172,6 +172,81 @@ def execute_graph(
T = TypeVar("T") T = TypeVar("T")
async def _acquire_auto_credentials(
input_model: type[BlockSchema],
input_data: dict[str, Any],
creds_manager: "IntegrationCredentialsManager",
user_id: str,
) -> tuple[dict[str, Any], list[AsyncRedisLock]]:
"""
Resolve auto_credentials from GoogleDriveFileField-style inputs.
Returns:
(extra_exec_kwargs, locks): kwargs to inject into block execution, and
credential locks to release after execution completes.
"""
extra_exec_kwargs: dict[str, Any] = {}
locks: list[AsyncRedisLock] = []
# NOTE: If a block ever has multiple auto-credential fields, a ValueError
# on a later field will strand locks acquired for earlier fields. They'll
# auto-expire via Redis TTL, but add a try/except to release partial locks
# if that becomes a real scenario.
for kwarg_name, info in input_model.get_auto_credentials_fields().items():
field_name = info["field_name"]
field_data = input_data.get(field_name)
if field_data and isinstance(field_data, dict):
# Check if _credentials_id key exists in the field data
if "_credentials_id" in field_data:
cred_id = field_data["_credentials_id"]
if cred_id:
# Credential ID provided - acquire credentials
provider = info.get("config", {}).get(
"provider", "external service"
)
file_name = field_data.get("name", "selected file")
try:
credentials, lock = await creds_manager.acquire(
user_id, cred_id
)
locks.append(lock)
extra_exec_kwargs[kwarg_name] = credentials
except ValueError:
raise ValueError(
f"{provider.capitalize()} credentials for "
f"'{file_name}' in field '{field_name}' are not "
f"available in your account. "
f"This can happen if the agent was created by another "
f"user or the credentials were deleted. "
f"Please open the agent in the builder and re-select "
f"the file to authenticate with your own account."
)
# else: _credentials_id is explicitly None, skip (chained data)
else:
# _credentials_id key missing entirely - this is an error
provider = info.get("config", {}).get("provider", "external service")
file_name = field_data.get("name", "selected file")
raise ValueError(
f"Authentication missing for '{file_name}' in field "
f"'{field_name}'. Please re-select the file to authenticate "
f"with {provider.capitalize()}."
)
elif field_data is None and field_name not in input_data:
# Field not in input_data at all = connected from upstream block, skip
pass
else:
# field_data is None/empty but key IS in input_data = user didn't select
provider = info.get("config", {}).get("provider", "external service")
raise ValueError(
f"No file selected for '{field_name}'. "
f"Please select a file to provide "
f"{provider.capitalize()} authentication."
)
return extra_exec_kwargs, locks
async def execute_node( async def execute_node(
node: Node, node: Node,
data: NodeExecutionEntry, data: NodeExecutionEntry,
@@ -271,41 +346,14 @@ async def execute_node(
extra_exec_kwargs[field_name] = credentials extra_exec_kwargs[field_name] = credentials
# Handle auto-generated credentials (e.g., from GoogleDriveFileInput) # Handle auto-generated credentials (e.g., from GoogleDriveFileInput)
for kwarg_name, info in input_model.get_auto_credentials_fields().items(): auto_extra_kwargs, auto_locks = await _acquire_auto_credentials(
field_name = info["field_name"] input_model=input_model,
field_data = input_data.get(field_name) input_data=input_data,
if field_data and isinstance(field_data, dict): creds_manager=creds_manager,
# Check if _credentials_id key exists in the field data user_id=user_id,
if "_credentials_id" in field_data: )
cred_id = field_data["_credentials_id"] extra_exec_kwargs.update(auto_extra_kwargs)
if cred_id: creds_locks.extend(auto_locks)
# Credential ID provided - acquire credentials
provider = info.get("config", {}).get(
"provider", "external service"
)
file_name = field_data.get("name", "selected file")
try:
credentials, lock = await creds_manager.acquire(
user_id, cred_id
)
creds_locks.append(lock)
extra_exec_kwargs[kwarg_name] = credentials
except ValueError:
# Credential was deleted or doesn't exist
raise ValueError(
f"Authentication expired for '{file_name}' in field '{field_name}'. "
f"The saved {provider.capitalize()} credentials no longer exist. "
f"Please re-select the file to re-authenticate."
)
# else: _credentials_id is explicitly None, skip credentials (for chained data)
else:
# _credentials_id key missing entirely - this is an error
provider = info.get("config", {}).get("provider", "external service")
file_name = field_data.get("name", "selected file")
raise ValueError(
f"Authentication missing for '{file_name}' in field '{field_name}'. "
f"Please re-select the file to authenticate with {provider.capitalize()}."
)
output_size = 0 output_size = 0

View File

@@ -0,0 +1,320 @@
"""
Tests for auto_credentials handling in execute_node().
These test the _acquire_auto_credentials() helper function extracted from
execute_node() (manager.py lines 273-308).
"""
import pytest
from pytest_mock import MockerFixture
@pytest.fixture
def google_drive_file_data():
return {
"valid": {
"_credentials_id": "cred-id-123",
"id": "file-123",
"name": "test.xlsx",
"mimeType": "application/vnd.google-apps.spreadsheet",
},
"chained": {
"_credentials_id": None,
"id": "file-456",
"name": "chained.xlsx",
"mimeType": "application/vnd.google-apps.spreadsheet",
},
"missing_key": {
"id": "file-789",
"name": "bad.xlsx",
"mimeType": "application/vnd.google-apps.spreadsheet",
},
}
@pytest.fixture
def mock_input_model(mocker: MockerFixture):
"""Create a mock input model with get_auto_credentials_fields() returning one field."""
input_model = mocker.MagicMock()
input_model.get_auto_credentials_fields.return_value = {
"credentials": {
"field_name": "spreadsheet",
"config": {
"provider": "google",
"type": "oauth2",
"scopes": ["https://www.googleapis.com/auth/drive.readonly"],
},
}
}
return input_model
@pytest.fixture
def mock_creds_manager(mocker: MockerFixture):
manager = mocker.AsyncMock()
mock_lock = mocker.AsyncMock()
mock_creds = mocker.MagicMock()
mock_creds.id = "cred-id-123"
mock_creds.provider = "google"
manager.acquire.return_value = (mock_creds, mock_lock)
return manager, mock_creds, mock_lock
@pytest.mark.asyncio
async def test_auto_credentials_happy_path(
mocker: MockerFixture,
google_drive_file_data,
mock_input_model,
mock_creds_manager,
):
"""When field_data has a valid _credentials_id, credentials should be acquired."""
from backend.executor.manager import _acquire_auto_credentials
manager, mock_creds, mock_lock = mock_creds_manager
input_data = {"spreadsheet": google_drive_file_data["valid"]}
extra_kwargs, locks = await _acquire_auto_credentials(
input_model=mock_input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-1",
)
manager.acquire.assert_called_once_with("user-1", "cred-id-123")
assert extra_kwargs["credentials"] == mock_creds
assert mock_lock in locks
@pytest.mark.asyncio
async def test_auto_credentials_field_none_static_raises(
mocker: MockerFixture,
mock_input_model,
mock_creds_manager,
):
"""
[THE BUG FIX TEST — OPEN-2895]
When field_data is None and the key IS in input_data (user didn't select a file),
should raise ValueError instead of silently skipping.
"""
from backend.executor.manager import _acquire_auto_credentials
manager, _, _ = mock_creds_manager
# Key is present but value is None = user didn't select a file
input_data = {"spreadsheet": None}
with pytest.raises(ValueError, match="No file selected"):
await _acquire_auto_credentials(
input_model=mock_input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-1",
)
@pytest.mark.asyncio
async def test_auto_credentials_field_absent_skips(
mocker: MockerFixture,
mock_input_model,
mock_creds_manager,
):
"""
When the field key is NOT in input_data at all (upstream connection),
should skip without error.
"""
from backend.executor.manager import _acquire_auto_credentials
manager, _, _ = mock_creds_manager
# Key not present = connected from upstream block
input_data = {}
extra_kwargs, locks = await _acquire_auto_credentials(
input_model=mock_input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-1",
)
manager.acquire.assert_not_called()
assert "credentials" not in extra_kwargs
assert locks == []
@pytest.mark.asyncio
async def test_auto_credentials_chained_cred_id_none(
mocker: MockerFixture,
google_drive_file_data,
mock_input_model,
mock_creds_manager,
):
"""
When _credentials_id is explicitly None (chained data from upstream),
should skip credential acquisition.
"""
from backend.executor.manager import _acquire_auto_credentials
manager, _, _ = mock_creds_manager
input_data = {"spreadsheet": google_drive_file_data["chained"]}
extra_kwargs, locks = await _acquire_auto_credentials(
input_model=mock_input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-1",
)
manager.acquire.assert_not_called()
assert "credentials" not in extra_kwargs
@pytest.mark.asyncio
async def test_auto_credentials_missing_cred_id_key_raises(
mocker: MockerFixture,
google_drive_file_data,
mock_input_model,
mock_creds_manager,
):
"""
When _credentials_id key is missing entirely from field_data dict,
should raise ValueError.
"""
from backend.executor.manager import _acquire_auto_credentials
manager, _, _ = mock_creds_manager
input_data = {"spreadsheet": google_drive_file_data["missing_key"]}
with pytest.raises(ValueError, match="Authentication missing"):
await _acquire_auto_credentials(
input_model=mock_input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-1",
)
@pytest.mark.asyncio
async def test_auto_credentials_ownership_mismatch_error(
mocker: MockerFixture,
google_drive_file_data,
mock_input_model,
mock_creds_manager,
):
"""
[SECRT-1772] When acquire() raises ValueError (credential belongs to another user),
the error message should mention 'not available' (not 'expired').
"""
from backend.executor.manager import _acquire_auto_credentials
manager, _, _ = mock_creds_manager
manager.acquire.side_effect = ValueError(
"Credentials #cred-id-123 for user #user-2 not found"
)
input_data = {"spreadsheet": google_drive_file_data["valid"]}
with pytest.raises(ValueError, match="not available in your account"):
await _acquire_auto_credentials(
input_model=mock_input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-2",
)
@pytest.mark.asyncio
async def test_auto_credentials_deleted_credential_error(
mocker: MockerFixture,
google_drive_file_data,
mock_input_model,
mock_creds_manager,
):
"""
[SECRT-1772] When acquire() raises ValueError (credential was deleted),
the error message should mention 'not available' (not 'expired').
"""
from backend.executor.manager import _acquire_auto_credentials
manager, _, _ = mock_creds_manager
manager.acquire.side_effect = ValueError(
"Credentials #cred-id-123 for user #user-1 not found"
)
input_data = {"spreadsheet": google_drive_file_data["valid"]}
with pytest.raises(ValueError, match="not available in your account"):
await _acquire_auto_credentials(
input_model=mock_input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-1",
)
@pytest.mark.asyncio
async def test_auto_credentials_lock_appended(
mocker: MockerFixture,
google_drive_file_data,
mock_input_model,
mock_creds_manager,
):
"""Lock from acquire() should be included in returned locks list."""
from backend.executor.manager import _acquire_auto_credentials
manager, _, mock_lock = mock_creds_manager
input_data = {"spreadsheet": google_drive_file_data["valid"]}
extra_kwargs, locks = await _acquire_auto_credentials(
input_model=mock_input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-1",
)
assert len(locks) == 1
assert locks[0] is mock_lock
@pytest.mark.asyncio
async def test_auto_credentials_multiple_fields(
mocker: MockerFixture,
mock_creds_manager,
):
"""When there are multiple auto_credentials fields, only valid ones should acquire."""
from backend.executor.manager import _acquire_auto_credentials
manager, mock_creds, mock_lock = mock_creds_manager
input_model = mocker.MagicMock()
input_model.get_auto_credentials_fields.return_value = {
"credentials": {
"field_name": "spreadsheet",
"config": {"provider": "google", "type": "oauth2"},
},
"credentials2": {
"field_name": "doc_file",
"config": {"provider": "google", "type": "oauth2"},
},
}
input_data = {
"spreadsheet": {
"_credentials_id": "cred-id-123",
"id": "file-1",
"name": "file1.xlsx",
},
"doc_file": {
"_credentials_id": None,
"id": "file-2",
"name": "chained.doc",
},
}
extra_kwargs, locks = await _acquire_auto_credentials(
input_model=input_model,
input_data=input_data,
creds_manager=manager,
user_id="user-1",
)
# Only the first field should have acquired credentials
manager.acquire.assert_called_once_with("user-1", "cred-id-123")
assert "credentials" in extra_kwargs
assert "credentials2" not in extra_kwargs
assert len(locks) == 1

View File

@@ -259,7 +259,8 @@ async def _validate_node_input_credentials(
# Find any fields of type CredentialsMetaInput # Find any fields of type CredentialsMetaInput
credentials_fields = block.input_schema.get_credentials_fields() credentials_fields = block.input_schema.get_credentials_fields()
if not credentials_fields: auto_credentials_fields = block.input_schema.get_auto_credentials_fields()
if not credentials_fields and not auto_credentials_fields:
continue continue
# Track if any credential field is missing for this node # Track if any credential field is missing for this node
@@ -339,6 +340,47 @@ async def _validate_node_input_credentials(
] = "Invalid credentials: type/provider mismatch" ] = "Invalid credentials: type/provider mismatch"
continue continue
# Validate auto-credentials (GoogleDriveFileField-based)
# These have _credentials_id embedded in the file field data
if auto_credentials_fields:
for _kwarg_name, info in auto_credentials_fields.items():
field_name = info["field_name"]
# Check input_default and nodes_input_masks for the field value
field_value = node.input_default.get(field_name)
if nodes_input_masks and node.id in nodes_input_masks:
field_value = nodes_input_masks[node.id].get(
field_name, field_value
)
if field_value and isinstance(field_value, dict):
if "_credentials_id" not in field_value:
# Key removed (e.g., on fork) — needs re-auth
has_missing_credentials = True
credential_errors[node.id][field_name] = (
"Authentication missing for the selected file. "
"Please re-select the file to authenticate with "
"your own account."
)
continue
cred_id = field_value.get("_credentials_id")
if cred_id and isinstance(cred_id, str):
try:
creds_store = get_integration_credentials_store()
creds = await creds_store.get_creds_by_id(user_id, cred_id)
except Exception as e:
has_missing_credentials = True
credential_errors[node.id][
field_name
] = f"Credentials not available: {e}"
continue
if not creds:
has_missing_credentials = True
credential_errors[node.id][field_name] = (
"The saved credentials are not available "
"for your account. Please re-select the file to "
"authenticate with your own account."
)
# If node has optional credentials and any are missing, mark for skipping # If node has optional credentials and any are missing, mark for skipping
# But only if there are no other errors for this node # But only if there are no other errors for this node
if ( if (
@@ -370,8 +412,9 @@ def make_node_credentials_input_map(
""" """
result: dict[str, dict[str, JsonValue]] = {} result: dict[str, dict[str, JsonValue]] = {}
# Get aggregated credentials fields for the graph # Only map regular credentials (not auto_credentials, which are resolved
graph_cred_inputs = graph.aggregate_credentials_inputs() # at execution time from _credentials_id in file field data)
graph_cred_inputs = graph.regular_credentials_inputs
for graph_input_name, (_, compatible_node_fields, _) in graph_cred_inputs.items(): for graph_input_name, (_, compatible_node_fields, _) in graph_cred_inputs.items():
# Best-effort map: skip missing items # Best-effort map: skip missing items

View File

@@ -907,3 +907,335 @@ async def test_stop_graph_execution_cascades_to_child_with_reviews(
# Verify both parent and child status updates # Verify both parent and child status updates
assert mock_execution_db.update_graph_execution_stats.call_count >= 1 assert mock_execution_db.update_graph_execution_stats.call_count >= 1
# ============================================================================
# Tests for auto_credentials validation in _validate_node_input_credentials
# (Fix 3: SECRT-1772 + Fix 4: Path 4)
# ============================================================================
@pytest.mark.asyncio
async def test_validate_node_input_credentials_auto_creds_valid(
mocker: MockerFixture,
):
"""
[SECRT-1772] When a node has auto_credentials with a valid _credentials_id
that exists in the store, validation should pass without errors.
"""
from backend.executor.utils import _validate_node_input_credentials
mock_node = mocker.MagicMock()
mock_node.id = "node-with-auto-creds"
mock_node.credentials_optional = False
mock_node.input_default = {
"spreadsheet": {
"_credentials_id": "valid-cred-id",
"id": "file-123",
"name": "test.xlsx",
}
}
mock_block = mocker.MagicMock()
# No regular credentials fields
mock_block.input_schema.get_credentials_fields.return_value = {}
# Has auto_credentials fields
mock_block.input_schema.get_auto_credentials_fields.return_value = {
"credentials": {
"field_name": "spreadsheet",
"config": {"provider": "google", "type": "oauth2"},
}
}
mock_node.block = mock_block
mock_graph = mocker.MagicMock()
mock_graph.nodes = [mock_node]
# Mock the credentials store to return valid credentials
mock_store = mocker.MagicMock()
mock_creds = mocker.MagicMock()
mock_creds.id = "valid-cred-id"
mock_store.get_creds_by_id = mocker.AsyncMock(return_value=mock_creds)
mocker.patch(
"backend.executor.utils.get_integration_credentials_store",
return_value=mock_store,
)
errors, nodes_to_skip = await _validate_node_input_credentials(
graph=mock_graph,
user_id="test-user",
nodes_input_masks=None,
)
assert mock_node.id not in errors
assert mock_node.id not in nodes_to_skip
@pytest.mark.asyncio
async def test_validate_node_input_credentials_auto_creds_missing(
mocker: MockerFixture,
):
"""
[SECRT-1772] When a node has auto_credentials with a _credentials_id
that doesn't exist for the current user, validation should report an error.
"""
from backend.executor.utils import _validate_node_input_credentials
mock_node = mocker.MagicMock()
mock_node.id = "node-with-bad-auto-creds"
mock_node.credentials_optional = False
mock_node.input_default = {
"spreadsheet": {
"_credentials_id": "other-users-cred-id",
"id": "file-123",
"name": "test.xlsx",
}
}
mock_block = mocker.MagicMock()
mock_block.input_schema.get_credentials_fields.return_value = {}
mock_block.input_schema.get_auto_credentials_fields.return_value = {
"credentials": {
"field_name": "spreadsheet",
"config": {"provider": "google", "type": "oauth2"},
}
}
mock_node.block = mock_block
mock_graph = mocker.MagicMock()
mock_graph.nodes = [mock_node]
# Mock the credentials store to return None (cred not found for this user)
mock_store = mocker.MagicMock()
mock_store.get_creds_by_id = mocker.AsyncMock(return_value=None)
mocker.patch(
"backend.executor.utils.get_integration_credentials_store",
return_value=mock_store,
)
errors, nodes_to_skip = await _validate_node_input_credentials(
graph=mock_graph,
user_id="different-user",
nodes_input_masks=None,
)
assert mock_node.id in errors
assert "spreadsheet" in errors[mock_node.id]
assert "not available" in errors[mock_node.id]["spreadsheet"].lower()
@pytest.mark.asyncio
async def test_validate_node_input_credentials_both_regular_and_auto(
mocker: MockerFixture,
):
"""
[SECRT-1772] A node that has BOTH regular credentials AND auto_credentials
should have both validated.
"""
from backend.executor.utils import _validate_node_input_credentials
mock_node = mocker.MagicMock()
mock_node.id = "node-with-both-creds"
mock_node.credentials_optional = False
mock_node.input_default = {
"credentials": {
"id": "regular-cred-id",
"provider": "github",
"type": "api_key",
},
"spreadsheet": {
"_credentials_id": "auto-cred-id",
"id": "file-123",
"name": "test.xlsx",
},
}
mock_credentials_field_type = mocker.MagicMock()
mock_credentials_meta = mocker.MagicMock()
mock_credentials_meta.id = "regular-cred-id"
mock_credentials_meta.provider = "github"
mock_credentials_meta.type = "api_key"
mock_credentials_field_type.model_validate.return_value = mock_credentials_meta
mock_block = mocker.MagicMock()
# Regular credentials field
mock_block.input_schema.get_credentials_fields.return_value = {
"credentials": mock_credentials_field_type,
}
# Auto-credentials field
mock_block.input_schema.get_auto_credentials_fields.return_value = {
"auto_credentials": {
"field_name": "spreadsheet",
"config": {"provider": "google", "type": "oauth2"},
}
}
mock_node.block = mock_block
mock_graph = mocker.MagicMock()
mock_graph.nodes = [mock_node]
# Mock the credentials store to return valid credentials for both
mock_store = mocker.MagicMock()
mock_regular_creds = mocker.MagicMock()
mock_regular_creds.id = "regular-cred-id"
mock_regular_creds.provider = "github"
mock_regular_creds.type = "api_key"
mock_auto_creds = mocker.MagicMock()
mock_auto_creds.id = "auto-cred-id"
def get_creds_side_effect(user_id, cred_id):
if cred_id == "regular-cred-id":
return mock_regular_creds
elif cred_id == "auto-cred-id":
return mock_auto_creds
return None
mock_store.get_creds_by_id = mocker.AsyncMock(side_effect=get_creds_side_effect)
mocker.patch(
"backend.executor.utils.get_integration_credentials_store",
return_value=mock_store,
)
errors, nodes_to_skip = await _validate_node_input_credentials(
graph=mock_graph,
user_id="test-user",
nodes_input_masks=None,
)
# Both should validate successfully - no errors
assert mock_node.id not in errors
assert mock_node.id not in nodes_to_skip
@pytest.mark.asyncio
async def test_validate_node_input_credentials_auto_creds_skipped_when_none(
mocker: MockerFixture,
):
"""
When a node has auto_credentials but the field value has _credentials_id=None
(e.g., from upstream connection), validation should skip it without error.
"""
from backend.executor.utils import _validate_node_input_credentials
mock_node = mocker.MagicMock()
mock_node.id = "node-with-chained-auto-creds"
mock_node.credentials_optional = False
mock_node.input_default = {
"spreadsheet": {
"_credentials_id": None,
"id": "file-123",
"name": "test.xlsx",
}
}
mock_block = mocker.MagicMock()
mock_block.input_schema.get_credentials_fields.return_value = {}
mock_block.input_schema.get_auto_credentials_fields.return_value = {
"credentials": {
"field_name": "spreadsheet",
"config": {"provider": "google", "type": "oauth2"},
}
}
mock_node.block = mock_block
mock_graph = mocker.MagicMock()
mock_graph.nodes = [mock_node]
errors, nodes_to_skip = await _validate_node_input_credentials(
graph=mock_graph,
user_id="test-user",
nodes_input_masks=None,
)
# No error - chained data with None cred_id is valid
assert mock_node.id not in errors
# ============================================================================
# Tests for CredentialsFieldInfo auto_credential tag (Fix 4: Path 4)
# ============================================================================
def test_credentials_field_info_auto_credential_tag():
"""
[Path 4] CredentialsFieldInfo should support is_auto_credential and
input_field_name fields for distinguishing auto from regular credentials.
"""
from backend.data.model import CredentialsFieldInfo
# Regular credential should have is_auto_credential=False by default
regular = CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["github"],
"credentials_types": ["api_key"],
},
by_alias=True,
)
assert regular.is_auto_credential is False
assert regular.input_field_name is None
# Auto credential should have is_auto_credential=True
auto = CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["google"],
"credentials_types": ["oauth2"],
"is_auto_credential": True,
"input_field_name": "spreadsheet",
},
by_alias=True,
)
assert auto.is_auto_credential is True
assert auto.input_field_name == "spreadsheet"
def test_make_node_credentials_input_map_excludes_auto_creds(
mocker: MockerFixture,
):
"""
[Path 4] make_node_credentials_input_map should only include regular credentials,
not auto_credentials (which are resolved at execution time).
"""
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.executor.utils import make_node_credentials_input_map
from backend.integrations.providers import ProviderName
# Create a mock graph with aggregate_credentials_inputs that returns
# both regular and auto credentials
mock_graph = mocker.MagicMock()
regular_field_info = CredentialsFieldInfo.model_validate(
{
"credentials_provider": ["github"],
"credentials_types": ["api_key"],
"is_auto_credential": False,
},
by_alias=True,
)
# Mock regular_credentials_inputs property (auto_credentials are excluded)
mock_graph.regular_credentials_inputs = {
"github_creds": (regular_field_info, {("node-1", "credentials")}, True),
}
graph_credentials_input = {
"github_creds": CredentialsMetaInput(
id="cred-123",
provider=ProviderName("github"),
type="api_key",
),
}
result = make_node_credentials_input_map(mock_graph, graph_credentials_input)
# Regular credentials should be mapped
assert "node-1" in result
assert "credentials" in result["node-1"]
# Auto credentials should NOT appear in the result
# (they would have been mapped to the kwarg_name "credentials" not "spreadsheet")
for node_id, fields in result.items():
for field_name, value in fields.items():
# Verify no auto-credential phantom entries
if isinstance(value, dict):
assert "_credentials_id" not in value

File diff suppressed because it is too large Load Diff

View File

@@ -13,7 +13,6 @@ aio-pika = "^9.5.5"
aiohttp = "^3.10.0" aiohttp = "^3.10.0"
aiodns = "^3.5.0" aiodns = "^3.5.0"
anthropic = "^0.59.0" anthropic = "^0.59.0"
claude-agent-sdk = "^0.1.0"
apscheduler = "^3.11.1" apscheduler = "^3.11.1"
autogpt-libs = { path = "../autogpt_libs", develop = true } autogpt-libs = { path = "../autogpt_libs", develop = true }
bleach = { extras = ["css"], version = "^6.2.0" } bleach = { extras = ["css"], version = "^6.2.0" }
@@ -22,7 +21,7 @@ cryptography = "^45.0"
discord-py = "^2.5.2" discord-py = "^2.5.2"
e2b-code-interpreter = "^1.5.2" e2b-code-interpreter = "^1.5.2"
elevenlabs = "^1.50.0" elevenlabs = "^1.50.0"
fastapi = "^0.128.0" fastapi = "^0.116.1"
feedparser = "^6.0.11" feedparser = "^6.0.11"
flake8 = "^7.3.0" flake8 = "^7.3.0"
google-api-python-client = "^2.177.0" google-api-python-client = "^2.177.0"
@@ -36,7 +35,7 @@ jinja2 = "^3.1.6"
jsonref = "^1.1.0" jsonref = "^1.1.0"
jsonschema = "^4.25.0" jsonschema = "^4.25.0"
langfuse = "^3.11.0" langfuse = "^3.11.0"
launchdarkly-server-sdk = "^9.14.1" launchdarkly-server-sdk = "^9.12.0"
mem0ai = "^0.1.115" mem0ai = "^0.1.115"
moviepy = "^2.1.2" moviepy = "^2.1.2"
ollama = "^0.5.1" ollama = "^0.5.1"
@@ -53,8 +52,8 @@ prometheus-client = "^0.22.1"
prometheus-fastapi-instrumentator = "^7.0.0" prometheus-fastapi-instrumentator = "^7.0.0"
psutil = "^7.0.0" psutil = "^7.0.0"
psycopg2-binary = "^2.9.10" psycopg2-binary = "^2.9.10"
pydantic = { extras = ["email"], version = "^2.12.5" } pydantic = { extras = ["email"], version = "^2.11.7" }
pydantic-settings = "^2.12.0" pydantic-settings = "^2.10.1"
pytest = "^8.4.1" pytest = "^8.4.1"
pytest-asyncio = "^1.1.0" pytest-asyncio = "^1.1.0"
python-dotenv = "^1.1.1" python-dotenv = "^1.1.1"
@@ -66,11 +65,11 @@ sentry-sdk = {extras = ["anthropic", "fastapi", "launchdarkly", "openai", "sqlal
sqlalchemy = "^2.0.40" sqlalchemy = "^2.0.40"
strenum = "^0.4.9" strenum = "^0.4.9"
stripe = "^11.5.0" stripe = "^11.5.0"
supabase = "2.27.2" supabase = "2.17.0"
tenacity = "^9.1.2" tenacity = "^9.1.2"
todoist-api-python = "^2.1.7" todoist-api-python = "^2.1.7"
tweepy = "^4.16.0" tweepy = "^4.16.0"
uvicorn = { extras = ["standard"], version = "^0.40.0" } uvicorn = { extras = ["standard"], version = "^0.35.0" }
websockets = "^15.0" websockets = "^15.0"
youtube-transcript-api = "^1.2.1" youtube-transcript-api = "^1.2.1"
yt-dlp = "2025.12.08" yt-dlp = "2025.12.08"

View File

@@ -12307,9 +12307,7 @@
"title": "Location" "title": "Location"
}, },
"msg": { "type": "string", "title": "Message" }, "msg": { "type": "string", "title": "Message" },
"type": { "type": "string", "title": "Error Type" }, "type": { "type": "string", "title": "Error Type" }
"input": { "title": "Input" },
"ctx": { "type": "object", "title": "Context" }
}, },
"type": "object", "type": "object",
"required": ["loc", "msg", "type"], "required": ["loc", "msg", "type"],

View File

@@ -4,7 +4,9 @@ import { loadScript } from "@/services/scripts/scripts";
export async function loadGoogleAPIPicker(): Promise<void> { export async function loadGoogleAPIPicker(): Promise<void> {
validateWindow(); validateWindow();
await loadScript("https://apis.google.com/js/api.js"); await loadScript("https://apis.google.com/js/api.js", {
referrerPolicy: "no-referrer-when-downgrade",
});
const googleAPI = window.gapi; const googleAPI = window.gapi;
if (!googleAPI) { if (!googleAPI) {
@@ -27,7 +29,9 @@ export async function loadGoogleIdentityServices(): Promise<void> {
throw new Error("Google Identity Services cannot load on server"); throw new Error("Google Identity Services cannot load on server");
} }
await loadScript("https://accounts.google.com/gsi/client"); await loadScript("https://accounts.google.com/gsi/client", {
referrerPolicy: "no-referrer-when-downgrade",
});
const google = window.google; const google = window.google;
if (!google?.accounts?.oauth2) { if (!google?.accounts?.oauth2) {