mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-03 11:24:57 -05:00
Compare commits
15 Commits
fix/autopi
...
fix/file-i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ac826df61b | ||
|
|
a9d12594c2 | ||
|
|
a967b0dc63 | ||
|
|
e5420f8bd9 | ||
|
|
638e150a12 | ||
|
|
678ddde751 | ||
|
|
aef6f57cfd | ||
|
|
14cee1670a | ||
|
|
d81d1ce024 | ||
|
|
2dd341c369 | ||
|
|
1081590384 | ||
|
|
7e37de8e30 | ||
|
|
7ee94d986c | ||
|
|
18a1661fa3 | ||
|
|
b72521daa9 |
@@ -54,7 +54,7 @@ Before proceeding with the installation, ensure your system meets the following
|
||||
### Updated Setup Instructions:
|
||||
We've moved to a fully maintained and regularly updated documentation site.
|
||||
|
||||
👉 [Follow the official self-hosting guide here](https://docs.agpt.co/platform/getting-started/)
|
||||
👉 [Follow the official self-hosting guide here](https://agpt.co/docs/platform/getting-started/getting-started)
|
||||
|
||||
|
||||
This tutorial assumes you have Docker, VSCode, git and npm installed.
|
||||
|
||||
@@ -3,9 +3,13 @@ import logging
|
||||
import time
|
||||
from asyncio import CancelledError
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
import openai
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from backend.util.prompt import CompressResult
|
||||
|
||||
import orjson
|
||||
from langfuse import get_client
|
||||
from openai import (
|
||||
@@ -15,7 +19,13 @@ from openai import (
|
||||
PermissionDeniedError,
|
||||
RateLimitError,
|
||||
)
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam
|
||||
from openai.types.chat import (
|
||||
ChatCompletionChunk,
|
||||
ChatCompletionMessageParam,
|
||||
ChatCompletionStreamOptionsParam,
|
||||
ChatCompletionSystemMessageParam,
|
||||
ChatCompletionToolParam,
|
||||
)
|
||||
|
||||
from backend.data.redis_client import get_redis_async
|
||||
from backend.data.understanding import (
|
||||
@@ -794,207 +804,58 @@ def _is_region_blocked_error(error: Exception) -> bool:
|
||||
return "not available in your region" in str(error).lower()
|
||||
|
||||
|
||||
async def _summarize_messages(
|
||||
async def _manage_context_window(
|
||||
messages: list,
|
||||
model: str,
|
||||
api_key: str | None = None,
|
||||
base_url: str | None = None,
|
||||
timeout: float = 30.0,
|
||||
) -> str:
|
||||
"""Summarize a list of messages into concise context.
|
||||
) -> "CompressResult":
|
||||
"""
|
||||
Manage context window using the unified compress_context function.
|
||||
|
||||
Uses the same model as the chat for higher quality summaries.
|
||||
This is a thin wrapper that creates an OpenAI client for summarization
|
||||
and delegates to the shared compression logic in prompt.py.
|
||||
|
||||
Args:
|
||||
messages: List of message dicts to summarize
|
||||
model: Model to use for summarization (same as chat model)
|
||||
api_key: API key for OpenAI client
|
||||
base_url: Base URL for OpenAI client
|
||||
timeout: Request timeout in seconds (default: 30.0)
|
||||
messages: List of messages in OpenAI format
|
||||
model: Model name for token counting and summarization
|
||||
api_key: API key for summarization calls
|
||||
base_url: Base URL for summarization calls
|
||||
|
||||
Returns:
|
||||
Summarized text
|
||||
CompressResult with compacted messages and metadata
|
||||
"""
|
||||
# Format messages for summarization
|
||||
conversation = []
|
||||
for msg in messages:
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
# Include user, assistant, and tool messages (tool outputs are important context)
|
||||
if content and role in ("user", "assistant", "tool"):
|
||||
conversation.append(f"{role.upper()}: {content}")
|
||||
|
||||
conversation_text = "\n\n".join(conversation)
|
||||
|
||||
# Handle empty conversation
|
||||
if not conversation_text:
|
||||
return "No conversation history available."
|
||||
|
||||
# Truncate conversation to fit within summarization model's context
|
||||
# gpt-4o-mini has 128k context, but we limit to ~25k tokens (~100k chars) for safety
|
||||
MAX_CHARS = 100_000
|
||||
if len(conversation_text) > MAX_CHARS:
|
||||
conversation_text = conversation_text[:MAX_CHARS] + "\n\n[truncated]"
|
||||
|
||||
# Call LLM to summarize
|
||||
import openai
|
||||
|
||||
summarization_client = openai.AsyncOpenAI(
|
||||
api_key=api_key, base_url=base_url, timeout=timeout
|
||||
)
|
||||
from backend.util.prompt import compress_context
|
||||
|
||||
response = await summarization_client.chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
"Create a detailed summary of the conversation so far. "
|
||||
"This summary will be used as context when continuing the conversation.\n\n"
|
||||
"Before writing the summary, analyze each message chronologically to identify:\n"
|
||||
"- User requests and their explicit goals\n"
|
||||
"- Your approach and key decisions made\n"
|
||||
"- Technical specifics (file names, tool outputs, function signatures)\n"
|
||||
"- Errors encountered and resolutions applied\n\n"
|
||||
"You MUST include ALL of the following sections:\n\n"
|
||||
"## 1. Primary Request and Intent\n"
|
||||
"The user's explicit goals and what they are trying to accomplish.\n\n"
|
||||
"## 2. Key Technical Concepts\n"
|
||||
"Technologies, frameworks, tools, and patterns being used or discussed.\n\n"
|
||||
"## 3. Files and Resources Involved\n"
|
||||
"Specific files examined or modified, with relevant snippets and identifiers.\n\n"
|
||||
"## 4. Errors and Fixes\n"
|
||||
"Problems encountered, error messages, and their resolutions. "
|
||||
"Include any user feedback on fixes.\n\n"
|
||||
"## 5. Problem Solving\n"
|
||||
"Issues that have been resolved and how they were addressed.\n\n"
|
||||
"## 6. All User Messages\n"
|
||||
"A complete list of all user inputs (excluding tool outputs) to preserve their exact requests.\n\n"
|
||||
"## 7. Pending Tasks\n"
|
||||
"Work items the user explicitly requested that have not yet been completed.\n\n"
|
||||
"## 8. Current Work\n"
|
||||
"Precise description of what was being worked on most recently, including relevant context.\n\n"
|
||||
"## 9. Next Steps\n"
|
||||
"What should happen next, aligned with the user's most recent requests. "
|
||||
"Include verbatim quotes of recent instructions if relevant."
|
||||
),
|
||||
},
|
||||
{"role": "user", "content": f"Summarize:\n\n{conversation_text}"},
|
||||
],
|
||||
max_tokens=1500,
|
||||
temperature=0.3,
|
||||
)
|
||||
# Convert messages to dict format
|
||||
messages_dict = []
|
||||
for msg in messages:
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {k: v for k, v in msg.items() if v is not None}
|
||||
else:
|
||||
msg_dict = dict(msg)
|
||||
messages_dict.append(msg_dict)
|
||||
|
||||
summary = response.choices[0].message.content
|
||||
return summary or "No summary available."
|
||||
|
||||
|
||||
def _ensure_tool_pairs_intact(
|
||||
recent_messages: list[dict],
|
||||
all_messages: list[dict],
|
||||
start_index: int,
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Ensure tool_call/tool_response pairs stay together after slicing.
|
||||
|
||||
When slicing messages for context compaction, a naive slice can separate
|
||||
an assistant message containing tool_calls from its corresponding tool
|
||||
response messages. This causes API validation errors (e.g., Anthropic's
|
||||
"unexpected tool_use_id found in tool_result blocks").
|
||||
|
||||
This function checks for orphan tool responses in the slice and extends
|
||||
backwards to include their corresponding assistant messages.
|
||||
|
||||
Args:
|
||||
recent_messages: The sliced messages to validate
|
||||
all_messages: The complete message list (for looking up missing assistants)
|
||||
start_index: The index in all_messages where recent_messages begins
|
||||
|
||||
Returns:
|
||||
A potentially extended list of messages with tool pairs intact
|
||||
"""
|
||||
if not recent_messages:
|
||||
return recent_messages
|
||||
|
||||
# Collect all tool_call_ids from assistant messages in the slice
|
||||
available_tool_call_ids: set[str] = set()
|
||||
for msg in recent_messages:
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
for tc in msg["tool_calls"]:
|
||||
tc_id = tc.get("id")
|
||||
if tc_id:
|
||||
available_tool_call_ids.add(tc_id)
|
||||
|
||||
# Find orphan tool responses (tool messages whose tool_call_id is missing)
|
||||
orphan_tool_call_ids: set[str] = set()
|
||||
for msg in recent_messages:
|
||||
if msg.get("role") == "tool":
|
||||
tc_id = msg.get("tool_call_id")
|
||||
if tc_id and tc_id not in available_tool_call_ids:
|
||||
orphan_tool_call_ids.add(tc_id)
|
||||
|
||||
if not orphan_tool_call_ids:
|
||||
# No orphans, slice is valid
|
||||
return recent_messages
|
||||
|
||||
# Find the assistant messages that contain the orphan tool_call_ids
|
||||
# Search backwards from start_index in all_messages
|
||||
messages_to_prepend: list[dict] = []
|
||||
for i in range(start_index - 1, -1, -1):
|
||||
msg = all_messages[i]
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
msg_tool_ids = {tc.get("id") for tc in msg["tool_calls"] if tc.get("id")}
|
||||
if msg_tool_ids & orphan_tool_call_ids:
|
||||
# This assistant message has tool_calls we need
|
||||
# Also collect its contiguous tool responses that follow it
|
||||
assistant_and_responses: list[dict] = [msg]
|
||||
|
||||
# Scan forward from this assistant to collect tool responses
|
||||
for j in range(i + 1, start_index):
|
||||
following_msg = all_messages[j]
|
||||
if following_msg.get("role") == "tool":
|
||||
tool_id = following_msg.get("tool_call_id")
|
||||
if tool_id and tool_id in msg_tool_ids:
|
||||
assistant_and_responses.append(following_msg)
|
||||
else:
|
||||
# Stop at first non-tool message
|
||||
break
|
||||
|
||||
# Prepend the assistant and its tool responses (maintain order)
|
||||
messages_to_prepend = assistant_and_responses + messages_to_prepend
|
||||
# Mark these as found
|
||||
orphan_tool_call_ids -= msg_tool_ids
|
||||
# Also add this assistant's tool_call_ids to available set
|
||||
available_tool_call_ids |= msg_tool_ids
|
||||
|
||||
if not orphan_tool_call_ids:
|
||||
# Found all missing assistants
|
||||
break
|
||||
|
||||
if orphan_tool_call_ids:
|
||||
# Some tool_call_ids couldn't be resolved - remove those tool responses
|
||||
# This shouldn't happen in normal operation but handles edge cases
|
||||
logger.warning(
|
||||
f"Could not find assistant messages for tool_call_ids: {orphan_tool_call_ids}. "
|
||||
"Removing orphan tool responses."
|
||||
)
|
||||
recent_messages = [
|
||||
msg
|
||||
for msg in recent_messages
|
||||
if not (
|
||||
msg.get("role") == "tool"
|
||||
and msg.get("tool_call_id") in orphan_tool_call_ids
|
||||
# Only create client if api_key is provided (enables summarization)
|
||||
# Use context manager to avoid socket leaks
|
||||
if api_key:
|
||||
async with openai.AsyncOpenAI(
|
||||
api_key=api_key, base_url=base_url, timeout=30.0
|
||||
) as client:
|
||||
return await compress_context(
|
||||
messages=messages_dict,
|
||||
model=model,
|
||||
client=client,
|
||||
)
|
||||
]
|
||||
|
||||
if messages_to_prepend:
|
||||
logger.info(
|
||||
f"Extended recent messages by {len(messages_to_prepend)} to preserve "
|
||||
f"tool_call/tool_response pairs"
|
||||
else:
|
||||
# No API key - use truncation-only mode
|
||||
return await compress_context(
|
||||
messages=messages_dict,
|
||||
model=model,
|
||||
client=None,
|
||||
)
|
||||
return messages_to_prepend + recent_messages
|
||||
|
||||
return recent_messages
|
||||
|
||||
|
||||
async def _stream_chat_chunks(
|
||||
@@ -1022,11 +883,8 @@ async def _stream_chat_chunks(
|
||||
|
||||
logger.info("Starting pure chat stream")
|
||||
|
||||
# Build messages with system prompt prepended
|
||||
messages = session.to_openai_messages()
|
||||
if system_prompt:
|
||||
from openai.types.chat import ChatCompletionSystemMessageParam
|
||||
|
||||
system_message = ChatCompletionSystemMessageParam(
|
||||
role="system",
|
||||
content=system_prompt,
|
||||
@@ -1034,314 +892,38 @@ async def _stream_chat_chunks(
|
||||
messages = [system_message] + messages
|
||||
|
||||
# Apply context window management
|
||||
token_count = 0 # Initialize for exception handler
|
||||
try:
|
||||
from backend.util.prompt import estimate_token_count
|
||||
context_result = await _manage_context_window(
|
||||
messages=messages,
|
||||
model=model,
|
||||
api_key=config.api_key,
|
||||
base_url=config.base_url,
|
||||
)
|
||||
|
||||
# Convert to dict for token counting
|
||||
# OpenAI message types are TypedDicts, so they're already dict-like
|
||||
messages_dict = []
|
||||
for msg in messages:
|
||||
# TypedDict objects are already dicts, just filter None values
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {k: v for k, v in msg.items() if v is not None}
|
||||
else:
|
||||
# Fallback for unexpected types
|
||||
msg_dict = dict(msg)
|
||||
messages_dict.append(msg_dict)
|
||||
|
||||
# Estimate tokens using appropriate tokenizer
|
||||
# Normalize model name for token counting (tiktoken only supports OpenAI models)
|
||||
token_count_model = model
|
||||
if "/" in model:
|
||||
# Strip provider prefix (e.g., "anthropic/claude-opus-4.5" -> "claude-opus-4.5")
|
||||
token_count_model = model.split("/")[-1]
|
||||
|
||||
# For Claude and other non-OpenAI models, approximate with gpt-4o tokenizer
|
||||
# Most modern LLMs have similar tokenization (~1 token per 4 chars)
|
||||
if "claude" in token_count_model.lower() or not any(
|
||||
known in token_count_model.lower()
|
||||
for known in ["gpt", "o1", "chatgpt", "text-"]
|
||||
):
|
||||
token_count_model = "gpt-4o"
|
||||
|
||||
# Attempt token counting with error handling
|
||||
try:
|
||||
token_count = estimate_token_count(messages_dict, model=token_count_model)
|
||||
except Exception as token_error:
|
||||
# If token counting fails, use gpt-4o as fallback approximation
|
||||
logger.warning(
|
||||
f"Token counting failed for model {token_count_model}: {token_error}. "
|
||||
"Using gpt-4o approximation."
|
||||
)
|
||||
token_count = estimate_token_count(messages_dict, model="gpt-4o")
|
||||
|
||||
# If over threshold, summarize old messages
|
||||
if token_count > 120_000:
|
||||
KEEP_RECENT = 15
|
||||
|
||||
# Check if we have a system prompt at the start
|
||||
has_system_prompt = (
|
||||
len(messages) > 0 and messages[0].get("role") == "system"
|
||||
)
|
||||
|
||||
# Always attempt mitigation when over limit, even with few messages
|
||||
if messages:
|
||||
# Split messages based on whether system prompt exists
|
||||
# Calculate start index for the slice
|
||||
slice_start = max(0, len(messages_dict) - KEEP_RECENT)
|
||||
recent_messages = messages_dict[-KEEP_RECENT:]
|
||||
|
||||
# Ensure tool_call/tool_response pairs stay together
|
||||
# This prevents API errors from orphan tool responses
|
||||
recent_messages = _ensure_tool_pairs_intact(
|
||||
recent_messages, messages_dict, slice_start
|
||||
)
|
||||
|
||||
if has_system_prompt:
|
||||
# Keep system prompt separate, summarize everything between system and recent
|
||||
system_msg = messages[0]
|
||||
old_messages_dict = messages_dict[1:-KEEP_RECENT]
|
||||
else:
|
||||
# No system prompt, summarize everything except recent
|
||||
system_msg = None
|
||||
old_messages_dict = messages_dict[:-KEEP_RECENT]
|
||||
|
||||
# Summarize any non-empty old messages (no minimum threshold)
|
||||
# If we're over the token limit, we need to compress whatever we can
|
||||
if old_messages_dict:
|
||||
# Summarize old messages using the same model as chat
|
||||
summary_text = await _summarize_messages(
|
||||
old_messages_dict,
|
||||
model=model,
|
||||
api_key=config.api_key,
|
||||
base_url=config.base_url,
|
||||
)
|
||||
|
||||
# Build new message list
|
||||
# Use assistant role (not system) to prevent privilege escalation
|
||||
# of user-influenced content to instruction-level authority
|
||||
from openai.types.chat import ChatCompletionAssistantMessageParam
|
||||
|
||||
summary_msg = ChatCompletionAssistantMessageParam(
|
||||
role="assistant",
|
||||
content=(
|
||||
"[Previous conversation summary — for context only]: "
|
||||
f"{summary_text}"
|
||||
),
|
||||
)
|
||||
|
||||
# Rebuild messages based on whether we have a system prompt
|
||||
if has_system_prompt:
|
||||
# system_prompt + summary + recent_messages
|
||||
messages = [system_msg, summary_msg] + recent_messages
|
||||
else:
|
||||
# summary + recent_messages (no original system prompt)
|
||||
messages = [summary_msg] + recent_messages
|
||||
|
||||
logger.info(
|
||||
f"Context summarized: {token_count} tokens, "
|
||||
f"summarized {len(old_messages_dict)} old messages, "
|
||||
f"kept last {KEEP_RECENT} messages"
|
||||
)
|
||||
|
||||
# Fallback: If still over limit after summarization, progressively drop recent messages
|
||||
# This handles edge cases where recent messages are extremely large
|
||||
new_messages_dict = []
|
||||
for msg in messages:
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {k: v for k, v in msg.items() if v is not None}
|
||||
else:
|
||||
msg_dict = dict(msg)
|
||||
new_messages_dict.append(msg_dict)
|
||||
|
||||
new_token_count = estimate_token_count(
|
||||
new_messages_dict, model=token_count_model
|
||||
)
|
||||
|
||||
if new_token_count > 120_000:
|
||||
# Still over limit - progressively reduce KEEP_RECENT
|
||||
logger.warning(
|
||||
f"Still over limit after summarization: {new_token_count} tokens. "
|
||||
"Reducing number of recent messages kept."
|
||||
)
|
||||
|
||||
for keep_count in [12, 10, 8, 5, 3, 2, 1, 0]:
|
||||
if keep_count == 0:
|
||||
# Try with just system prompt + summary (no recent messages)
|
||||
if has_system_prompt:
|
||||
messages = [system_msg, summary_msg]
|
||||
else:
|
||||
messages = [summary_msg]
|
||||
logger.info(
|
||||
"Trying with 0 recent messages (system + summary only)"
|
||||
)
|
||||
else:
|
||||
# Slice from ORIGINAL recent_messages to avoid duplicating summary
|
||||
reduced_recent = (
|
||||
recent_messages[-keep_count:]
|
||||
if len(recent_messages) >= keep_count
|
||||
else recent_messages
|
||||
)
|
||||
# Ensure tool pairs stay intact in the reduced slice
|
||||
reduced_slice_start = max(
|
||||
0, len(recent_messages) - keep_count
|
||||
)
|
||||
reduced_recent = _ensure_tool_pairs_intact(
|
||||
reduced_recent, recent_messages, reduced_slice_start
|
||||
)
|
||||
if has_system_prompt:
|
||||
messages = [
|
||||
system_msg,
|
||||
summary_msg,
|
||||
] + reduced_recent
|
||||
else:
|
||||
messages = [summary_msg] + reduced_recent
|
||||
|
||||
new_messages_dict = []
|
||||
for msg in messages:
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {
|
||||
k: v for k, v in msg.items() if v is not None
|
||||
}
|
||||
else:
|
||||
msg_dict = dict(msg)
|
||||
new_messages_dict.append(msg_dict)
|
||||
|
||||
new_token_count = estimate_token_count(
|
||||
new_messages_dict, model=token_count_model
|
||||
)
|
||||
|
||||
if new_token_count <= 120_000:
|
||||
logger.info(
|
||||
f"Reduced to {keep_count} recent messages, "
|
||||
f"now {new_token_count} tokens"
|
||||
)
|
||||
break
|
||||
else:
|
||||
logger.error(
|
||||
f"Unable to reduce token count below threshold even with 0 messages. "
|
||||
f"Final count: {new_token_count} tokens"
|
||||
)
|
||||
# ABSOLUTE LAST RESORT: Drop system prompt
|
||||
# This should only happen if summary itself is massive
|
||||
if has_system_prompt and len(messages) > 1:
|
||||
messages = messages[1:] # Drop system prompt
|
||||
logger.critical(
|
||||
"CRITICAL: Dropped system prompt as absolute last resort. "
|
||||
"Behavioral consistency may be affected."
|
||||
)
|
||||
# Yield error to user
|
||||
yield StreamError(
|
||||
errorText=(
|
||||
"Warning: System prompt dropped due to size constraints. "
|
||||
"Assistant behavior may be affected."
|
||||
)
|
||||
)
|
||||
else:
|
||||
# No old messages to summarize - all messages are "recent"
|
||||
# Apply progressive truncation to reduce token count
|
||||
logger.warning(
|
||||
f"Token count {token_count} exceeds threshold but no old messages to summarize. "
|
||||
f"Applying progressive truncation to recent messages."
|
||||
)
|
||||
|
||||
# Create a base list excluding system prompt to avoid duplication
|
||||
# This is the pool of messages we'll slice from in the loop
|
||||
# Use messages_dict for type consistency with _ensure_tool_pairs_intact
|
||||
base_msgs = (
|
||||
messages_dict[1:] if has_system_prompt else messages_dict
|
||||
)
|
||||
|
||||
# Try progressively smaller keep counts
|
||||
new_token_count = token_count # Initialize with current count
|
||||
for keep_count in [12, 10, 8, 5, 3, 2, 1, 0]:
|
||||
if keep_count == 0:
|
||||
# Try with just system prompt (no recent messages)
|
||||
if has_system_prompt:
|
||||
messages = [system_msg]
|
||||
logger.info(
|
||||
"Trying with 0 recent messages (system prompt only)"
|
||||
)
|
||||
else:
|
||||
# No system prompt and no recent messages = empty messages list
|
||||
# This is invalid, skip this iteration
|
||||
continue
|
||||
else:
|
||||
if len(base_msgs) < keep_count:
|
||||
continue # Skip if we don't have enough messages
|
||||
|
||||
# Slice from base_msgs to get recent messages (without system prompt)
|
||||
recent_messages = base_msgs[-keep_count:]
|
||||
|
||||
# Ensure tool pairs stay intact in the reduced slice
|
||||
reduced_slice_start = max(0, len(base_msgs) - keep_count)
|
||||
recent_messages = _ensure_tool_pairs_intact(
|
||||
recent_messages, base_msgs, reduced_slice_start
|
||||
)
|
||||
|
||||
if has_system_prompt:
|
||||
messages = [system_msg] + recent_messages
|
||||
else:
|
||||
messages = recent_messages
|
||||
|
||||
new_messages_dict = []
|
||||
for msg in messages:
|
||||
if msg is None:
|
||||
continue # Skip None messages (type safety)
|
||||
if isinstance(msg, dict):
|
||||
msg_dict = {
|
||||
k: v for k, v in msg.items() if v is not None
|
||||
}
|
||||
else:
|
||||
msg_dict = dict(msg)
|
||||
new_messages_dict.append(msg_dict)
|
||||
|
||||
new_token_count = estimate_token_count(
|
||||
new_messages_dict, model=token_count_model
|
||||
)
|
||||
|
||||
if new_token_count <= 120_000:
|
||||
logger.info(
|
||||
f"Reduced to {keep_count} recent messages, "
|
||||
f"now {new_token_count} tokens"
|
||||
)
|
||||
break
|
||||
else:
|
||||
# Even with 0 messages still over limit
|
||||
logger.error(
|
||||
f"Unable to reduce token count below threshold even with 0 messages. "
|
||||
f"Final count: {new_token_count} tokens. Messages may be extremely large."
|
||||
)
|
||||
# ABSOLUTE LAST RESORT: Drop system prompt
|
||||
if has_system_prompt and len(messages) > 1:
|
||||
messages = messages[1:] # Drop system prompt
|
||||
logger.critical(
|
||||
"CRITICAL: Dropped system prompt as absolute last resort. "
|
||||
"Behavioral consistency may be affected."
|
||||
)
|
||||
# Yield error to user
|
||||
yield StreamError(
|
||||
errorText=(
|
||||
"Warning: System prompt dropped due to size constraints. "
|
||||
"Assistant behavior may be affected."
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Context summarization failed: {e}", exc_info=True)
|
||||
# If we were over the token limit, yield error to user
|
||||
# Don't silently continue with oversized messages that will fail
|
||||
if token_count > 120_000:
|
||||
if context_result.error:
|
||||
if "System prompt dropped" in context_result.error:
|
||||
# Warning only - continue with reduced context
|
||||
yield StreamError(
|
||||
errorText=(
|
||||
f"Unable to manage context window (token limit exceeded: {token_count} tokens). "
|
||||
"Context summarization failed. Please start a new conversation."
|
||||
"Warning: System prompt dropped due to size constraints. "
|
||||
"Assistant behavior may be affected."
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Any other error - abort to prevent failed LLM calls
|
||||
yield StreamError(
|
||||
errorText=(
|
||||
f"Context window management failed: {context_result.error}. "
|
||||
"Please start a new conversation."
|
||||
)
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
# Otherwise, continue with original messages (under limit)
|
||||
|
||||
messages = context_result.messages
|
||||
if context_result.was_compacted:
|
||||
logger.info(
|
||||
f"Context compacted for streaming: {context_result.token_count} tokens"
|
||||
)
|
||||
|
||||
# Loop to handle tool calls and continue conversation
|
||||
while True:
|
||||
@@ -1369,14 +951,6 @@ async def _stream_chat_chunks(
|
||||
:128
|
||||
] # OpenRouter limit
|
||||
|
||||
# Create the stream with proper types
|
||||
from typing import cast
|
||||
|
||||
from openai.types.chat import (
|
||||
ChatCompletionMessageParam,
|
||||
ChatCompletionStreamOptionsParam,
|
||||
)
|
||||
|
||||
stream = await client.chat.completions.create(
|
||||
model=model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
@@ -1834,6 +1408,11 @@ async def _execute_long_running_tool(
|
||||
tool_call_id=tool_call_id,
|
||||
result=error_response.model_dump_json(),
|
||||
)
|
||||
# Generate LLM continuation so user sees explanation even for errors
|
||||
try:
|
||||
await _generate_llm_continuation(session_id=session_id, user_id=user_id)
|
||||
except Exception as llm_err:
|
||||
logger.warning(f"Failed to generate LLM continuation for error: {llm_err}")
|
||||
finally:
|
||||
await _mark_operation_completed(tool_call_id)
|
||||
|
||||
@@ -1895,17 +1474,36 @@ async def _generate_llm_continuation(
|
||||
# Build system prompt
|
||||
system_prompt, _ = await _build_system_prompt(user_id)
|
||||
|
||||
# Build messages in OpenAI format
|
||||
messages = session.to_openai_messages()
|
||||
if system_prompt:
|
||||
from openai.types.chat import ChatCompletionSystemMessageParam
|
||||
|
||||
system_message = ChatCompletionSystemMessageParam(
|
||||
role="system",
|
||||
content=system_prompt,
|
||||
)
|
||||
messages = [system_message] + messages
|
||||
|
||||
# Apply context window management to prevent oversized requests
|
||||
context_result = await _manage_context_window(
|
||||
messages=messages,
|
||||
model=config.model,
|
||||
api_key=config.api_key,
|
||||
base_url=config.base_url,
|
||||
)
|
||||
|
||||
if context_result.error and "System prompt dropped" not in context_result.error:
|
||||
logger.error(
|
||||
f"Context window management failed for session {session_id}: "
|
||||
f"{context_result.error} (tokens={context_result.token_count})"
|
||||
)
|
||||
return
|
||||
|
||||
messages = context_result.messages
|
||||
if context_result.was_compacted:
|
||||
logger.info(
|
||||
f"Context compacted for LLM continuation: "
|
||||
f"{context_result.token_count} tokens"
|
||||
)
|
||||
|
||||
# Build extra_body for tracing
|
||||
extra_body: dict[str, Any] = {
|
||||
"posthogProperties": {
|
||||
@@ -1918,19 +1516,54 @@ async def _generate_llm_continuation(
|
||||
if session_id:
|
||||
extra_body["session_id"] = session_id[:128]
|
||||
|
||||
# Make non-streaming LLM call (no tools - just text response)
|
||||
from typing import cast
|
||||
retry_count = 0
|
||||
last_error: Exception | None = None
|
||||
response = None
|
||||
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
while retry_count <= MAX_RETRIES:
|
||||
try:
|
||||
logger.info(
|
||||
f"Generating LLM continuation for session {session_id}"
|
||||
f"{f' (retry {retry_count}/{MAX_RETRIES})' if retry_count > 0 else ''}"
|
||||
)
|
||||
|
||||
# No tools parameter = text-only response (no tool calls)
|
||||
response = await client.chat.completions.create(
|
||||
model=config.model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
extra_body=extra_body,
|
||||
)
|
||||
response = await client.chat.completions.create(
|
||||
model=config.model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
extra_body=extra_body,
|
||||
)
|
||||
last_error = None # Clear any previous error on success
|
||||
break # Success, exit retry loop
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
if _is_retryable_error(e) and retry_count < MAX_RETRIES:
|
||||
retry_count += 1
|
||||
delay = min(
|
||||
BASE_DELAY_SECONDS * (2 ** (retry_count - 1)),
|
||||
MAX_DELAY_SECONDS,
|
||||
)
|
||||
logger.warning(
|
||||
f"Retryable error in LLM continuation: {e!s}. "
|
||||
f"Retrying in {delay:.1f}s (attempt {retry_count}/{MAX_RETRIES})"
|
||||
)
|
||||
await asyncio.sleep(delay)
|
||||
continue
|
||||
else:
|
||||
# Non-retryable error - log and exit gracefully
|
||||
logger.error(
|
||||
f"Non-retryable error in LLM continuation: {e!s}",
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
|
||||
if response.choices and response.choices[0].message.content:
|
||||
if last_error:
|
||||
logger.error(
|
||||
f"Max retries ({MAX_RETRIES}) exceeded for LLM continuation. "
|
||||
f"Last error: {last_error!s}"
|
||||
)
|
||||
return
|
||||
|
||||
if response and response.choices and response.choices[0].message.content:
|
||||
assistant_content = response.choices[0].message.content
|
||||
|
||||
# Reload session from DB to avoid race condition with user messages
|
||||
|
||||
@@ -2,30 +2,54 @@
|
||||
|
||||
from .core import (
|
||||
AgentGeneratorNotConfiguredError,
|
||||
AgentJsonValidationError,
|
||||
AgentSummary,
|
||||
DecompositionResult,
|
||||
DecompositionStep,
|
||||
LibraryAgentSummary,
|
||||
MarketplaceAgentSummary,
|
||||
decompose_goal,
|
||||
enrich_library_agents_from_steps,
|
||||
extract_search_terms_from_steps,
|
||||
extract_uuids_from_text,
|
||||
generate_agent,
|
||||
generate_agent_patch,
|
||||
get_agent_as_json,
|
||||
get_all_relevant_agents_for_generation,
|
||||
get_library_agent_by_graph_id,
|
||||
get_library_agent_by_id,
|
||||
get_library_agents_for_generation,
|
||||
json_to_graph,
|
||||
save_agent_to_library,
|
||||
search_marketplace_agents_for_generation,
|
||||
)
|
||||
from .errors import get_user_message_for_error
|
||||
from .service import health_check as check_external_service_health
|
||||
from .service import is_external_service_configured
|
||||
|
||||
__all__ = [
|
||||
# Core functions
|
||||
"AgentGeneratorNotConfiguredError",
|
||||
"AgentJsonValidationError",
|
||||
"AgentSummary",
|
||||
"DecompositionResult",
|
||||
"DecompositionStep",
|
||||
"LibraryAgentSummary",
|
||||
"MarketplaceAgentSummary",
|
||||
"check_external_service_health",
|
||||
"decompose_goal",
|
||||
"enrich_library_agents_from_steps",
|
||||
"extract_search_terms_from_steps",
|
||||
"extract_uuids_from_text",
|
||||
"generate_agent",
|
||||
"generate_agent_patch",
|
||||
"save_agent_to_library",
|
||||
"get_agent_as_json",
|
||||
"json_to_graph",
|
||||
# Exceptions
|
||||
"AgentGeneratorNotConfiguredError",
|
||||
# Service
|
||||
"is_external_service_configured",
|
||||
"check_external_service_health",
|
||||
# Error handling
|
||||
"get_all_relevant_agents_for_generation",
|
||||
"get_library_agent_by_graph_id",
|
||||
"get_library_agent_by_id",
|
||||
"get_library_agents_for_generation",
|
||||
"get_user_message_for_error",
|
||||
"is_external_service_configured",
|
||||
"json_to_graph",
|
||||
"save_agent_to_library",
|
||||
"search_marketplace_agents_for_generation",
|
||||
]
|
||||
|
||||
@@ -1,11 +1,22 @@
|
||||
"""Core agent generation functions."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from typing import Any
|
||||
from typing import Any, NotRequired, TypedDict
|
||||
|
||||
from backend.api.features.library import db as library_db
|
||||
from backend.data.graph import Graph, Link, Node, create_graph
|
||||
from backend.api.features.store import db as store_db
|
||||
from backend.data.graph import (
|
||||
Graph,
|
||||
Link,
|
||||
Node,
|
||||
create_graph,
|
||||
get_graph,
|
||||
get_graph_all_versions,
|
||||
get_store_listed_graphs,
|
||||
)
|
||||
from backend.util.exceptions import DatabaseError, NotFoundError
|
||||
|
||||
from .service import (
|
||||
decompose_goal_external,
|
||||
@@ -16,6 +27,74 @@ from .service import (
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
AGENT_EXECUTOR_BLOCK_ID = "e189baac-8c20-45a1-94a7-55177ea42565"
|
||||
|
||||
|
||||
class ExecutionSummary(TypedDict):
|
||||
"""Summary of a single execution for quality assessment."""
|
||||
|
||||
status: str
|
||||
correctness_score: NotRequired[float]
|
||||
activity_summary: NotRequired[str]
|
||||
|
||||
|
||||
class LibraryAgentSummary(TypedDict):
|
||||
"""Summary of a library agent for sub-agent composition.
|
||||
|
||||
Includes recent executions to help the LLM decide whether to use this agent.
|
||||
Each execution shows status, correctness_score (0-1), and activity_summary.
|
||||
"""
|
||||
|
||||
graph_id: str
|
||||
graph_version: int
|
||||
name: str
|
||||
description: str
|
||||
input_schema: dict[str, Any]
|
||||
output_schema: dict[str, Any]
|
||||
recent_executions: NotRequired[list[ExecutionSummary]]
|
||||
|
||||
|
||||
class MarketplaceAgentSummary(TypedDict):
|
||||
"""Summary of a marketplace agent for sub-agent composition."""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
sub_heading: str
|
||||
creator: str
|
||||
is_marketplace_agent: bool
|
||||
|
||||
|
||||
class DecompositionStep(TypedDict, total=False):
|
||||
"""A single step in decomposed instructions."""
|
||||
|
||||
description: str
|
||||
action: str
|
||||
block_name: str
|
||||
tool: str
|
||||
name: str
|
||||
|
||||
|
||||
class DecompositionResult(TypedDict, total=False):
|
||||
"""Result from decompose_goal - can be instructions, questions, or error."""
|
||||
|
||||
type: str
|
||||
steps: list[DecompositionStep]
|
||||
questions: list[dict[str, Any]]
|
||||
error: str
|
||||
error_type: str
|
||||
|
||||
|
||||
AgentSummary = LibraryAgentSummary | MarketplaceAgentSummary | dict[str, Any]
|
||||
|
||||
|
||||
def _to_dict_list(
|
||||
agents: list[AgentSummary] | list[dict[str, Any]] | None,
|
||||
) -> list[dict[str, Any]] | None:
|
||||
"""Convert typed agent summaries to plain dicts for external service calls."""
|
||||
if agents is None:
|
||||
return None
|
||||
return [dict(a) for a in agents]
|
||||
|
||||
|
||||
class AgentGeneratorNotConfiguredError(Exception):
|
||||
"""Raised when the external Agent Generator service is not configured."""
|
||||
@@ -36,15 +115,422 @@ def _check_service_configured() -> None:
|
||||
)
|
||||
|
||||
|
||||
async def decompose_goal(description: str, context: str = "") -> dict[str, Any] | None:
|
||||
_UUID_PATTERN = re.compile(
|
||||
r"[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def extract_uuids_from_text(text: str) -> list[str]:
|
||||
"""Extract all UUID v4 strings from text.
|
||||
|
||||
Args:
|
||||
text: Text that may contain UUIDs (e.g., user's goal description)
|
||||
|
||||
Returns:
|
||||
List of unique UUIDs found in the text (lowercase)
|
||||
"""
|
||||
matches = _UUID_PATTERN.findall(text)
|
||||
return list({m.lower() for m in matches})
|
||||
|
||||
|
||||
async def get_library_agent_by_id(
|
||||
user_id: str, agent_id: str
|
||||
) -> LibraryAgentSummary | None:
|
||||
"""Fetch a specific library agent by its ID (library agent ID or graph_id).
|
||||
|
||||
This function tries multiple lookup strategies:
|
||||
1. First tries to find by graph_id (AgentGraph primary key)
|
||||
2. If not found, tries to find by library agent ID (LibraryAgent primary key)
|
||||
|
||||
This handles both cases:
|
||||
- User provides graph_id (e.g., from AgentExecutorBlock)
|
||||
- User provides library agent ID (e.g., from library URL)
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
agent_id: The ID to look up (can be graph_id or library agent ID)
|
||||
|
||||
Returns:
|
||||
LibraryAgentSummary if found, None otherwise
|
||||
"""
|
||||
try:
|
||||
agent = await library_db.get_library_agent_by_graph_id(user_id, agent_id)
|
||||
if agent:
|
||||
logger.debug(f"Found library agent by graph_id: {agent.name}")
|
||||
return LibraryAgentSummary(
|
||||
graph_id=agent.graph_id,
|
||||
graph_version=agent.graph_version,
|
||||
name=agent.name,
|
||||
description=agent.description,
|
||||
input_schema=agent.input_schema,
|
||||
output_schema=agent.output_schema,
|
||||
)
|
||||
except DatabaseError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not fetch library agent by graph_id {agent_id}: {e}")
|
||||
|
||||
try:
|
||||
agent = await library_db.get_library_agent(agent_id, user_id)
|
||||
if agent:
|
||||
logger.debug(f"Found library agent by library_id: {agent.name}")
|
||||
return LibraryAgentSummary(
|
||||
graph_id=agent.graph_id,
|
||||
graph_version=agent.graph_version,
|
||||
name=agent.name,
|
||||
description=agent.description,
|
||||
input_schema=agent.input_schema,
|
||||
output_schema=agent.output_schema,
|
||||
)
|
||||
except NotFoundError:
|
||||
logger.debug(f"Library agent not found by library_id: {agent_id}")
|
||||
except DatabaseError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Could not fetch library agent by library_id {agent_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
get_library_agent_by_graph_id = get_library_agent_by_id
|
||||
|
||||
|
||||
async def get_library_agents_for_generation(
|
||||
user_id: str,
|
||||
search_query: str | None = None,
|
||||
exclude_graph_id: str | None = None,
|
||||
max_results: int = 15,
|
||||
) -> list[LibraryAgentSummary]:
|
||||
"""Fetch user's library agents formatted for Agent Generator.
|
||||
|
||||
Uses search-based fetching to return relevant agents instead of all agents.
|
||||
This is more scalable for users with large libraries.
|
||||
|
||||
Includes recent_executions list to help the LLM assess agent quality:
|
||||
- Each execution has status, correctness_score (0-1), and activity_summary
|
||||
- This gives the LLM concrete examples of recent performance
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
search_query: Optional search term to find relevant agents (user's goal/description)
|
||||
exclude_graph_id: Optional graph ID to exclude (prevents circular references)
|
||||
max_results: Maximum number of agents to return (default 15)
|
||||
|
||||
Returns:
|
||||
List of LibraryAgentSummary with schemas and recent executions for sub-agent composition
|
||||
"""
|
||||
try:
|
||||
response = await library_db.list_library_agents(
|
||||
user_id=user_id,
|
||||
search_term=search_query,
|
||||
page=1,
|
||||
page_size=max_results,
|
||||
include_executions=True,
|
||||
)
|
||||
|
||||
results: list[LibraryAgentSummary] = []
|
||||
for agent in response.agents:
|
||||
if exclude_graph_id is not None and agent.graph_id == exclude_graph_id:
|
||||
continue
|
||||
|
||||
summary = LibraryAgentSummary(
|
||||
graph_id=agent.graph_id,
|
||||
graph_version=agent.graph_version,
|
||||
name=agent.name,
|
||||
description=agent.description,
|
||||
input_schema=agent.input_schema,
|
||||
output_schema=agent.output_schema,
|
||||
)
|
||||
if agent.recent_executions:
|
||||
exec_summaries: list[ExecutionSummary] = []
|
||||
for ex in agent.recent_executions:
|
||||
exec_sum = ExecutionSummary(status=ex.status)
|
||||
if ex.correctness_score is not None:
|
||||
exec_sum["correctness_score"] = ex.correctness_score
|
||||
if ex.activity_summary:
|
||||
exec_sum["activity_summary"] = ex.activity_summary
|
||||
exec_summaries.append(exec_sum)
|
||||
summary["recent_executions"] = exec_summaries
|
||||
results.append(summary)
|
||||
return results
|
||||
except DatabaseError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch library agents: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def search_marketplace_agents_for_generation(
|
||||
search_query: str,
|
||||
max_results: int = 10,
|
||||
) -> list[LibraryAgentSummary]:
|
||||
"""Search marketplace agents formatted for Agent Generator.
|
||||
|
||||
Fetches marketplace agents and their full schemas so they can be used
|
||||
as sub-agents in generated workflows.
|
||||
|
||||
Args:
|
||||
search_query: Search term to find relevant public agents
|
||||
max_results: Maximum number of agents to return (default 10)
|
||||
|
||||
Returns:
|
||||
List of LibraryAgentSummary with full input/output schemas
|
||||
"""
|
||||
try:
|
||||
response = await store_db.get_store_agents(
|
||||
search_query=search_query,
|
||||
page=1,
|
||||
page_size=max_results,
|
||||
)
|
||||
|
||||
agents_with_graphs = [
|
||||
agent for agent in response.agents if agent.agent_graph_id
|
||||
]
|
||||
|
||||
if not agents_with_graphs:
|
||||
return []
|
||||
|
||||
graph_ids = [agent.agent_graph_id for agent in agents_with_graphs]
|
||||
graphs = await get_store_listed_graphs(*graph_ids)
|
||||
|
||||
results: list[LibraryAgentSummary] = []
|
||||
for agent in agents_with_graphs:
|
||||
graph_id = agent.agent_graph_id
|
||||
if graph_id and graph_id in graphs:
|
||||
graph = graphs[graph_id]
|
||||
results.append(
|
||||
LibraryAgentSummary(
|
||||
graph_id=graph.id,
|
||||
graph_version=graph.version,
|
||||
name=agent.agent_name,
|
||||
description=agent.description,
|
||||
input_schema=graph.input_schema,
|
||||
output_schema=graph.output_schema,
|
||||
)
|
||||
)
|
||||
return results
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to search marketplace agents: {e}")
|
||||
return []
|
||||
|
||||
|
||||
async def get_all_relevant_agents_for_generation(
|
||||
user_id: str,
|
||||
search_query: str | None = None,
|
||||
exclude_graph_id: str | None = None,
|
||||
include_library: bool = True,
|
||||
include_marketplace: bool = True,
|
||||
max_library_results: int = 15,
|
||||
max_marketplace_results: int = 10,
|
||||
) -> list[AgentSummary]:
|
||||
"""Fetch relevant agents from library and/or marketplace.
|
||||
|
||||
Searches both user's library and marketplace by default.
|
||||
Explicitly mentioned UUIDs in the search query are always looked up.
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
search_query: Search term to find relevant agents (user's goal/description)
|
||||
exclude_graph_id: Optional graph ID to exclude (prevents circular references)
|
||||
include_library: Whether to search user's library (default True)
|
||||
include_marketplace: Whether to also search marketplace (default True)
|
||||
max_library_results: Max library agents to return (default 15)
|
||||
max_marketplace_results: Max marketplace agents to return (default 10)
|
||||
|
||||
Returns:
|
||||
List of AgentSummary with full schemas (both library and marketplace agents)
|
||||
"""
|
||||
agents: list[AgentSummary] = []
|
||||
seen_graph_ids: set[str] = set()
|
||||
|
||||
if search_query:
|
||||
mentioned_uuids = extract_uuids_from_text(search_query)
|
||||
for graph_id in mentioned_uuids:
|
||||
if graph_id == exclude_graph_id:
|
||||
continue
|
||||
agent = await get_library_agent_by_graph_id(user_id, graph_id)
|
||||
agent_graph_id = agent.get("graph_id") if agent else None
|
||||
if agent and agent_graph_id and agent_graph_id not in seen_graph_ids:
|
||||
agents.append(agent)
|
||||
seen_graph_ids.add(agent_graph_id)
|
||||
logger.debug(
|
||||
f"Found explicitly mentioned agent: {agent.get('name') or 'Unknown'}"
|
||||
)
|
||||
|
||||
if include_library:
|
||||
library_agents = await get_library_agents_for_generation(
|
||||
user_id=user_id,
|
||||
search_query=search_query,
|
||||
exclude_graph_id=exclude_graph_id,
|
||||
max_results=max_library_results,
|
||||
)
|
||||
for agent in library_agents:
|
||||
graph_id = agent.get("graph_id")
|
||||
if graph_id and graph_id not in seen_graph_ids:
|
||||
agents.append(agent)
|
||||
seen_graph_ids.add(graph_id)
|
||||
|
||||
if include_marketplace and search_query:
|
||||
marketplace_agents = await search_marketplace_agents_for_generation(
|
||||
search_query=search_query,
|
||||
max_results=max_marketplace_results,
|
||||
)
|
||||
for agent in marketplace_agents:
|
||||
graph_id = agent.get("graph_id")
|
||||
if graph_id and graph_id not in seen_graph_ids:
|
||||
agents.append(agent)
|
||||
seen_graph_ids.add(graph_id)
|
||||
|
||||
return agents
|
||||
|
||||
|
||||
def extract_search_terms_from_steps(
|
||||
decomposition_result: DecompositionResult | dict[str, Any],
|
||||
) -> list[str]:
|
||||
"""Extract search terms from decomposed instruction steps.
|
||||
|
||||
Analyzes the decomposition result to extract relevant keywords
|
||||
for additional library agent searches.
|
||||
|
||||
Args:
|
||||
decomposition_result: Result from decompose_goal containing steps
|
||||
|
||||
Returns:
|
||||
List of unique search terms extracted from steps
|
||||
"""
|
||||
search_terms: list[str] = []
|
||||
|
||||
if decomposition_result.get("type") != "instructions":
|
||||
return search_terms
|
||||
|
||||
steps = decomposition_result.get("steps", [])
|
||||
if not steps:
|
||||
return search_terms
|
||||
|
||||
step_keys: list[str] = ["description", "action", "block_name", "tool", "name"]
|
||||
|
||||
for step in steps:
|
||||
for key in step_keys:
|
||||
value = step.get(key) # type: ignore[union-attr]
|
||||
if isinstance(value, str) and len(value) > 3:
|
||||
search_terms.append(value)
|
||||
|
||||
seen: set[str] = set()
|
||||
unique_terms: list[str] = []
|
||||
for term in search_terms:
|
||||
term_lower = term.lower()
|
||||
if term_lower not in seen:
|
||||
seen.add(term_lower)
|
||||
unique_terms.append(term)
|
||||
|
||||
return unique_terms
|
||||
|
||||
|
||||
async def enrich_library_agents_from_steps(
|
||||
user_id: str,
|
||||
decomposition_result: DecompositionResult | dict[str, Any],
|
||||
existing_agents: list[AgentSummary] | list[dict[str, Any]],
|
||||
exclude_graph_id: str | None = None,
|
||||
include_marketplace: bool = True,
|
||||
max_additional_results: int = 10,
|
||||
) -> list[AgentSummary] | list[dict[str, Any]]:
|
||||
"""Enrich library agents list with additional searches based on decomposed steps.
|
||||
|
||||
This implements two-phase search: after decomposition, we search for additional
|
||||
relevant agents based on the specific steps identified.
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
decomposition_result: Result from decompose_goal containing steps
|
||||
existing_agents: Already fetched library agents from initial search
|
||||
exclude_graph_id: Optional graph ID to exclude
|
||||
include_marketplace: Whether to also search marketplace
|
||||
max_additional_results: Max additional agents per search term (default 10)
|
||||
|
||||
Returns:
|
||||
Combined list of library agents (existing + newly discovered)
|
||||
"""
|
||||
search_terms = extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
if not search_terms:
|
||||
return existing_agents
|
||||
|
||||
existing_ids: set[str] = set()
|
||||
existing_names: set[str] = set()
|
||||
|
||||
for agent in existing_agents:
|
||||
agent_name = agent.get("name")
|
||||
if agent_name and isinstance(agent_name, str):
|
||||
existing_names.add(agent_name.lower())
|
||||
graph_id = agent.get("graph_id") # type: ignore[call-overload]
|
||||
if graph_id and isinstance(graph_id, str):
|
||||
existing_ids.add(graph_id)
|
||||
|
||||
all_agents: list[AgentSummary] | list[dict[str, Any]] = list(existing_agents)
|
||||
|
||||
for term in search_terms[:3]:
|
||||
try:
|
||||
additional_agents = await get_all_relevant_agents_for_generation(
|
||||
user_id=user_id,
|
||||
search_query=term,
|
||||
exclude_graph_id=exclude_graph_id,
|
||||
include_marketplace=include_marketplace,
|
||||
max_library_results=max_additional_results,
|
||||
max_marketplace_results=5,
|
||||
)
|
||||
|
||||
for agent in additional_agents:
|
||||
agent_name = agent.get("name")
|
||||
if not agent_name or not isinstance(agent_name, str):
|
||||
continue
|
||||
agent_name_lower = agent_name.lower()
|
||||
|
||||
if agent_name_lower in existing_names:
|
||||
continue
|
||||
|
||||
graph_id = agent.get("graph_id") # type: ignore[call-overload]
|
||||
if graph_id and graph_id in existing_ids:
|
||||
continue
|
||||
|
||||
all_agents.append(agent)
|
||||
existing_names.add(agent_name_lower)
|
||||
if graph_id and isinstance(graph_id, str):
|
||||
existing_ids.add(graph_id)
|
||||
|
||||
except DatabaseError:
|
||||
logger.error(f"Database error searching for agents with term '{term}'")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to search for additional agents with term '{term}': {e}"
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
f"Enriched library agents: {len(existing_agents)} initial + "
|
||||
f"{len(all_agents) - len(existing_agents)} additional = {len(all_agents)} total"
|
||||
)
|
||||
|
||||
return all_agents
|
||||
|
||||
|
||||
async def decompose_goal(
|
||||
description: str,
|
||||
context: str = "",
|
||||
library_agents: list[AgentSummary] | None = None,
|
||||
) -> DecompositionResult | None:
|
||||
"""Break down a goal into steps or return clarifying questions.
|
||||
|
||||
Args:
|
||||
description: Natural language goal description
|
||||
context: Additional context (e.g., answers to previous questions)
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
|
||||
Returns:
|
||||
Dict with either:
|
||||
DecompositionResult with either:
|
||||
- {"type": "clarifying_questions", "questions": [...]}
|
||||
- {"type": "instructions", "steps": [...]}
|
||||
Or None on error
|
||||
@@ -54,14 +540,21 @@ async def decompose_goal(description: str, context: str = "") -> dict[str, Any]
|
||||
"""
|
||||
_check_service_configured()
|
||||
logger.info("Calling external Agent Generator service for decompose_goal")
|
||||
return await decompose_goal_external(description, context)
|
||||
result = await decompose_goal_external(
|
||||
description, context, _to_dict_list(library_agents)
|
||||
)
|
||||
return result # type: ignore[return-value]
|
||||
|
||||
|
||||
async def generate_agent(instructions: dict[str, Any]) -> dict[str, Any] | None:
|
||||
async def generate_agent(
|
||||
instructions: DecompositionResult | dict[str, Any],
|
||||
library_agents: list[AgentSummary] | list[dict[str, Any]] | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Generate agent JSON from instructions.
|
||||
|
||||
Args:
|
||||
instructions: Structured instructions from decompose_goal
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
|
||||
Returns:
|
||||
Agent JSON dict, error dict {"type": "error", ...}, or None on error
|
||||
@@ -71,12 +564,12 @@ async def generate_agent(instructions: dict[str, Any]) -> dict[str, Any] | None:
|
||||
"""
|
||||
_check_service_configured()
|
||||
logger.info("Calling external Agent Generator service for generate_agent")
|
||||
result = await generate_agent_external(instructions)
|
||||
result = await generate_agent_external(
|
||||
dict(instructions), _to_dict_list(library_agents)
|
||||
)
|
||||
if result:
|
||||
# Check if it's an error response - pass through as-is
|
||||
if isinstance(result, dict) and result.get("type") == "error":
|
||||
return result
|
||||
# Ensure required fields for successful agent generation
|
||||
if "id" not in result:
|
||||
result["id"] = str(uuid.uuid4())
|
||||
if "version" not in result:
|
||||
@@ -86,6 +579,12 @@ async def generate_agent(instructions: dict[str, Any]) -> dict[str, Any] | None:
|
||||
return result
|
||||
|
||||
|
||||
class AgentJsonValidationError(Exception):
|
||||
"""Raised when agent JSON is invalid or missing required fields."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def json_to_graph(agent_json: dict[str, Any]) -> Graph:
|
||||
"""Convert agent JSON dict to Graph model.
|
||||
|
||||
@@ -94,25 +593,55 @@ def json_to_graph(agent_json: dict[str, Any]) -> Graph:
|
||||
|
||||
Returns:
|
||||
Graph ready for saving
|
||||
|
||||
Raises:
|
||||
AgentJsonValidationError: If required fields are missing from nodes or links
|
||||
"""
|
||||
nodes = []
|
||||
for n in agent_json.get("nodes", []):
|
||||
for idx, n in enumerate(agent_json.get("nodes", [])):
|
||||
block_id = n.get("block_id")
|
||||
if not block_id:
|
||||
node_id = n.get("id", f"index_{idx}")
|
||||
raise AgentJsonValidationError(
|
||||
f"Node '{node_id}' is missing required field 'block_id'"
|
||||
)
|
||||
node = Node(
|
||||
id=n.get("id", str(uuid.uuid4())),
|
||||
block_id=n["block_id"],
|
||||
block_id=block_id,
|
||||
input_default=n.get("input_default", {}),
|
||||
metadata=n.get("metadata", {}),
|
||||
)
|
||||
nodes.append(node)
|
||||
|
||||
links = []
|
||||
for link_data in agent_json.get("links", []):
|
||||
for idx, link_data in enumerate(agent_json.get("links", [])):
|
||||
source_id = link_data.get("source_id")
|
||||
sink_id = link_data.get("sink_id")
|
||||
source_name = link_data.get("source_name")
|
||||
sink_name = link_data.get("sink_name")
|
||||
|
||||
missing_fields = []
|
||||
if not source_id:
|
||||
missing_fields.append("source_id")
|
||||
if not sink_id:
|
||||
missing_fields.append("sink_id")
|
||||
if not source_name:
|
||||
missing_fields.append("source_name")
|
||||
if not sink_name:
|
||||
missing_fields.append("sink_name")
|
||||
|
||||
if missing_fields:
|
||||
link_id = link_data.get("id", f"index_{idx}")
|
||||
raise AgentJsonValidationError(
|
||||
f"Link '{link_id}' is missing required fields: {', '.join(missing_fields)}"
|
||||
)
|
||||
|
||||
link = Link(
|
||||
id=link_data.get("id", str(uuid.uuid4())),
|
||||
source_id=link_data["source_id"],
|
||||
sink_id=link_data["sink_id"],
|
||||
source_name=link_data["source_name"],
|
||||
sink_name=link_data["sink_name"],
|
||||
source_id=source_id,
|
||||
sink_id=sink_id,
|
||||
source_name=source_name,
|
||||
sink_name=sink_name,
|
||||
is_static=link_data.get("is_static", False),
|
||||
)
|
||||
links.append(link)
|
||||
@@ -133,22 +662,40 @@ def _reassign_node_ids(graph: Graph) -> None:
|
||||
|
||||
This is needed when creating a new version to avoid unique constraint violations.
|
||||
"""
|
||||
# Create mapping from old node IDs to new UUIDs
|
||||
id_map = {node.id: str(uuid.uuid4()) for node in graph.nodes}
|
||||
|
||||
# Reassign node IDs
|
||||
for node in graph.nodes:
|
||||
node.id = id_map[node.id]
|
||||
|
||||
# Update link references to use new node IDs
|
||||
for link in graph.links:
|
||||
link.id = str(uuid.uuid4()) # Also give links new IDs
|
||||
link.id = str(uuid.uuid4())
|
||||
if link.source_id in id_map:
|
||||
link.source_id = id_map[link.source_id]
|
||||
if link.sink_id in id_map:
|
||||
link.sink_id = id_map[link.sink_id]
|
||||
|
||||
|
||||
def _populate_agent_executor_user_ids(agent_json: dict[str, Any], user_id: str) -> None:
|
||||
"""Populate user_id in AgentExecutorBlock nodes.
|
||||
|
||||
The external agent generator creates AgentExecutorBlock nodes with empty user_id.
|
||||
This function fills in the actual user_id so sub-agents run with correct permissions.
|
||||
|
||||
Args:
|
||||
agent_json: Agent JSON dict (modified in place)
|
||||
user_id: User ID to set
|
||||
"""
|
||||
for node in agent_json.get("nodes", []):
|
||||
if node.get("block_id") == AGENT_EXECUTOR_BLOCK_ID:
|
||||
input_default = node.get("input_default") or {}
|
||||
if not input_default.get("user_id"):
|
||||
input_default["user_id"] = user_id
|
||||
node["input_default"] = input_default
|
||||
logger.debug(
|
||||
f"Set user_id for AgentExecutorBlock node {node.get('id')}"
|
||||
)
|
||||
|
||||
|
||||
async def save_agent_to_library(
|
||||
agent_json: dict[str, Any], user_id: str, is_update: bool = False
|
||||
) -> tuple[Graph, Any]:
|
||||
@@ -162,33 +709,27 @@ async def save_agent_to_library(
|
||||
Returns:
|
||||
Tuple of (created Graph, LibraryAgent)
|
||||
"""
|
||||
from backend.data.graph import get_graph_all_versions
|
||||
# Populate user_id in AgentExecutorBlock nodes before conversion
|
||||
_populate_agent_executor_user_ids(agent_json, user_id)
|
||||
|
||||
graph = json_to_graph(agent_json)
|
||||
|
||||
if is_update:
|
||||
# For updates, keep the same graph ID but increment version
|
||||
# and reassign node/link IDs to avoid conflicts
|
||||
if graph.id:
|
||||
existing_versions = await get_graph_all_versions(graph.id, user_id)
|
||||
if existing_versions:
|
||||
latest_version = max(v.version for v in existing_versions)
|
||||
graph.version = latest_version + 1
|
||||
# Reassign node IDs (but keep graph ID the same)
|
||||
_reassign_node_ids(graph)
|
||||
logger.info(f"Updating agent {graph.id} to version {graph.version}")
|
||||
else:
|
||||
# For new agents, always generate a fresh UUID to avoid collisions
|
||||
graph.id = str(uuid.uuid4())
|
||||
graph.version = 1
|
||||
# Reassign all node IDs as well
|
||||
_reassign_node_ids(graph)
|
||||
logger.info(f"Creating new agent with ID {graph.id}")
|
||||
|
||||
# Save to database
|
||||
created_graph = await create_graph(graph, user_id)
|
||||
|
||||
# Add to user's library (or update existing library agent)
|
||||
library_agents = await library_db.create_library_agent(
|
||||
graph=created_graph,
|
||||
user_id=user_id,
|
||||
@@ -200,25 +741,31 @@ async def save_agent_to_library(
|
||||
|
||||
|
||||
async def get_agent_as_json(
|
||||
graph_id: str, user_id: str | None
|
||||
agent_id: str, user_id: str | None
|
||||
) -> dict[str, Any] | None:
|
||||
"""Fetch an agent and convert to JSON format for editing.
|
||||
|
||||
Args:
|
||||
graph_id: Graph ID or library agent ID
|
||||
agent_id: Graph ID or library agent ID
|
||||
user_id: User ID
|
||||
|
||||
Returns:
|
||||
Agent as JSON dict or None if not found
|
||||
"""
|
||||
from backend.data.graph import get_graph
|
||||
graph = await get_graph(agent_id, version=None, user_id=user_id)
|
||||
|
||||
if not graph and user_id:
|
||||
try:
|
||||
library_agent = await library_db.get_library_agent(agent_id, user_id)
|
||||
graph = await get_graph(
|
||||
library_agent.graph_id, version=None, user_id=user_id
|
||||
)
|
||||
except NotFoundError:
|
||||
pass
|
||||
|
||||
# Try to get the graph (version=None gets the active version)
|
||||
graph = await get_graph(graph_id, version=None, user_id=user_id)
|
||||
if not graph:
|
||||
return None
|
||||
|
||||
# Convert to JSON format
|
||||
nodes = []
|
||||
for node in graph.nodes:
|
||||
nodes.append(
|
||||
@@ -256,7 +803,9 @@ async def get_agent_as_json(
|
||||
|
||||
|
||||
async def generate_agent_patch(
|
||||
update_request: str, current_agent: dict[str, Any]
|
||||
update_request: str,
|
||||
current_agent: dict[str, Any],
|
||||
library_agents: list[AgentSummary] | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Update an existing agent using natural language.
|
||||
|
||||
@@ -268,6 +817,7 @@ async def generate_agent_patch(
|
||||
Args:
|
||||
update_request: Natural language description of changes
|
||||
current_agent: Current agent JSON
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
|
||||
Returns:
|
||||
Updated agent JSON, clarifying questions dict {"type": "clarifying_questions", ...},
|
||||
@@ -278,4 +828,6 @@ async def generate_agent_patch(
|
||||
"""
|
||||
_check_service_configured()
|
||||
logger.info("Calling external Agent Generator service for generate_agent_patch")
|
||||
return await generate_agent_patch_external(update_request, current_agent)
|
||||
return await generate_agent_patch_external(
|
||||
update_request, current_agent, _to_dict_list(library_agents)
|
||||
)
|
||||
|
||||
@@ -1,11 +1,43 @@
|
||||
"""Error handling utilities for agent generator."""
|
||||
|
||||
import re
|
||||
|
||||
|
||||
def _sanitize_error_details(details: str) -> str:
|
||||
"""Sanitize error details to remove sensitive information.
|
||||
|
||||
Strips common patterns that could expose internal system info:
|
||||
- File paths (Unix and Windows)
|
||||
- Database connection strings
|
||||
- URLs with credentials
|
||||
- Stack trace internals
|
||||
|
||||
Args:
|
||||
details: Raw error details string
|
||||
|
||||
Returns:
|
||||
Sanitized error details safe for user display
|
||||
"""
|
||||
sanitized = re.sub(
|
||||
r"/[a-zA-Z0-9_./\-]+\.(py|js|ts|json|yaml|yml)", "[path]", details
|
||||
)
|
||||
sanitized = re.sub(r"[A-Z]:\\[a-zA-Z0-9_\\.\\-]+", "[path]", sanitized)
|
||||
sanitized = re.sub(
|
||||
r"(postgres|mysql|mongodb|redis)://[^\s]+", "[database_url]", sanitized
|
||||
)
|
||||
sanitized = re.sub(r"https?://[^:]+:[^@]+@[^\s]+", "[url]", sanitized)
|
||||
sanitized = re.sub(r", line \d+", "", sanitized)
|
||||
sanitized = re.sub(r'File "[^"]+",?', "", sanitized)
|
||||
|
||||
return sanitized.strip()
|
||||
|
||||
|
||||
def get_user_message_for_error(
|
||||
error_type: str,
|
||||
operation: str = "process the request",
|
||||
llm_parse_message: str | None = None,
|
||||
validation_message: str | None = None,
|
||||
error_details: str | None = None,
|
||||
) -> str:
|
||||
"""Get a user-friendly error message based on error type.
|
||||
|
||||
@@ -19,25 +51,45 @@ def get_user_message_for_error(
|
||||
message (e.g., "analyze the goal", "generate the agent")
|
||||
llm_parse_message: Custom message for llm_parse_error type
|
||||
validation_message: Custom message for validation_error type
|
||||
error_details: Optional additional details about the error
|
||||
|
||||
Returns:
|
||||
User-friendly error message suitable for display to the user
|
||||
"""
|
||||
base_message = ""
|
||||
|
||||
if error_type == "llm_parse_error":
|
||||
return (
|
||||
base_message = (
|
||||
llm_parse_message
|
||||
or "The AI had trouble processing this request. Please try again."
|
||||
)
|
||||
elif error_type == "validation_error":
|
||||
return (
|
||||
base_message = (
|
||||
validation_message
|
||||
or "The request failed validation. Please try rephrasing."
|
||||
or "The generated agent failed validation. "
|
||||
"This usually happens when the agent structure doesn't match "
|
||||
"what the platform expects. Please try simplifying your goal "
|
||||
"or breaking it into smaller parts."
|
||||
)
|
||||
elif error_type == "patch_error":
|
||||
return "Failed to apply the changes. Please try a different approach."
|
||||
base_message = (
|
||||
"Failed to apply the changes. The modification couldn't be "
|
||||
"validated. Please try a different approach or simplify the change."
|
||||
)
|
||||
elif error_type in ("timeout", "llm_timeout"):
|
||||
return "The request took too long. Please try again."
|
||||
base_message = (
|
||||
"The request took too long to process. This can happen with "
|
||||
"complex agents. Please try again or simplify your goal."
|
||||
)
|
||||
elif error_type in ("rate_limit", "llm_rate_limit"):
|
||||
return "The service is currently busy. Please try again in a moment."
|
||||
base_message = "The service is currently busy. Please try again in a moment."
|
||||
else:
|
||||
return f"Failed to {operation}. Please try again."
|
||||
base_message = f"Failed to {operation}. Please try again."
|
||||
|
||||
if error_details:
|
||||
details = _sanitize_error_details(error_details)
|
||||
if len(details) > 200:
|
||||
details = details[:200] + "..."
|
||||
base_message += f"\n\nTechnical details: {details}"
|
||||
|
||||
return base_message
|
||||
|
||||
@@ -117,13 +117,16 @@ def _get_client() -> httpx.AsyncClient:
|
||||
|
||||
|
||||
async def decompose_goal_external(
|
||||
description: str, context: str = ""
|
||||
description: str,
|
||||
context: str = "",
|
||||
library_agents: list[dict[str, Any]] | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Call the external service to decompose a goal.
|
||||
|
||||
Args:
|
||||
description: Natural language goal description
|
||||
context: Additional context (e.g., answers to previous questions)
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
|
||||
Returns:
|
||||
Dict with either:
|
||||
@@ -136,11 +139,12 @@ async def decompose_goal_external(
|
||||
"""
|
||||
client = _get_client()
|
||||
|
||||
# Build the request payload
|
||||
payload: dict[str, Any] = {"description": description}
|
||||
if context:
|
||||
# The external service uses user_instruction for additional context
|
||||
payload["user_instruction"] = context
|
||||
description = f"{description}\n\nAdditional context from user:\n{context}"
|
||||
|
||||
payload: dict[str, Any] = {"description": description}
|
||||
if library_agents:
|
||||
payload["library_agents"] = library_agents
|
||||
|
||||
try:
|
||||
response = await client.post("/api/decompose-description", json=payload)
|
||||
@@ -207,21 +211,25 @@ async def decompose_goal_external(
|
||||
|
||||
async def generate_agent_external(
|
||||
instructions: dict[str, Any],
|
||||
library_agents: list[dict[str, Any]] | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Call the external service to generate an agent from instructions.
|
||||
|
||||
Args:
|
||||
instructions: Structured instructions from decompose_goal
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
|
||||
Returns:
|
||||
Agent JSON dict on success, or error dict {"type": "error", ...} on error
|
||||
"""
|
||||
client = _get_client()
|
||||
|
||||
payload: dict[str, Any] = {"instructions": instructions}
|
||||
if library_agents:
|
||||
payload["library_agents"] = library_agents
|
||||
|
||||
try:
|
||||
response = await client.post(
|
||||
"/api/generate-agent", json={"instructions": instructions}
|
||||
)
|
||||
response = await client.post("/api/generate-agent", json=payload)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
@@ -229,8 +237,7 @@ async def generate_agent_external(
|
||||
error_msg = data.get("error", "Unknown error from Agent Generator")
|
||||
error_type = data.get("error_type", "unknown")
|
||||
logger.error(
|
||||
f"Agent Generator generation failed: {error_msg} "
|
||||
f"(type: {error_type})"
|
||||
f"Agent Generator generation failed: {error_msg} (type: {error_type})"
|
||||
)
|
||||
return _create_error_response(error_msg, error_type)
|
||||
|
||||
@@ -251,27 +258,31 @@ async def generate_agent_external(
|
||||
|
||||
|
||||
async def generate_agent_patch_external(
|
||||
update_request: str, current_agent: dict[str, Any]
|
||||
update_request: str,
|
||||
current_agent: dict[str, Any],
|
||||
library_agents: list[dict[str, Any]] | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Call the external service to generate a patch for an existing agent.
|
||||
|
||||
Args:
|
||||
update_request: Natural language description of changes
|
||||
current_agent: Current agent JSON
|
||||
library_agents: User's library agents available for sub-agent composition
|
||||
|
||||
Returns:
|
||||
Updated agent JSON, clarifying questions dict, or error dict on error
|
||||
"""
|
||||
client = _get_client()
|
||||
|
||||
payload: dict[str, Any] = {
|
||||
"update_request": update_request,
|
||||
"current_agent_json": current_agent,
|
||||
}
|
||||
if library_agents:
|
||||
payload["library_agents"] = library_agents
|
||||
|
||||
try:
|
||||
response = await client.post(
|
||||
"/api/update-agent",
|
||||
json={
|
||||
"update_request": update_request,
|
||||
"current_agent_json": current_agent,
|
||||
},
|
||||
)
|
||||
response = await client.post("/api/update-agent", json=payload)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Shared agent search functionality for find_agent and find_library_agent tools."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Literal
|
||||
|
||||
from backend.api.features.library import db as library_db
|
||||
@@ -19,6 +20,85 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
SearchSource = Literal["marketplace", "library"]
|
||||
|
||||
_UUID_PATTERN = re.compile(
|
||||
r"^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _is_uuid(text: str) -> bool:
|
||||
"""Check if text is a valid UUID v4."""
|
||||
return bool(_UUID_PATTERN.match(text.strip()))
|
||||
|
||||
|
||||
async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | None:
|
||||
"""Fetch a library agent by ID (library agent ID or graph_id).
|
||||
|
||||
Tries multiple lookup strategies:
|
||||
1. First by graph_id (AgentGraph primary key)
|
||||
2. Then by library agent ID (LibraryAgent primary key)
|
||||
|
||||
Args:
|
||||
user_id: The user ID
|
||||
agent_id: The ID to look up (can be graph_id or library agent ID)
|
||||
|
||||
Returns:
|
||||
AgentInfo if found, None otherwise
|
||||
"""
|
||||
try:
|
||||
agent = await library_db.get_library_agent_by_graph_id(user_id, agent_id)
|
||||
if agent:
|
||||
logger.debug(f"Found library agent by graph_id: {agent.name}")
|
||||
return AgentInfo(
|
||||
id=agent.id,
|
||||
name=agent.name,
|
||||
description=agent.description or "",
|
||||
source="library",
|
||||
in_library=True,
|
||||
creator=agent.creator_name,
|
||||
status=agent.status.value,
|
||||
can_access_graph=agent.can_access_graph,
|
||||
has_external_trigger=agent.has_external_trigger,
|
||||
new_output=agent.new_output,
|
||||
graph_id=agent.graph_id,
|
||||
)
|
||||
except DatabaseError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Could not fetch library agent by graph_id {agent_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
try:
|
||||
agent = await library_db.get_library_agent(agent_id, user_id)
|
||||
if agent:
|
||||
logger.debug(f"Found library agent by library_id: {agent.name}")
|
||||
return AgentInfo(
|
||||
id=agent.id,
|
||||
name=agent.name,
|
||||
description=agent.description or "",
|
||||
source="library",
|
||||
in_library=True,
|
||||
creator=agent.creator_name,
|
||||
status=agent.status.value,
|
||||
can_access_graph=agent.can_access_graph,
|
||||
has_external_trigger=agent.has_external_trigger,
|
||||
new_output=agent.new_output,
|
||||
graph_id=agent.graph_id,
|
||||
)
|
||||
except NotFoundError:
|
||||
logger.debug(f"Library agent not found by library_id: {agent_id}")
|
||||
except DatabaseError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Could not fetch library agent by library_id {agent_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def search_agents(
|
||||
query: str,
|
||||
@@ -69,29 +149,37 @@ async def search_agents(
|
||||
is_featured=False,
|
||||
)
|
||||
)
|
||||
else: # library
|
||||
logger.info(f"Searching user library for: {query}")
|
||||
results = await library_db.list_library_agents(
|
||||
user_id=user_id, # type: ignore[arg-type]
|
||||
search_term=query,
|
||||
page_size=10,
|
||||
)
|
||||
for agent in results.agents:
|
||||
agents.append(
|
||||
AgentInfo(
|
||||
id=agent.id,
|
||||
name=agent.name,
|
||||
description=agent.description or "",
|
||||
source="library",
|
||||
in_library=True,
|
||||
creator=agent.creator_name,
|
||||
status=agent.status.value,
|
||||
can_access_graph=agent.can_access_graph,
|
||||
has_external_trigger=agent.has_external_trigger,
|
||||
new_output=agent.new_output,
|
||||
graph_id=agent.graph_id,
|
||||
)
|
||||
else:
|
||||
if _is_uuid(query):
|
||||
logger.info(f"Query looks like UUID, trying direct lookup: {query}")
|
||||
agent = await _get_library_agent_by_id(user_id, query) # type: ignore[arg-type]
|
||||
if agent:
|
||||
agents.append(agent)
|
||||
logger.info(f"Found agent by direct ID lookup: {agent.name}")
|
||||
|
||||
if not agents:
|
||||
logger.info(f"Searching user library for: {query}")
|
||||
results = await library_db.list_library_agents(
|
||||
user_id=user_id, # type: ignore[arg-type]
|
||||
search_term=query,
|
||||
page_size=10,
|
||||
)
|
||||
for agent in results.agents:
|
||||
agents.append(
|
||||
AgentInfo(
|
||||
id=agent.id,
|
||||
name=agent.name,
|
||||
description=agent.description or "",
|
||||
source="library",
|
||||
in_library=True,
|
||||
creator=agent.creator_name,
|
||||
status=agent.status.value,
|
||||
can_access_graph=agent.can_access_graph,
|
||||
has_external_trigger=agent.has_external_trigger,
|
||||
new_output=agent.new_output,
|
||||
graph_id=agent.graph_id,
|
||||
)
|
||||
)
|
||||
logger.info(f"Found {len(agents)} agents in {source}")
|
||||
except NotFoundError:
|
||||
pass
|
||||
|
||||
@@ -8,7 +8,9 @@ from backend.api.features.chat.model import ChatSession
|
||||
from .agent_generator import (
|
||||
AgentGeneratorNotConfiguredError,
|
||||
decompose_goal,
|
||||
enrich_library_agents_from_steps,
|
||||
generate_agent,
|
||||
get_all_relevant_agents_for_generation,
|
||||
get_user_message_for_error,
|
||||
save_agent_to_library,
|
||||
)
|
||||
@@ -103,9 +105,24 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Step 1: Decompose goal into steps
|
||||
library_agents = None
|
||||
if user_id:
|
||||
try:
|
||||
library_agents = await get_all_relevant_agents_for_generation(
|
||||
user_id=user_id,
|
||||
search_query=description,
|
||||
include_marketplace=True,
|
||||
)
|
||||
logger.debug(
|
||||
f"Found {len(library_agents)} relevant agents for sub-agent composition"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch library agents: {e}")
|
||||
|
||||
try:
|
||||
decomposition_result = await decompose_goal(description, context)
|
||||
decomposition_result = await decompose_goal(
|
||||
description, context, library_agents
|
||||
)
|
||||
except AgentGeneratorNotConfiguredError:
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
@@ -124,7 +141,6 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if the result is an error from the external service
|
||||
if decomposition_result.get("type") == "error":
|
||||
error_msg = decomposition_result.get("error", "Unknown error")
|
||||
error_type = decomposition_result.get("error_type", "unknown")
|
||||
@@ -144,7 +160,6 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if LLM returned clarifying questions
|
||||
if decomposition_result.get("type") == "clarifying_questions":
|
||||
questions = decomposition_result.get("questions", [])
|
||||
return ClarificationNeededResponse(
|
||||
@@ -163,7 +178,6 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check for unachievable/vague goals
|
||||
if decomposition_result.get("type") == "unachievable_goal":
|
||||
suggested = decomposition_result.get("suggested_goal", "")
|
||||
reason = decomposition_result.get("reason", "")
|
||||
@@ -190,9 +204,22 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Step 2: Generate agent JSON (external service handles fixing and validation)
|
||||
if user_id and library_agents is not None:
|
||||
try:
|
||||
library_agents = await enrich_library_agents_from_steps(
|
||||
user_id=user_id,
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=library_agents,
|
||||
include_marketplace=True,
|
||||
)
|
||||
logger.debug(
|
||||
f"After enrichment: {len(library_agents)} total agents for sub-agent composition"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to enrich library agents from steps: {e}")
|
||||
|
||||
try:
|
||||
agent_json = await generate_agent(decomposition_result)
|
||||
agent_json = await generate_agent(decomposition_result, library_agents)
|
||||
except AgentGeneratorNotConfiguredError:
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
@@ -211,7 +238,6 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if the result is an error from the external service
|
||||
if isinstance(agent_json, dict) and agent_json.get("type") == "error":
|
||||
error_msg = agent_json.get("error", "Unknown error")
|
||||
error_type = agent_json.get("error_type", "unknown")
|
||||
@@ -219,7 +245,12 @@ class CreateAgentTool(BaseTool):
|
||||
error_type,
|
||||
operation="generate the agent",
|
||||
llm_parse_message="The AI had trouble generating the agent. Please try again or simplify your goal.",
|
||||
validation_message="The generated agent failed validation. Please try rephrasing your goal.",
|
||||
validation_message=(
|
||||
"I wasn't able to create a valid agent for this request. "
|
||||
"The generated workflow had some structural issues. "
|
||||
"Please try simplifying your goal or breaking it into smaller steps."
|
||||
),
|
||||
error_details=error_msg,
|
||||
)
|
||||
return ErrorResponse(
|
||||
message=user_message,
|
||||
@@ -237,7 +268,6 @@ class CreateAgentTool(BaseTool):
|
||||
node_count = len(agent_json.get("nodes", []))
|
||||
link_count = len(agent_json.get("links", []))
|
||||
|
||||
# Step 3: Preview or save
|
||||
if not save:
|
||||
return AgentPreviewResponse(
|
||||
message=(
|
||||
@@ -252,7 +282,6 @@ class CreateAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Save to library
|
||||
if not user_id:
|
||||
return ErrorResponse(
|
||||
message="You must be logged in to save agents.",
|
||||
@@ -270,7 +299,7 @@ class CreateAgentTool(BaseTool):
|
||||
agent_id=created_graph.id,
|
||||
agent_name=created_graph.name,
|
||||
library_agent_id=library_agent.id,
|
||||
library_agent_link=f"/library/{library_agent.id}",
|
||||
library_agent_link=f"/library/agents/{library_agent.id}",
|
||||
agent_page_link=f"/build?flowID={created_graph.id}",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -9,6 +9,7 @@ from .agent_generator import (
|
||||
AgentGeneratorNotConfiguredError,
|
||||
generate_agent_patch,
|
||||
get_agent_as_json,
|
||||
get_all_relevant_agents_for_generation,
|
||||
get_user_message_for_error,
|
||||
save_agent_to_library,
|
||||
)
|
||||
@@ -117,7 +118,6 @@ class EditAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Step 1: Fetch current agent
|
||||
current_agent = await get_agent_as_json(agent_id, user_id)
|
||||
|
||||
if current_agent is None:
|
||||
@@ -127,14 +127,30 @@ class EditAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Build the update request with context
|
||||
library_agents = None
|
||||
if user_id:
|
||||
try:
|
||||
graph_id = current_agent.get("id")
|
||||
library_agents = await get_all_relevant_agents_for_generation(
|
||||
user_id=user_id,
|
||||
search_query=changes,
|
||||
exclude_graph_id=graph_id,
|
||||
include_marketplace=True,
|
||||
)
|
||||
logger.debug(
|
||||
f"Found {len(library_agents)} relevant agents for sub-agent composition"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch library agents: {e}")
|
||||
|
||||
update_request = changes
|
||||
if context:
|
||||
update_request = f"{changes}\n\nAdditional context:\n{context}"
|
||||
|
||||
# Step 2: Generate updated agent (external service handles fixing and validation)
|
||||
try:
|
||||
result = await generate_agent_patch(update_request, current_agent)
|
||||
result = await generate_agent_patch(
|
||||
update_request, current_agent, library_agents
|
||||
)
|
||||
except AgentGeneratorNotConfiguredError:
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
@@ -153,7 +169,6 @@ class EditAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if the result is an error from the external service
|
||||
if isinstance(result, dict) and result.get("type") == "error":
|
||||
error_msg = result.get("error", "Unknown error")
|
||||
error_type = result.get("error_type", "unknown")
|
||||
@@ -162,6 +177,7 @@ class EditAgentTool(BaseTool):
|
||||
operation="generate the changes",
|
||||
llm_parse_message="The AI had trouble generating the changes. Please try again or simplify your request.",
|
||||
validation_message="The generated changes failed validation. Please try rephrasing your request.",
|
||||
error_details=error_msg,
|
||||
)
|
||||
return ErrorResponse(
|
||||
message=user_message,
|
||||
@@ -175,7 +191,6 @@ class EditAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if LLM returned clarifying questions
|
||||
if result.get("type") == "clarifying_questions":
|
||||
questions = result.get("questions", [])
|
||||
return ClarificationNeededResponse(
|
||||
@@ -194,7 +209,6 @@ class EditAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Result is the updated agent JSON
|
||||
updated_agent = result
|
||||
|
||||
agent_name = updated_agent.get("name", "Updated Agent")
|
||||
@@ -202,7 +216,6 @@ class EditAgentTool(BaseTool):
|
||||
node_count = len(updated_agent.get("nodes", []))
|
||||
link_count = len(updated_agent.get("links", []))
|
||||
|
||||
# Step 3: Preview or save
|
||||
if not save:
|
||||
return AgentPreviewResponse(
|
||||
message=(
|
||||
@@ -218,7 +231,6 @@ class EditAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Save to library (creates a new version)
|
||||
if not user_id:
|
||||
return ErrorResponse(
|
||||
message="You must be logged in to save agents.",
|
||||
@@ -236,7 +248,7 @@ class EditAgentTool(BaseTool):
|
||||
agent_id=created_graph.id,
|
||||
agent_name=created_graph.name,
|
||||
library_agent_id=library_agent.id,
|
||||
library_agent_link=f"/library/{library_agent.id}",
|
||||
library_agent_link=f"/library/agents/{library_agent.id}",
|
||||
agent_page_link=f"/build?flowID={created_graph.id}",
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
@@ -39,6 +39,7 @@ async def list_library_agents(
|
||||
sort_by: library_model.LibraryAgentSort = library_model.LibraryAgentSort.UPDATED_AT,
|
||||
page: int = 1,
|
||||
page_size: int = 50,
|
||||
include_executions: bool = False,
|
||||
) -> library_model.LibraryAgentResponse:
|
||||
"""
|
||||
Retrieves a paginated list of LibraryAgent records for a given user.
|
||||
@@ -49,6 +50,9 @@ async def list_library_agents(
|
||||
sort_by: Sorting field (createdAt, updatedAt, isFavorite, isCreatedByUser).
|
||||
page: Current page (1-indexed).
|
||||
page_size: Number of items per page.
|
||||
include_executions: Whether to include execution data for status calculation.
|
||||
Defaults to False for performance (UI fetches status separately).
|
||||
Set to True when accurate status/metrics are needed (e.g., agent generator).
|
||||
|
||||
Returns:
|
||||
A LibraryAgentResponse containing the list of agents and pagination details.
|
||||
@@ -76,7 +80,6 @@ async def list_library_agents(
|
||||
"isArchived": False,
|
||||
}
|
||||
|
||||
# Build search filter if applicable
|
||||
if search_term:
|
||||
where_clause["OR"] = [
|
||||
{
|
||||
@@ -93,7 +96,6 @@ async def list_library_agents(
|
||||
},
|
||||
]
|
||||
|
||||
# Determine sorting
|
||||
order_by: prisma.types.LibraryAgentOrderByInput | None = None
|
||||
|
||||
if sort_by == library_model.LibraryAgentSort.CREATED_AT:
|
||||
@@ -105,7 +107,7 @@ async def list_library_agents(
|
||||
library_agents = await prisma.models.LibraryAgent.prisma().find_many(
|
||||
where=where_clause,
|
||||
include=library_agent_include(
|
||||
user_id, include_nodes=False, include_executions=False
|
||||
user_id, include_nodes=False, include_executions=include_executions
|
||||
),
|
||||
order=order_by,
|
||||
skip=(page - 1) * page_size,
|
||||
|
||||
@@ -9,6 +9,7 @@ import pydantic
|
||||
from backend.data.block import BlockInput
|
||||
from backend.data.graph import GraphModel, GraphSettings, GraphTriggerInfo
|
||||
from backend.data.model import CredentialsMetaInput, is_credentials_field_name
|
||||
from backend.util.json import loads as json_loads
|
||||
from backend.util.models import Pagination
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -16,10 +17,10 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
class LibraryAgentStatus(str, Enum):
|
||||
COMPLETED = "COMPLETED" # All runs completed
|
||||
HEALTHY = "HEALTHY" # Agent is running (not all runs have completed)
|
||||
WAITING = "WAITING" # Agent is queued or waiting to start
|
||||
ERROR = "ERROR" # Agent is in an error state
|
||||
COMPLETED = "COMPLETED"
|
||||
HEALTHY = "HEALTHY"
|
||||
WAITING = "WAITING"
|
||||
ERROR = "ERROR"
|
||||
|
||||
|
||||
class MarketplaceListingCreator(pydantic.BaseModel):
|
||||
@@ -39,6 +40,30 @@ class MarketplaceListing(pydantic.BaseModel):
|
||||
creator: MarketplaceListingCreator
|
||||
|
||||
|
||||
class RecentExecution(pydantic.BaseModel):
|
||||
"""Summary of a recent execution for quality assessment.
|
||||
|
||||
Used by the LLM to understand the agent's recent performance with specific examples
|
||||
rather than just aggregate statistics.
|
||||
"""
|
||||
|
||||
status: str
|
||||
correctness_score: float | None = None
|
||||
activity_summary: str | None = None
|
||||
|
||||
|
||||
def _parse_settings(settings: dict | str | None) -> GraphSettings:
|
||||
"""Parse settings from database, handling both dict and string formats."""
|
||||
if settings is None:
|
||||
return GraphSettings()
|
||||
try:
|
||||
if isinstance(settings, str):
|
||||
settings = json_loads(settings)
|
||||
return GraphSettings.model_validate(settings)
|
||||
except Exception:
|
||||
return GraphSettings()
|
||||
|
||||
|
||||
class LibraryAgent(pydantic.BaseModel):
|
||||
"""
|
||||
Represents an agent in the library, including metadata for display and
|
||||
@@ -48,7 +73,7 @@ class LibraryAgent(pydantic.BaseModel):
|
||||
id: str
|
||||
graph_id: str
|
||||
graph_version: int
|
||||
owner_user_id: str # ID of user who owns/created this agent graph
|
||||
owner_user_id: str
|
||||
|
||||
image_url: str | None
|
||||
|
||||
@@ -64,7 +89,7 @@ class LibraryAgent(pydantic.BaseModel):
|
||||
description: str
|
||||
instructions: str | None = None
|
||||
|
||||
input_schema: dict[str, Any] # Should be BlockIOObjectSubSchema in frontend
|
||||
input_schema: dict[str, Any]
|
||||
output_schema: dict[str, Any]
|
||||
credentials_input_schema: dict[str, Any] | None = pydantic.Field(
|
||||
description="Input schema for credentials required by the agent",
|
||||
@@ -81,25 +106,19 @@ class LibraryAgent(pydantic.BaseModel):
|
||||
)
|
||||
trigger_setup_info: Optional[GraphTriggerInfo] = None
|
||||
|
||||
# Indicates whether there's a new output (based on recent runs)
|
||||
new_output: bool
|
||||
|
||||
# Whether the user can access the underlying graph
|
||||
execution_count: int = 0
|
||||
success_rate: float | None = None
|
||||
avg_correctness_score: float | None = None
|
||||
recent_executions: list[RecentExecution] = pydantic.Field(
|
||||
default_factory=list,
|
||||
description="List of recent executions with status, score, and summary",
|
||||
)
|
||||
can_access_graph: bool
|
||||
|
||||
# Indicates if this agent is the latest version
|
||||
is_latest_version: bool
|
||||
|
||||
# Whether the agent is marked as favorite by the user
|
||||
is_favorite: bool
|
||||
|
||||
# Recommended schedule cron (from marketplace agents)
|
||||
recommended_schedule_cron: str | None = None
|
||||
|
||||
# User-specific settings for this library agent
|
||||
settings: GraphSettings = pydantic.Field(default_factory=GraphSettings)
|
||||
|
||||
# Marketplace listing information if the agent has been published
|
||||
marketplace_listing: Optional["MarketplaceListing"] = None
|
||||
|
||||
@staticmethod
|
||||
@@ -123,7 +142,6 @@ class LibraryAgent(pydantic.BaseModel):
|
||||
agent_updated_at = agent.AgentGraph.updatedAt
|
||||
lib_agent_updated_at = agent.updatedAt
|
||||
|
||||
# Compute updated_at as the latest between library agent and graph
|
||||
updated_at = (
|
||||
max(agent_updated_at, lib_agent_updated_at)
|
||||
if agent_updated_at
|
||||
@@ -136,7 +154,6 @@ class LibraryAgent(pydantic.BaseModel):
|
||||
creator_name = agent.Creator.name or "Unknown"
|
||||
creator_image_url = agent.Creator.avatarUrl or ""
|
||||
|
||||
# Logic to calculate status and new_output
|
||||
week_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
|
||||
days=7
|
||||
)
|
||||
@@ -145,13 +162,55 @@ class LibraryAgent(pydantic.BaseModel):
|
||||
status = status_result.status
|
||||
new_output = status_result.new_output
|
||||
|
||||
# Check if user can access the graph
|
||||
can_access_graph = agent.AgentGraph.userId == agent.userId
|
||||
execution_count = len(executions)
|
||||
success_rate: float | None = None
|
||||
avg_correctness_score: float | None = None
|
||||
if execution_count > 0:
|
||||
success_count = sum(
|
||||
1
|
||||
for e in executions
|
||||
if e.executionStatus == prisma.enums.AgentExecutionStatus.COMPLETED
|
||||
)
|
||||
success_rate = (success_count / execution_count) * 100
|
||||
|
||||
# Hard-coded to True until a method to check is implemented
|
||||
correctness_scores = []
|
||||
for e in executions:
|
||||
if e.stats and isinstance(e.stats, dict):
|
||||
score = e.stats.get("correctness_score")
|
||||
if score is not None and isinstance(score, (int, float)):
|
||||
correctness_scores.append(float(score))
|
||||
if correctness_scores:
|
||||
avg_correctness_score = sum(correctness_scores) / len(
|
||||
correctness_scores
|
||||
)
|
||||
|
||||
recent_executions: list[RecentExecution] = []
|
||||
for e in executions:
|
||||
exec_score: float | None = None
|
||||
exec_summary: str | None = None
|
||||
if e.stats and isinstance(e.stats, dict):
|
||||
score = e.stats.get("correctness_score")
|
||||
if score is not None and isinstance(score, (int, float)):
|
||||
exec_score = float(score)
|
||||
summary = e.stats.get("activity_status")
|
||||
if summary is not None and isinstance(summary, str):
|
||||
exec_summary = summary
|
||||
exec_status = (
|
||||
e.executionStatus.value
|
||||
if hasattr(e.executionStatus, "value")
|
||||
else str(e.executionStatus)
|
||||
)
|
||||
recent_executions.append(
|
||||
RecentExecution(
|
||||
status=exec_status,
|
||||
correctness_score=exec_score,
|
||||
activity_summary=exec_summary,
|
||||
)
|
||||
)
|
||||
|
||||
can_access_graph = agent.AgentGraph.userId == agent.userId
|
||||
is_latest_version = True
|
||||
|
||||
# Build marketplace_listing if available
|
||||
marketplace_listing_data = None
|
||||
if store_listing and store_listing.ActiveVersion and profile:
|
||||
creator_data = MarketplaceListingCreator(
|
||||
@@ -190,11 +249,15 @@ class LibraryAgent(pydantic.BaseModel):
|
||||
has_sensitive_action=graph.has_sensitive_action,
|
||||
trigger_setup_info=graph.trigger_setup_info,
|
||||
new_output=new_output,
|
||||
execution_count=execution_count,
|
||||
success_rate=success_rate,
|
||||
avg_correctness_score=avg_correctness_score,
|
||||
recent_executions=recent_executions,
|
||||
can_access_graph=can_access_graph,
|
||||
is_latest_version=is_latest_version,
|
||||
is_favorite=agent.isFavorite,
|
||||
recommended_schedule_cron=agent.AgentGraph.recommendedScheduleCron,
|
||||
settings=GraphSettings.model_validate(agent.settings),
|
||||
settings=_parse_settings(agent.settings),
|
||||
marketplace_listing=marketplace_listing_data,
|
||||
)
|
||||
|
||||
@@ -220,18 +283,15 @@ def _calculate_agent_status(
|
||||
if not executions:
|
||||
return AgentStatusResult(status=LibraryAgentStatus.COMPLETED, new_output=False)
|
||||
|
||||
# Track how many times each execution status appears
|
||||
status_counts = {status: 0 for status in prisma.enums.AgentExecutionStatus}
|
||||
new_output = False
|
||||
|
||||
for execution in executions:
|
||||
# Check if there's a completed run more recent than `recent_threshold`
|
||||
if execution.createdAt >= recent_threshold:
|
||||
if execution.executionStatus == prisma.enums.AgentExecutionStatus.COMPLETED:
|
||||
new_output = True
|
||||
status_counts[execution.executionStatus] += 1
|
||||
|
||||
# Determine the final status based on counts
|
||||
if status_counts[prisma.enums.AgentExecutionStatus.FAILED] > 0:
|
||||
return AgentStatusResult(status=LibraryAgentStatus.ERROR, new_output=new_output)
|
||||
elif status_counts[prisma.enums.AgentExecutionStatus.QUEUED] > 0:
|
||||
|
||||
@@ -112,6 +112,7 @@ async def get_store_agents(
|
||||
description=agent["description"],
|
||||
runs=agent["runs"],
|
||||
rating=agent["rating"],
|
||||
agent_graph_id=agent.get("agentGraphId", ""),
|
||||
)
|
||||
store_agents.append(store_agent)
|
||||
except Exception as e:
|
||||
@@ -170,6 +171,7 @@ async def get_store_agents(
|
||||
description=agent.description,
|
||||
runs=agent.runs,
|
||||
rating=agent.rating,
|
||||
agent_graph_id=agent.agentGraphId,
|
||||
)
|
||||
# Add to the list only if creation was successful
|
||||
store_agents.append(store_agent)
|
||||
|
||||
@@ -600,6 +600,7 @@ async def hybrid_search(
|
||||
sa.featured,
|
||||
sa.is_available,
|
||||
sa.updated_at,
|
||||
sa."agentGraphId",
|
||||
-- Searchable text for BM25 reranking
|
||||
COALESCE(sa.agent_name, '') || ' ' || COALESCE(sa.sub_heading, '') || ' ' || COALESCE(sa.description, '') as searchable_text,
|
||||
-- Semantic score
|
||||
@@ -659,6 +660,7 @@ async def hybrid_search(
|
||||
featured,
|
||||
is_available,
|
||||
updated_at,
|
||||
"agentGraphId",
|
||||
searchable_text,
|
||||
semantic_score,
|
||||
lexical_score,
|
||||
|
||||
@@ -38,6 +38,7 @@ class StoreAgent(pydantic.BaseModel):
|
||||
description: str
|
||||
runs: int
|
||||
rating: float
|
||||
agent_graph_id: str
|
||||
|
||||
|
||||
class StoreAgentsResponse(pydantic.BaseModel):
|
||||
|
||||
@@ -26,11 +26,13 @@ def test_store_agent():
|
||||
description="Test description",
|
||||
runs=50,
|
||||
rating=4.5,
|
||||
agent_graph_id="test-graph-id",
|
||||
)
|
||||
assert agent.slug == "test-agent"
|
||||
assert agent.agent_name == "Test Agent"
|
||||
assert agent.runs == 50
|
||||
assert agent.rating == 4.5
|
||||
assert agent.agent_graph_id == "test-graph-id"
|
||||
|
||||
|
||||
def test_store_agents_response():
|
||||
@@ -46,6 +48,7 @@ def test_store_agents_response():
|
||||
description="Test description",
|
||||
runs=50,
|
||||
rating=4.5,
|
||||
agent_graph_id="test-graph-id",
|
||||
)
|
||||
],
|
||||
pagination=store_model.Pagination(
|
||||
|
||||
@@ -82,6 +82,7 @@ def test_get_agents_featured(
|
||||
description="Featured agent description",
|
||||
runs=100,
|
||||
rating=4.5,
|
||||
agent_graph_id="test-graph-1",
|
||||
)
|
||||
],
|
||||
pagination=store_model.Pagination(
|
||||
@@ -127,6 +128,7 @@ def test_get_agents_by_creator(
|
||||
description="Creator agent description",
|
||||
runs=50,
|
||||
rating=4.0,
|
||||
agent_graph_id="test-graph-2",
|
||||
)
|
||||
],
|
||||
pagination=store_model.Pagination(
|
||||
@@ -172,6 +174,7 @@ def test_get_agents_sorted(
|
||||
description="Top agent description",
|
||||
runs=1000,
|
||||
rating=5.0,
|
||||
agent_graph_id="test-graph-3",
|
||||
)
|
||||
],
|
||||
pagination=store_model.Pagination(
|
||||
@@ -217,6 +220,7 @@ def test_get_agents_search(
|
||||
description="Specific search term description",
|
||||
runs=75,
|
||||
rating=4.2,
|
||||
agent_graph_id="test-graph-search",
|
||||
)
|
||||
],
|
||||
pagination=store_model.Pagination(
|
||||
@@ -262,6 +266,7 @@ def test_get_agents_category(
|
||||
description="Category agent description",
|
||||
runs=60,
|
||||
rating=4.1,
|
||||
agent_graph_id="test-graph-category",
|
||||
)
|
||||
],
|
||||
pagination=store_model.Pagination(
|
||||
@@ -306,6 +311,7 @@ def test_get_agents_pagination(
|
||||
description=f"Agent {i} description",
|
||||
runs=i * 10,
|
||||
rating=4.0,
|
||||
agent_graph_id="test-graph-2",
|
||||
)
|
||||
for i in range(5)
|
||||
],
|
||||
|
||||
@@ -33,6 +33,7 @@ class TestCacheDeletion:
|
||||
description="Test description",
|
||||
runs=100,
|
||||
rating=4.5,
|
||||
agent_graph_id="test-graph-id",
|
||||
)
|
||||
],
|
||||
pagination=Pagination(
|
||||
|
||||
@@ -66,18 +66,24 @@ async def event_broadcaster(manager: ConnectionManager):
|
||||
execution_bus = AsyncRedisExecutionEventBus()
|
||||
notification_bus = AsyncRedisNotificationEventBus()
|
||||
|
||||
async def execution_worker():
|
||||
async for event in execution_bus.listen("*"):
|
||||
await manager.send_execution_update(event)
|
||||
try:
|
||||
|
||||
async def notification_worker():
|
||||
async for notification in notification_bus.listen("*"):
|
||||
await manager.send_notification(
|
||||
user_id=notification.user_id,
|
||||
payload=notification.payload,
|
||||
)
|
||||
async def execution_worker():
|
||||
async for event in execution_bus.listen("*"):
|
||||
await manager.send_execution_update(event)
|
||||
|
||||
await asyncio.gather(execution_worker(), notification_worker())
|
||||
async def notification_worker():
|
||||
async for notification in notification_bus.listen("*"):
|
||||
await manager.send_notification(
|
||||
user_id=notification.user_id,
|
||||
payload=notification.payload,
|
||||
)
|
||||
|
||||
await asyncio.gather(execution_worker(), notification_worker())
|
||||
finally:
|
||||
# Ensure PubSub connections are closed on any exit to prevent leaks
|
||||
await execution_bus.close()
|
||||
await notification_bus.close()
|
||||
|
||||
|
||||
async def authenticate_websocket(websocket: WebSocket) -> str:
|
||||
|
||||
@@ -32,7 +32,7 @@ from backend.data.model import (
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.util import json
|
||||
from backend.util.logging import TruncatedLogger
|
||||
from backend.util.prompt import compress_prompt, estimate_token_count
|
||||
from backend.util.prompt import compress_context, estimate_token_count
|
||||
from backend.util.text import TextFormatter
|
||||
|
||||
logger = TruncatedLogger(logging.getLogger(__name__), "[LLM-Block]")
|
||||
@@ -634,11 +634,18 @@ async def llm_call(
|
||||
context_window = llm_model.context_window
|
||||
|
||||
if compress_prompt_to_fit:
|
||||
prompt = compress_prompt(
|
||||
result = await compress_context(
|
||||
messages=prompt,
|
||||
target_tokens=llm_model.context_window // 2,
|
||||
lossy_ok=True,
|
||||
client=None, # Truncation-only, no LLM summarization
|
||||
reserve=0, # Caller handles response token budget separately
|
||||
)
|
||||
if result.error:
|
||||
logger.warning(
|
||||
f"Prompt compression did not meet target: {result.error}. "
|
||||
f"Proceeding with {result.token_count} tokens."
|
||||
)
|
||||
prompt = result.messages
|
||||
|
||||
# Calculate available tokens based on context window and input length
|
||||
estimated_input_tokens = estimate_token_count(prompt)
|
||||
|
||||
@@ -133,10 +133,23 @@ class RedisEventBus(BaseRedisEventBus[M], ABC):
|
||||
|
||||
|
||||
class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
|
||||
def __init__(self):
|
||||
self._pubsub: AsyncPubSub | None = None
|
||||
|
||||
@property
|
||||
async def connection(self) -> redis.AsyncRedis:
|
||||
return await redis.get_redis_async()
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the PubSub connection if it exists."""
|
||||
if self._pubsub is not None:
|
||||
try:
|
||||
await self._pubsub.close()
|
||||
except Exception:
|
||||
logger.warning("Failed to close PubSub connection", exc_info=True)
|
||||
finally:
|
||||
self._pubsub = None
|
||||
|
||||
async def publish_event(self, event: M, channel_key: str):
|
||||
"""
|
||||
Publish an event to Redis. Gracefully handles connection failures
|
||||
@@ -157,6 +170,7 @@ class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
|
||||
await self.connection, channel_key
|
||||
)
|
||||
assert isinstance(pubsub, AsyncPubSub)
|
||||
self._pubsub = pubsub
|
||||
|
||||
if "*" in channel_key:
|
||||
await pubsub.psubscribe(full_channel_name)
|
||||
|
||||
@@ -1028,6 +1028,39 @@ async def get_graph(
|
||||
return GraphModel.from_db(graph, for_export)
|
||||
|
||||
|
||||
async def get_store_listed_graphs(*graph_ids: str) -> dict[str, GraphModel]:
|
||||
"""Batch-fetch multiple store-listed graphs by their IDs.
|
||||
|
||||
Only returns graphs that have approved store listings (publicly available).
|
||||
Does not require permission checks since store-listed graphs are public.
|
||||
|
||||
Args:
|
||||
*graph_ids: Variable number of graph IDs to fetch
|
||||
|
||||
Returns:
|
||||
Dict mapping graph_id to GraphModel for graphs with approved store listings
|
||||
"""
|
||||
if not graph_ids:
|
||||
return {}
|
||||
|
||||
store_listings = await StoreListingVersion.prisma().find_many(
|
||||
where={
|
||||
"agentGraphId": {"in": list(graph_ids)},
|
||||
"submissionStatus": SubmissionStatus.APPROVED,
|
||||
"isDeleted": False,
|
||||
},
|
||||
include={"AgentGraph": {"include": AGENT_GRAPH_INCLUDE}},
|
||||
distinct=["agentGraphId"],
|
||||
order={"agentGraphVersion": "desc"},
|
||||
)
|
||||
|
||||
return {
|
||||
listing.agentGraphId: GraphModel.from_db(listing.AgentGraph)
|
||||
for listing in store_listings
|
||||
if listing.AgentGraph
|
||||
}
|
||||
|
||||
|
||||
async def get_graph_as_admin(
|
||||
graph_id: str,
|
||||
version: int | None = None,
|
||||
|
||||
@@ -17,6 +17,7 @@ from backend.data.analytics import (
|
||||
get_accuracy_trends_and_alerts,
|
||||
get_marketplace_graphs_for_monitoring,
|
||||
)
|
||||
from backend.data.auth.oauth import cleanup_expired_oauth_tokens
|
||||
from backend.data.credit import UsageTransactionMetadata, get_user_credit_model
|
||||
from backend.data.execution import (
|
||||
create_graph_execution,
|
||||
@@ -219,6 +220,9 @@ class DatabaseManager(AppService):
|
||||
# Onboarding
|
||||
increment_onboarding_runs = _(increment_onboarding_runs)
|
||||
|
||||
# OAuth
|
||||
cleanup_expired_oauth_tokens = _(cleanup_expired_oauth_tokens)
|
||||
|
||||
# Store
|
||||
get_store_agents = _(get_store_agents)
|
||||
get_store_agent_details = _(get_store_agent_details)
|
||||
@@ -349,6 +353,9 @@ class DatabaseManagerAsyncClient(AppServiceClient):
|
||||
# Onboarding
|
||||
increment_onboarding_runs = d.increment_onboarding_runs
|
||||
|
||||
# OAuth
|
||||
cleanup_expired_oauth_tokens = d.cleanup_expired_oauth_tokens
|
||||
|
||||
# Store
|
||||
get_store_agents = d.get_store_agents
|
||||
get_store_agent_details = d.get_store_agent_details
|
||||
|
||||
@@ -24,11 +24,9 @@ from dotenv import load_dotenv
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
from sqlalchemy import MetaData, create_engine
|
||||
|
||||
from backend.data.auth.oauth import cleanup_expired_oauth_tokens
|
||||
from backend.data.block import BlockInput
|
||||
from backend.data.execution import GraphExecutionWithNodes
|
||||
from backend.data.model import CredentialsMetaInput
|
||||
from backend.data.onboarding import increment_onboarding_runs
|
||||
from backend.executor import utils as execution_utils
|
||||
from backend.monitoring import (
|
||||
NotificationJobArgs,
|
||||
@@ -38,7 +36,11 @@ from backend.monitoring import (
|
||||
report_execution_accuracy_alerts,
|
||||
report_late_executions,
|
||||
)
|
||||
from backend.util.clients import get_database_manager_client, get_scheduler_client
|
||||
from backend.util.clients import (
|
||||
get_database_manager_async_client,
|
||||
get_database_manager_client,
|
||||
get_scheduler_client,
|
||||
)
|
||||
from backend.util.cloud_storage import cleanup_expired_files_async
|
||||
from backend.util.exceptions import (
|
||||
GraphNotFoundError,
|
||||
@@ -148,6 +150,7 @@ def execute_graph(**kwargs):
|
||||
async def _execute_graph(**kwargs):
|
||||
args = GraphExecutionJobArgs(**kwargs)
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
db = get_database_manager_async_client()
|
||||
try:
|
||||
logger.info(f"Executing recurring job for graph #{args.graph_id}")
|
||||
graph_exec: GraphExecutionWithNodes = await execution_utils.add_graph_execution(
|
||||
@@ -157,7 +160,7 @@ async def _execute_graph(**kwargs):
|
||||
inputs=args.input_data,
|
||||
graph_credentials_inputs=args.input_credentials,
|
||||
)
|
||||
await increment_onboarding_runs(args.user_id)
|
||||
await db.increment_onboarding_runs(args.user_id)
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
logger.info(
|
||||
f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} "
|
||||
@@ -246,8 +249,13 @@ def cleanup_expired_files():
|
||||
|
||||
def cleanup_oauth_tokens():
|
||||
"""Clean up expired OAuth tokens from the database."""
|
||||
|
||||
# Wait for completion
|
||||
run_async(cleanup_expired_oauth_tokens())
|
||||
async def _cleanup():
|
||||
db = get_database_manager_async_client()
|
||||
return await db.cleanup_expired_oauth_tokens()
|
||||
|
||||
run_async(_cleanup())
|
||||
|
||||
|
||||
def execution_accuracy_alerts():
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import fastapi
|
||||
from fastapi.routing import APIRoute
|
||||
|
||||
from backend.api.features.integrations.router import router as integrations_router
|
||||
from backend.integrations.providers import ProviderName
|
||||
from backend.integrations.webhooks import utils as webhooks_utils
|
||||
|
||||
|
||||
def test_webhook_ingress_url_matches_route(monkeypatch) -> None:
|
||||
app = fastapi.FastAPI()
|
||||
app.include_router(integrations_router, prefix="/api/integrations")
|
||||
|
||||
provider = ProviderName.GITHUB
|
||||
webhook_id = "webhook_123"
|
||||
base_url = "https://example.com"
|
||||
|
||||
monkeypatch.setattr(webhooks_utils.app_config, "platform_base_url", base_url)
|
||||
|
||||
route = next(
|
||||
route
|
||||
for route in integrations_router.routes
|
||||
if isinstance(route, APIRoute)
|
||||
and route.path == "/{provider}/webhooks/{webhook_id}/ingress"
|
||||
and "POST" in route.methods
|
||||
)
|
||||
expected_path = f"/api/integrations{route.path}".format(
|
||||
provider=provider.value,
|
||||
webhook_id=webhook_id,
|
||||
)
|
||||
actual_url = urlparse(webhooks_utils.webhook_ingress_url(provider, webhook_id))
|
||||
expected_base = urlparse(base_url)
|
||||
|
||||
assert (actual_url.scheme, actual_url.netloc) == (
|
||||
expected_base.scheme,
|
||||
expected_base.netloc,
|
||||
)
|
||||
assert actual_url.path == expected_path
|
||||
@@ -1,10 +1,19 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from copy import deepcopy
|
||||
from typing import Any
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from tiktoken import encoding_for_model
|
||||
|
||||
from backend.util import json
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------#
|
||||
# CONSTANTS #
|
||||
# ---------------------------------------------------------------------------#
|
||||
@@ -100,9 +109,17 @@ def _is_objective_message(msg: dict) -> bool:
|
||||
def _truncate_tool_message_content(msg: dict, enc, max_tokens: int) -> None:
|
||||
"""
|
||||
Carefully truncate tool message content while preserving tool structure.
|
||||
Only truncates tool_result content, leaves tool_use intact.
|
||||
Handles both Anthropic-style (list content) and OpenAI-style (string content) tool messages.
|
||||
"""
|
||||
content = msg.get("content")
|
||||
|
||||
# OpenAI-style tool message: role="tool" with string content
|
||||
if msg.get("role") == "tool" and isinstance(content, str):
|
||||
if _tok_len(content, enc) > max_tokens:
|
||||
msg["content"] = _truncate_middle_tokens(content, enc, max_tokens)
|
||||
return
|
||||
|
||||
# Anthropic-style: list content with tool_result items
|
||||
if not isinstance(content, list):
|
||||
return
|
||||
|
||||
@@ -140,141 +157,6 @@ def _truncate_middle_tokens(text: str, enc, max_tok: int) -> str:
|
||||
# ---------------------------------------------------------------------------#
|
||||
|
||||
|
||||
def compress_prompt(
|
||||
messages: list[dict],
|
||||
target_tokens: int,
|
||||
*,
|
||||
model: str = "gpt-4o",
|
||||
reserve: int = 2_048,
|
||||
start_cap: int = 8_192,
|
||||
floor_cap: int = 128,
|
||||
lossy_ok: bool = True,
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Shrink *messages* so that::
|
||||
|
||||
token_count(prompt) + reserve ≤ target_tokens
|
||||
|
||||
Strategy
|
||||
--------
|
||||
1. **Token-aware truncation** – progressively halve a per-message cap
|
||||
(`start_cap`, `start_cap/2`, … `floor_cap`) and apply it to the
|
||||
*content* of every message except the first and last. Tool shells
|
||||
are included: we keep the envelope but shorten huge payloads.
|
||||
2. **Middle-out deletion** – if still over the limit, delete whole
|
||||
messages working outward from the centre, **skipping** any message
|
||||
that contains ``tool_calls`` or has ``role == "tool"``.
|
||||
3. **Last-chance trim** – if still too big, truncate the *first* and
|
||||
*last* message bodies down to `floor_cap` tokens.
|
||||
4. If the prompt is *still* too large:
|
||||
• raise ``ValueError`` when ``lossy_ok == False`` (default)
|
||||
• return the partially-trimmed prompt when ``lossy_ok == True``
|
||||
|
||||
Parameters
|
||||
----------
|
||||
messages Complete chat history (will be deep-copied).
|
||||
model Model name; passed to tiktoken to pick the right
|
||||
tokenizer (gpt-4o → 'o200k_base', others fallback).
|
||||
target_tokens Hard ceiling for prompt size **excluding** the model's
|
||||
forthcoming answer.
|
||||
reserve How many tokens you want to leave available for that
|
||||
answer (`max_tokens` in your subsequent completion call).
|
||||
start_cap Initial per-message truncation ceiling (tokens).
|
||||
floor_cap Lowest cap we'll accept before moving to deletions.
|
||||
lossy_ok If *True* return best-effort prompt instead of raising
|
||||
after all trim passes have been exhausted.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[dict] – A *new* messages list that abides by the rules above.
|
||||
"""
|
||||
enc = encoding_for_model(model) # best-match tokenizer
|
||||
msgs = deepcopy(messages) # never mutate caller
|
||||
|
||||
def total_tokens() -> int:
|
||||
"""Current size of *msgs* in tokens."""
|
||||
return sum(_msg_tokens(m, enc) for m in msgs)
|
||||
|
||||
original_token_count = total_tokens()
|
||||
|
||||
if original_token_count + reserve <= target_tokens:
|
||||
return msgs
|
||||
|
||||
# ---- STEP 0 : normalise content --------------------------------------
|
||||
# Convert non-string payloads to strings so token counting is coherent.
|
||||
for i, m in enumerate(msgs):
|
||||
if not isinstance(m.get("content"), str) and m.get("content") is not None:
|
||||
if _is_tool_message(m):
|
||||
continue
|
||||
|
||||
# Keep first and last messages intact (unless they're tool messages)
|
||||
if i == 0 or i == len(msgs) - 1:
|
||||
continue
|
||||
|
||||
# Reasonable 20k-char ceiling prevents pathological blobs
|
||||
content_str = json.dumps(m["content"], separators=(",", ":"))
|
||||
if len(content_str) > 20_000:
|
||||
content_str = _truncate_middle_tokens(content_str, enc, 20_000)
|
||||
m["content"] = content_str
|
||||
|
||||
# ---- STEP 1 : token-aware truncation ---------------------------------
|
||||
cap = start_cap
|
||||
while total_tokens() + reserve > target_tokens and cap >= floor_cap:
|
||||
for m in msgs[1:-1]: # keep first & last intact
|
||||
if _is_tool_message(m):
|
||||
# For tool messages, only truncate tool result content, preserve structure
|
||||
_truncate_tool_message_content(m, enc, cap)
|
||||
continue
|
||||
|
||||
if _is_objective_message(m):
|
||||
# Never truncate objective messages - they contain the core task
|
||||
continue
|
||||
|
||||
content = m.get("content") or ""
|
||||
if _tok_len(content, enc) > cap:
|
||||
m["content"] = _truncate_middle_tokens(content, enc, cap)
|
||||
cap //= 2 # tighten the screw
|
||||
|
||||
# ---- STEP 2 : middle-out deletion -----------------------------------
|
||||
while total_tokens() + reserve > target_tokens and len(msgs) > 2:
|
||||
# Identify all deletable messages (not first/last, not tool messages, not objective messages)
|
||||
deletable_indices = []
|
||||
for i in range(1, len(msgs) - 1): # Skip first and last
|
||||
if not _is_tool_message(msgs[i]) and not _is_objective_message(msgs[i]):
|
||||
deletable_indices.append(i)
|
||||
|
||||
if not deletable_indices:
|
||||
break # nothing more we can drop
|
||||
|
||||
# Delete from center outward - find the index closest to center
|
||||
centre = len(msgs) // 2
|
||||
to_delete = min(deletable_indices, key=lambda i: abs(i - centre))
|
||||
del msgs[to_delete]
|
||||
|
||||
# ---- STEP 3 : final safety-net trim on first & last ------------------
|
||||
cap = start_cap
|
||||
while total_tokens() + reserve > target_tokens and cap >= floor_cap:
|
||||
for idx in (0, -1): # first and last
|
||||
if _is_tool_message(msgs[idx]):
|
||||
# For tool messages at first/last position, truncate tool result content only
|
||||
_truncate_tool_message_content(msgs[idx], enc, cap)
|
||||
continue
|
||||
|
||||
text = msgs[idx].get("content") or ""
|
||||
if _tok_len(text, enc) > cap:
|
||||
msgs[idx]["content"] = _truncate_middle_tokens(text, enc, cap)
|
||||
cap //= 2 # tighten the screw
|
||||
|
||||
# ---- STEP 4 : success or fail-gracefully -----------------------------
|
||||
if total_tokens() + reserve > target_tokens and not lossy_ok:
|
||||
raise ValueError(
|
||||
"compress_prompt: prompt still exceeds budget "
|
||||
f"({total_tokens() + reserve} > {target_tokens})."
|
||||
)
|
||||
|
||||
return msgs
|
||||
|
||||
|
||||
def estimate_token_count(
|
||||
messages: list[dict],
|
||||
*,
|
||||
@@ -293,7 +175,8 @@ def estimate_token_count(
|
||||
-------
|
||||
int – Token count.
|
||||
"""
|
||||
enc = encoding_for_model(model) # best-match tokenizer
|
||||
token_model = _normalize_model_for_tokenizer(model)
|
||||
enc = encoding_for_model(token_model)
|
||||
return sum(_msg_tokens(m, enc) for m in messages)
|
||||
|
||||
|
||||
@@ -315,6 +198,543 @@ def estimate_token_count_str(
|
||||
-------
|
||||
int – Token count.
|
||||
"""
|
||||
enc = encoding_for_model(model) # best-match tokenizer
|
||||
token_model = _normalize_model_for_tokenizer(model)
|
||||
enc = encoding_for_model(token_model)
|
||||
text = json.dumps(text) if not isinstance(text, str) else text
|
||||
return _tok_len(text, enc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------#
|
||||
# UNIFIED CONTEXT COMPRESSION #
|
||||
# ---------------------------------------------------------------------------#
|
||||
|
||||
# Default thresholds
|
||||
DEFAULT_TOKEN_THRESHOLD = 120_000
|
||||
DEFAULT_KEEP_RECENT = 15
|
||||
|
||||
|
||||
@dataclass
|
||||
class CompressResult:
|
||||
"""Result of context compression."""
|
||||
|
||||
messages: list[dict]
|
||||
token_count: int
|
||||
was_compacted: bool
|
||||
error: str | None = None
|
||||
original_token_count: int = 0
|
||||
messages_summarized: int = 0
|
||||
messages_dropped: int = 0
|
||||
|
||||
|
||||
def _normalize_model_for_tokenizer(model: str) -> str:
|
||||
"""Normalize model name for tiktoken tokenizer selection."""
|
||||
if "/" in model:
|
||||
model = model.split("/")[-1]
|
||||
if "claude" in model.lower() or not any(
|
||||
known in model.lower() for known in ["gpt", "o1", "chatgpt", "text-"]
|
||||
):
|
||||
return "gpt-4o"
|
||||
return model
|
||||
|
||||
|
||||
def _extract_tool_call_ids_from_message(msg: dict) -> set[str]:
|
||||
"""
|
||||
Extract tool_call IDs from an assistant message.
|
||||
|
||||
Supports both formats:
|
||||
- OpenAI: {"role": "assistant", "tool_calls": [{"id": "..."}]}
|
||||
- Anthropic: {"role": "assistant", "content": [{"type": "tool_use", "id": "..."}]}
|
||||
|
||||
Returns:
|
||||
Set of tool_call IDs found in the message.
|
||||
"""
|
||||
ids: set[str] = set()
|
||||
if msg.get("role") != "assistant":
|
||||
return ids
|
||||
|
||||
# OpenAI format: tool_calls array
|
||||
if msg.get("tool_calls"):
|
||||
for tc in msg["tool_calls"]:
|
||||
tc_id = tc.get("id")
|
||||
if tc_id:
|
||||
ids.add(tc_id)
|
||||
|
||||
# Anthropic format: content list with tool_use blocks
|
||||
content = msg.get("content")
|
||||
if isinstance(content, list):
|
||||
for block in content:
|
||||
if isinstance(block, dict) and block.get("type") == "tool_use":
|
||||
tc_id = block.get("id")
|
||||
if tc_id:
|
||||
ids.add(tc_id)
|
||||
|
||||
return ids
|
||||
|
||||
|
||||
def _extract_tool_response_ids_from_message(msg: dict) -> set[str]:
|
||||
"""
|
||||
Extract tool_call IDs that this message is responding to.
|
||||
|
||||
Supports both formats:
|
||||
- OpenAI: {"role": "tool", "tool_call_id": "..."}
|
||||
- Anthropic: {"role": "user", "content": [{"type": "tool_result", "tool_use_id": "..."}]}
|
||||
|
||||
Returns:
|
||||
Set of tool_call IDs this message responds to.
|
||||
"""
|
||||
ids: set[str] = set()
|
||||
|
||||
# OpenAI format: role=tool with tool_call_id
|
||||
if msg.get("role") == "tool":
|
||||
tc_id = msg.get("tool_call_id")
|
||||
if tc_id:
|
||||
ids.add(tc_id)
|
||||
|
||||
# Anthropic format: content list with tool_result blocks
|
||||
content = msg.get("content")
|
||||
if isinstance(content, list):
|
||||
for block in content:
|
||||
if isinstance(block, dict) and block.get("type") == "tool_result":
|
||||
tc_id = block.get("tool_use_id")
|
||||
if tc_id:
|
||||
ids.add(tc_id)
|
||||
|
||||
return ids
|
||||
|
||||
|
||||
def _is_tool_response_message(msg: dict) -> bool:
|
||||
"""Check if message is a tool response (OpenAI or Anthropic format)."""
|
||||
# OpenAI format
|
||||
if msg.get("role") == "tool":
|
||||
return True
|
||||
# Anthropic format
|
||||
content = msg.get("content")
|
||||
if isinstance(content, list):
|
||||
for block in content:
|
||||
if isinstance(block, dict) and block.get("type") == "tool_result":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _remove_orphan_tool_responses(
|
||||
messages: list[dict], orphan_ids: set[str]
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Remove tool response messages/blocks that reference orphan tool_call IDs.
|
||||
|
||||
Supports both OpenAI and Anthropic formats.
|
||||
For Anthropic messages with mixed valid/orphan tool_result blocks,
|
||||
filters out only the orphan blocks instead of dropping the entire message.
|
||||
"""
|
||||
result = []
|
||||
for msg in messages:
|
||||
# OpenAI format: role=tool - drop entire message if orphan
|
||||
if msg.get("role") == "tool":
|
||||
tc_id = msg.get("tool_call_id")
|
||||
if tc_id and tc_id in orphan_ids:
|
||||
continue
|
||||
result.append(msg)
|
||||
continue
|
||||
|
||||
# Anthropic format: content list may have mixed tool_result blocks
|
||||
content = msg.get("content")
|
||||
if isinstance(content, list):
|
||||
has_tool_results = any(
|
||||
isinstance(b, dict) and b.get("type") == "tool_result" for b in content
|
||||
)
|
||||
if has_tool_results:
|
||||
# Filter out orphan tool_result blocks, keep valid ones
|
||||
filtered_content = [
|
||||
block
|
||||
for block in content
|
||||
if not (
|
||||
isinstance(block, dict)
|
||||
and block.get("type") == "tool_result"
|
||||
and block.get("tool_use_id") in orphan_ids
|
||||
)
|
||||
]
|
||||
# Only keep message if it has remaining content
|
||||
if filtered_content:
|
||||
msg = msg.copy()
|
||||
msg["content"] = filtered_content
|
||||
result.append(msg)
|
||||
continue
|
||||
|
||||
result.append(msg)
|
||||
return result
|
||||
|
||||
|
||||
def _ensure_tool_pairs_intact(
|
||||
recent_messages: list[dict],
|
||||
all_messages: list[dict],
|
||||
start_index: int,
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Ensure tool_call/tool_response pairs stay together after slicing.
|
||||
|
||||
When slicing messages for context compaction, a naive slice can separate
|
||||
an assistant message containing tool_calls from its corresponding tool
|
||||
response messages. This causes API validation errors (e.g., Anthropic's
|
||||
"unexpected tool_use_id found in tool_result blocks").
|
||||
|
||||
This function checks for orphan tool responses in the slice and extends
|
||||
backwards to include their corresponding assistant messages.
|
||||
|
||||
Supports both formats:
|
||||
- OpenAI: tool_calls array + role="tool" responses
|
||||
- Anthropic: tool_use blocks + tool_result blocks
|
||||
|
||||
Args:
|
||||
recent_messages: The sliced messages to validate
|
||||
all_messages: The complete message list (for looking up missing assistants)
|
||||
start_index: The index in all_messages where recent_messages begins
|
||||
|
||||
Returns:
|
||||
A potentially extended list of messages with tool pairs intact
|
||||
"""
|
||||
if not recent_messages:
|
||||
return recent_messages
|
||||
|
||||
# Collect all tool_call_ids from assistant messages in the slice
|
||||
available_tool_call_ids: set[str] = set()
|
||||
for msg in recent_messages:
|
||||
available_tool_call_ids |= _extract_tool_call_ids_from_message(msg)
|
||||
|
||||
# Find orphan tool responses (responses whose tool_call_id is missing)
|
||||
orphan_tool_call_ids: set[str] = set()
|
||||
for msg in recent_messages:
|
||||
response_ids = _extract_tool_response_ids_from_message(msg)
|
||||
for tc_id in response_ids:
|
||||
if tc_id not in available_tool_call_ids:
|
||||
orphan_tool_call_ids.add(tc_id)
|
||||
|
||||
if not orphan_tool_call_ids:
|
||||
# No orphans, slice is valid
|
||||
return recent_messages
|
||||
|
||||
# Find the assistant messages that contain the orphan tool_call_ids
|
||||
# Search backwards from start_index in all_messages
|
||||
messages_to_prepend: list[dict] = []
|
||||
for i in range(start_index - 1, -1, -1):
|
||||
msg = all_messages[i]
|
||||
msg_tool_ids = _extract_tool_call_ids_from_message(msg)
|
||||
if msg_tool_ids & orphan_tool_call_ids:
|
||||
# This assistant message has tool_calls we need
|
||||
# Also collect its contiguous tool responses that follow it
|
||||
assistant_and_responses: list[dict] = [msg]
|
||||
|
||||
# Scan forward from this assistant to collect tool responses
|
||||
for j in range(i + 1, start_index):
|
||||
following_msg = all_messages[j]
|
||||
following_response_ids = _extract_tool_response_ids_from_message(
|
||||
following_msg
|
||||
)
|
||||
if following_response_ids and following_response_ids & msg_tool_ids:
|
||||
assistant_and_responses.append(following_msg)
|
||||
elif not _is_tool_response_message(following_msg):
|
||||
# Stop at first non-tool-response message
|
||||
break
|
||||
|
||||
# Prepend the assistant and its tool responses (maintain order)
|
||||
messages_to_prepend = assistant_and_responses + messages_to_prepend
|
||||
# Mark these as found
|
||||
orphan_tool_call_ids -= msg_tool_ids
|
||||
# Also add this assistant's tool_call_ids to available set
|
||||
available_tool_call_ids |= msg_tool_ids
|
||||
|
||||
if not orphan_tool_call_ids:
|
||||
# Found all missing assistants
|
||||
break
|
||||
|
||||
if orphan_tool_call_ids:
|
||||
# Some tool_call_ids couldn't be resolved - remove those tool responses
|
||||
# This shouldn't happen in normal operation but handles edge cases
|
||||
logger.warning(
|
||||
f"Could not find assistant messages for tool_call_ids: {orphan_tool_call_ids}. "
|
||||
"Removing orphan tool responses."
|
||||
)
|
||||
recent_messages = _remove_orphan_tool_responses(
|
||||
recent_messages, orphan_tool_call_ids
|
||||
)
|
||||
|
||||
if messages_to_prepend:
|
||||
logger.info(
|
||||
f"Extended recent messages by {len(messages_to_prepend)} to preserve "
|
||||
f"tool_call/tool_response pairs"
|
||||
)
|
||||
return messages_to_prepend + recent_messages
|
||||
|
||||
return recent_messages
|
||||
|
||||
|
||||
async def _summarize_messages_llm(
|
||||
messages: list[dict],
|
||||
client: AsyncOpenAI,
|
||||
model: str,
|
||||
timeout: float = 30.0,
|
||||
) -> str:
|
||||
"""Summarize messages using an LLM."""
|
||||
conversation = []
|
||||
for msg in messages:
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
if content and role in ("user", "assistant", "tool"):
|
||||
conversation.append(f"{role.upper()}: {content}")
|
||||
|
||||
conversation_text = "\n\n".join(conversation)
|
||||
|
||||
if not conversation_text:
|
||||
return "No conversation history available."
|
||||
|
||||
# Limit to ~100k chars for safety
|
||||
MAX_CHARS = 100_000
|
||||
if len(conversation_text) > MAX_CHARS:
|
||||
conversation_text = conversation_text[:MAX_CHARS] + "\n\n[truncated]"
|
||||
|
||||
response = await client.with_options(timeout=timeout).chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
"Create a detailed summary of the conversation so far. "
|
||||
"This summary will be used as context when continuing the conversation.\n\n"
|
||||
"Before writing the summary, analyze each message chronologically to identify:\n"
|
||||
"- User requests and their explicit goals\n"
|
||||
"- Your approach and key decisions made\n"
|
||||
"- Technical specifics (file names, tool outputs, function signatures)\n"
|
||||
"- Errors encountered and resolutions applied\n\n"
|
||||
"You MUST include ALL of the following sections:\n\n"
|
||||
"## 1. Primary Request and Intent\n"
|
||||
"The user's explicit goals and what they are trying to accomplish.\n\n"
|
||||
"## 2. Key Technical Concepts\n"
|
||||
"Technologies, frameworks, tools, and patterns being used or discussed.\n\n"
|
||||
"## 3. Files and Resources Involved\n"
|
||||
"Specific files examined or modified, with relevant snippets and identifiers.\n\n"
|
||||
"## 4. Errors and Fixes\n"
|
||||
"Problems encountered, error messages, and their resolutions. "
|
||||
"Include any user feedback on fixes.\n\n"
|
||||
"## 5. Problem Solving\n"
|
||||
"Issues that have been resolved and how they were addressed.\n\n"
|
||||
"## 6. All User Messages\n"
|
||||
"A complete list of all user inputs (excluding tool outputs) to preserve their exact requests.\n\n"
|
||||
"## 7. Pending Tasks\n"
|
||||
"Work items the user explicitly requested that have not yet been completed.\n\n"
|
||||
"## 8. Current Work\n"
|
||||
"Precise description of what was being worked on most recently, including relevant context.\n\n"
|
||||
"## 9. Next Steps\n"
|
||||
"What should happen next, aligned with the user's most recent requests. "
|
||||
"Include verbatim quotes of recent instructions if relevant."
|
||||
),
|
||||
},
|
||||
{"role": "user", "content": f"Summarize:\n\n{conversation_text}"},
|
||||
],
|
||||
max_tokens=1500,
|
||||
temperature=0.3,
|
||||
)
|
||||
|
||||
return response.choices[0].message.content or "No summary available."
|
||||
|
||||
|
||||
async def compress_context(
|
||||
messages: list[dict],
|
||||
target_tokens: int = DEFAULT_TOKEN_THRESHOLD,
|
||||
*,
|
||||
model: str = "gpt-4o",
|
||||
client: AsyncOpenAI | None = None,
|
||||
keep_recent: int = DEFAULT_KEEP_RECENT,
|
||||
reserve: int = 2_048,
|
||||
start_cap: int = 8_192,
|
||||
floor_cap: int = 128,
|
||||
) -> CompressResult:
|
||||
"""
|
||||
Unified context compression that combines summarization and truncation strategies.
|
||||
|
||||
Strategy (in order):
|
||||
1. **LLM summarization** – If client provided, summarize old messages into a
|
||||
single context message while keeping recent messages intact. This is the
|
||||
primary strategy for chat service.
|
||||
2. **Content truncation** – Progressively halve a per-message cap and truncate
|
||||
bloated message content (tool outputs, large pastes). Preserves all messages
|
||||
but shortens their content. Primary strategy when client=None (LLM blocks).
|
||||
3. **Middle-out deletion** – Delete whole messages one at a time from the center
|
||||
outward, skipping tool messages and objective messages.
|
||||
4. **First/last trim** – Truncate first and last message content as last resort.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
messages Complete chat history (will be deep-copied).
|
||||
target_tokens Hard ceiling for prompt size.
|
||||
model Model name for tokenization and summarization.
|
||||
client AsyncOpenAI client. If provided, enables LLM summarization
|
||||
as the first strategy. If None, skips to truncation strategies.
|
||||
keep_recent Number of recent messages to preserve during summarization.
|
||||
reserve Tokens to reserve for model response.
|
||||
start_cap Initial per-message truncation ceiling (tokens).
|
||||
floor_cap Lowest cap before moving to deletions.
|
||||
|
||||
Returns
|
||||
-------
|
||||
CompressResult with compressed messages and metadata.
|
||||
"""
|
||||
# Guard clause for empty messages
|
||||
if not messages:
|
||||
return CompressResult(
|
||||
messages=[],
|
||||
token_count=0,
|
||||
was_compacted=False,
|
||||
original_token_count=0,
|
||||
)
|
||||
|
||||
token_model = _normalize_model_for_tokenizer(model)
|
||||
enc = encoding_for_model(token_model)
|
||||
msgs = deepcopy(messages)
|
||||
|
||||
def total_tokens() -> int:
|
||||
return sum(_msg_tokens(m, enc) for m in msgs)
|
||||
|
||||
original_count = total_tokens()
|
||||
|
||||
# Already under limit
|
||||
if original_count + reserve <= target_tokens:
|
||||
return CompressResult(
|
||||
messages=msgs,
|
||||
token_count=original_count,
|
||||
was_compacted=False,
|
||||
original_token_count=original_count,
|
||||
)
|
||||
|
||||
messages_summarized = 0
|
||||
messages_dropped = 0
|
||||
|
||||
# ---- STEP 1: LLM summarization (if client provided) -------------------
|
||||
# This is the primary compression strategy for chat service.
|
||||
# Summarize old messages while keeping recent ones intact.
|
||||
if client is not None:
|
||||
has_system = len(msgs) > 0 and msgs[0].get("role") == "system"
|
||||
system_msg = msgs[0] if has_system else None
|
||||
|
||||
# Calculate old vs recent messages
|
||||
if has_system:
|
||||
if len(msgs) > keep_recent + 1:
|
||||
old_msgs = msgs[1:-keep_recent]
|
||||
recent_msgs = msgs[-keep_recent:]
|
||||
else:
|
||||
old_msgs = []
|
||||
recent_msgs = msgs[1:] if len(msgs) > 1 else []
|
||||
else:
|
||||
if len(msgs) > keep_recent:
|
||||
old_msgs = msgs[:-keep_recent]
|
||||
recent_msgs = msgs[-keep_recent:]
|
||||
else:
|
||||
old_msgs = []
|
||||
recent_msgs = msgs
|
||||
|
||||
# Ensure tool pairs stay intact
|
||||
slice_start = max(0, len(msgs) - keep_recent)
|
||||
recent_msgs = _ensure_tool_pairs_intact(recent_msgs, msgs, slice_start)
|
||||
|
||||
if old_msgs:
|
||||
try:
|
||||
summary_text = await _summarize_messages_llm(old_msgs, client, model)
|
||||
summary_msg = {
|
||||
"role": "assistant",
|
||||
"content": f"[Previous conversation summary — for context only]: {summary_text}",
|
||||
}
|
||||
messages_summarized = len(old_msgs)
|
||||
|
||||
if has_system:
|
||||
msgs = [system_msg, summary_msg] + recent_msgs
|
||||
else:
|
||||
msgs = [summary_msg] + recent_msgs
|
||||
|
||||
logger.info(
|
||||
f"Context summarized: {original_count} -> {total_tokens()} tokens, "
|
||||
f"summarized {messages_summarized} messages"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Summarization failed, continuing with truncation: {e}")
|
||||
# Fall through to content truncation
|
||||
|
||||
# ---- STEP 2: Normalize content ----------------------------------------
|
||||
# Convert non-string payloads to strings so token counting is coherent.
|
||||
# Always run this before truncation to ensure consistent token counting.
|
||||
for i, m in enumerate(msgs):
|
||||
if not isinstance(m.get("content"), str) and m.get("content") is not None:
|
||||
if _is_tool_message(m):
|
||||
continue
|
||||
if i == 0 or i == len(msgs) - 1:
|
||||
continue
|
||||
content_str = json.dumps(m["content"], separators=(",", ":"))
|
||||
if len(content_str) > 20_000:
|
||||
content_str = _truncate_middle_tokens(content_str, enc, 20_000)
|
||||
m["content"] = content_str
|
||||
|
||||
# ---- STEP 3: Token-aware content truncation ---------------------------
|
||||
# Progressively halve per-message cap and truncate bloated content.
|
||||
# This preserves all messages but shortens their content.
|
||||
cap = start_cap
|
||||
while total_tokens() + reserve > target_tokens and cap >= floor_cap:
|
||||
for m in msgs[1:-1]:
|
||||
if _is_tool_message(m):
|
||||
_truncate_tool_message_content(m, enc, cap)
|
||||
continue
|
||||
if _is_objective_message(m):
|
||||
continue
|
||||
content = m.get("content") or ""
|
||||
if _tok_len(content, enc) > cap:
|
||||
m["content"] = _truncate_middle_tokens(content, enc, cap)
|
||||
cap //= 2
|
||||
|
||||
# ---- STEP 4: Middle-out deletion --------------------------------------
|
||||
# Delete messages one at a time from the center outward.
|
||||
# This is more granular than dropping all old messages at once.
|
||||
while total_tokens() + reserve > target_tokens and len(msgs) > 2:
|
||||
deletable: list[int] = []
|
||||
for i in range(1, len(msgs) - 1):
|
||||
msg = msgs[i]
|
||||
if (
|
||||
msg is not None
|
||||
and not _is_tool_message(msg)
|
||||
and not _is_objective_message(msg)
|
||||
):
|
||||
deletable.append(i)
|
||||
if not deletable:
|
||||
break
|
||||
centre = len(msgs) // 2
|
||||
to_delete = min(deletable, key=lambda i: abs(i - centre))
|
||||
del msgs[to_delete]
|
||||
messages_dropped += 1
|
||||
|
||||
# ---- STEP 5: Final trim on first/last ---------------------------------
|
||||
cap = start_cap
|
||||
while total_tokens() + reserve > target_tokens and cap >= floor_cap:
|
||||
for idx in (0, -1):
|
||||
msg = msgs[idx]
|
||||
if msg is None:
|
||||
continue
|
||||
if _is_tool_message(msg):
|
||||
_truncate_tool_message_content(msg, enc, cap)
|
||||
continue
|
||||
text = msg.get("content") or ""
|
||||
if _tok_len(text, enc) > cap:
|
||||
msg["content"] = _truncate_middle_tokens(text, enc, cap)
|
||||
cap //= 2
|
||||
|
||||
# Filter out any None values that may have been introduced
|
||||
final_msgs: list[dict] = [m for m in msgs if m is not None]
|
||||
final_count = sum(_msg_tokens(m, enc) for m in final_msgs)
|
||||
error = None
|
||||
if final_count + reserve > target_tokens:
|
||||
error = f"Could not compress below target ({final_count + reserve} > {target_tokens})"
|
||||
logger.warning(error)
|
||||
|
||||
return CompressResult(
|
||||
messages=final_msgs,
|
||||
token_count=final_count,
|
||||
was_compacted=True,
|
||||
error=error,
|
||||
original_token_count=original_count,
|
||||
messages_summarized=messages_summarized,
|
||||
messages_dropped=messages_dropped,
|
||||
)
|
||||
|
||||
@@ -1,10 +1,21 @@
|
||||
"""Tests for prompt utility functions, especially tool call token counting."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
from tiktoken import encoding_for_model
|
||||
|
||||
from backend.util import json
|
||||
from backend.util.prompt import _msg_tokens, estimate_token_count
|
||||
from backend.util.prompt import (
|
||||
CompressResult,
|
||||
_ensure_tool_pairs_intact,
|
||||
_msg_tokens,
|
||||
_normalize_model_for_tokenizer,
|
||||
_truncate_middle_tokens,
|
||||
_truncate_tool_message_content,
|
||||
compress_context,
|
||||
estimate_token_count,
|
||||
)
|
||||
|
||||
|
||||
class TestMsgTokens:
|
||||
@@ -276,3 +287,690 @@ class TestEstimateTokenCount:
|
||||
|
||||
assert total_tokens == expected_total
|
||||
assert total_tokens > 20 # Should be substantial
|
||||
|
||||
|
||||
class TestNormalizeModelForTokenizer:
|
||||
"""Test model name normalization for tiktoken."""
|
||||
|
||||
def test_openai_models_unchanged(self):
|
||||
"""Test that OpenAI models are returned as-is."""
|
||||
assert _normalize_model_for_tokenizer("gpt-4o") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("gpt-4") == "gpt-4"
|
||||
assert _normalize_model_for_tokenizer("gpt-3.5-turbo") == "gpt-3.5-turbo"
|
||||
|
||||
def test_claude_models_normalized(self):
|
||||
"""Test that Claude models are normalized to gpt-4o."""
|
||||
assert _normalize_model_for_tokenizer("claude-3-opus") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("claude-3-sonnet") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("anthropic/claude-3-haiku") == "gpt-4o"
|
||||
|
||||
def test_openrouter_paths_extracted(self):
|
||||
"""Test that OpenRouter model paths are handled."""
|
||||
assert _normalize_model_for_tokenizer("openai/gpt-4o") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("anthropic/claude-3-opus") == "gpt-4o"
|
||||
|
||||
def test_unknown_models_default_to_gpt4o(self):
|
||||
"""Test that unknown models default to gpt-4o."""
|
||||
assert _normalize_model_for_tokenizer("some-random-model") == "gpt-4o"
|
||||
assert _normalize_model_for_tokenizer("llama-3-70b") == "gpt-4o"
|
||||
|
||||
|
||||
class TestTruncateToolMessageContent:
|
||||
"""Test tool message content truncation."""
|
||||
|
||||
@pytest.fixture
|
||||
def enc(self):
|
||||
return encoding_for_model("gpt-4o")
|
||||
|
||||
def test_truncate_openai_tool_message(self, enc):
|
||||
"""Test truncation of OpenAI-style tool message with string content."""
|
||||
long_content = "x" * 10000
|
||||
msg = {"role": "tool", "tool_call_id": "call_123", "content": long_content}
|
||||
|
||||
_truncate_tool_message_content(msg, enc, max_tokens=100)
|
||||
|
||||
# Content should be truncated
|
||||
assert len(msg["content"]) < len(long_content)
|
||||
assert "…" in msg["content"] # Has ellipsis marker
|
||||
|
||||
def test_truncate_anthropic_tool_result(self, enc):
|
||||
"""Test truncation of Anthropic-style tool_result."""
|
||||
long_content = "y" * 10000
|
||||
msg = {
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_123",
|
||||
"content": long_content,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
_truncate_tool_message_content(msg, enc, max_tokens=100)
|
||||
|
||||
# Content should be truncated
|
||||
result_content = msg["content"][0]["content"]
|
||||
assert len(result_content) < len(long_content)
|
||||
assert "…" in result_content
|
||||
|
||||
def test_preserve_tool_use_blocks(self, enc):
|
||||
"""Test that tool_use blocks are not truncated."""
|
||||
msg = {
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": "toolu_123",
|
||||
"name": "some_function",
|
||||
"input": {"key": "value" * 1000}, # Large input
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
original = json.dumps(msg["content"][0]["input"])
|
||||
_truncate_tool_message_content(msg, enc, max_tokens=10)
|
||||
|
||||
# tool_use should be unchanged
|
||||
assert json.dumps(msg["content"][0]["input"]) == original
|
||||
|
||||
def test_no_truncation_when_under_limit(self, enc):
|
||||
"""Test that short content is not modified."""
|
||||
msg = {"role": "tool", "tool_call_id": "call_123", "content": "Short content"}
|
||||
|
||||
original = msg["content"]
|
||||
_truncate_tool_message_content(msg, enc, max_tokens=1000)
|
||||
|
||||
assert msg["content"] == original
|
||||
|
||||
|
||||
class TestTruncateMiddleTokens:
|
||||
"""Test middle truncation of text."""
|
||||
|
||||
@pytest.fixture
|
||||
def enc(self):
|
||||
return encoding_for_model("gpt-4o")
|
||||
|
||||
def test_truncates_long_text(self, enc):
|
||||
"""Test that long text is truncated with ellipsis in middle."""
|
||||
long_text = "word " * 1000
|
||||
result = _truncate_middle_tokens(long_text, enc, max_tok=50)
|
||||
|
||||
assert len(enc.encode(result)) <= 52 # Allow some slack for ellipsis
|
||||
assert "…" in result
|
||||
assert result.startswith("word") # Head preserved
|
||||
assert result.endswith("word ") # Tail preserved
|
||||
|
||||
def test_preserves_short_text(self, enc):
|
||||
"""Test that short text is not modified."""
|
||||
short_text = "Hello world"
|
||||
result = _truncate_middle_tokens(short_text, enc, max_tok=100)
|
||||
|
||||
assert result == short_text
|
||||
|
||||
|
||||
class TestEnsureToolPairsIntact:
|
||||
"""Test tool call/response pair preservation for both OpenAI and Anthropic formats."""
|
||||
|
||||
# ---- OpenAI Format Tests ----
|
||||
|
||||
def test_openai_adds_missing_tool_call(self):
|
||||
"""Test that orphaned OpenAI tool_response gets its tool_call prepended."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "You are helpful."},
|
||||
{
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{"id": "call_1", "type": "function", "function": {"name": "f1"}}
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "call_1", "content": "result"},
|
||||
{"role": "user", "content": "Thanks!"},
|
||||
]
|
||||
# Recent messages start at index 2 (the tool response)
|
||||
recent = [all_msgs[2], all_msgs[3]]
|
||||
start_index = 2
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
# Should prepend the tool_call message
|
||||
assert len(result) == 3
|
||||
assert result[0]["role"] == "assistant"
|
||||
assert "tool_calls" in result[0]
|
||||
|
||||
def test_openai_keeps_complete_pairs(self):
|
||||
"""Test that complete OpenAI pairs are unchanged."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "System"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{"id": "call_1", "type": "function", "function": {"name": "f1"}}
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "call_1", "content": "result"},
|
||||
]
|
||||
recent = all_msgs[1:] # Include both tool_call and response
|
||||
start_index = 1
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
assert len(result) == 2 # No messages added
|
||||
|
||||
def test_openai_multiple_tool_calls(self):
|
||||
"""Test multiple OpenAI tool calls in one assistant message."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "System"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{"id": "call_1", "type": "function", "function": {"name": "f1"}},
|
||||
{"id": "call_2", "type": "function", "function": {"name": "f2"}},
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "call_1", "content": "result1"},
|
||||
{"role": "tool", "tool_call_id": "call_2", "content": "result2"},
|
||||
{"role": "user", "content": "Thanks!"},
|
||||
]
|
||||
# Recent messages start at index 2 (first tool response)
|
||||
recent = [all_msgs[2], all_msgs[3], all_msgs[4]]
|
||||
start_index = 2
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
# Should prepend the assistant message with both tool_calls
|
||||
assert len(result) == 4
|
||||
assert result[0]["role"] == "assistant"
|
||||
assert len(result[0]["tool_calls"]) == 2
|
||||
|
||||
# ---- Anthropic Format Tests ----
|
||||
|
||||
def test_anthropic_adds_missing_tool_use(self):
|
||||
"""Test that orphaned Anthropic tool_result gets its tool_use prepended."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "You are helpful."},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": "toolu_123",
|
||||
"name": "get_weather",
|
||||
"input": {"location": "SF"},
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_123",
|
||||
"content": "22°C and sunny",
|
||||
}
|
||||
],
|
||||
},
|
||||
{"role": "user", "content": "Thanks!"},
|
||||
]
|
||||
# Recent messages start at index 2 (the tool_result)
|
||||
recent = [all_msgs[2], all_msgs[3]]
|
||||
start_index = 2
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
# Should prepend the tool_use message
|
||||
assert len(result) == 3
|
||||
assert result[0]["role"] == "assistant"
|
||||
assert result[0]["content"][0]["type"] == "tool_use"
|
||||
|
||||
def test_anthropic_keeps_complete_pairs(self):
|
||||
"""Test that complete Anthropic pairs are unchanged."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "System"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": "toolu_456",
|
||||
"name": "calculator",
|
||||
"input": {"expr": "2+2"},
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_456",
|
||||
"content": "4",
|
||||
}
|
||||
],
|
||||
},
|
||||
]
|
||||
recent = all_msgs[1:] # Include both tool_use and result
|
||||
start_index = 1
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
assert len(result) == 2 # No messages added
|
||||
|
||||
def test_anthropic_multiple_tool_uses(self):
|
||||
"""Test multiple Anthropic tool_use blocks in one message."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "System"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "text", "text": "Let me check both..."},
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": "toolu_1",
|
||||
"name": "get_weather",
|
||||
"input": {"city": "NYC"},
|
||||
},
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": "toolu_2",
|
||||
"name": "get_weather",
|
||||
"input": {"city": "LA"},
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_1",
|
||||
"content": "Cold",
|
||||
},
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_2",
|
||||
"content": "Warm",
|
||||
},
|
||||
],
|
||||
},
|
||||
{"role": "user", "content": "Thanks!"},
|
||||
]
|
||||
# Recent messages start at index 2 (tool_result)
|
||||
recent = [all_msgs[2], all_msgs[3]]
|
||||
start_index = 2
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
# Should prepend the assistant message with both tool_uses
|
||||
assert len(result) == 3
|
||||
assert result[0]["role"] == "assistant"
|
||||
tool_use_count = sum(
|
||||
1 for b in result[0]["content"] if b.get("type") == "tool_use"
|
||||
)
|
||||
assert tool_use_count == 2
|
||||
|
||||
# ---- Mixed/Edge Case Tests ----
|
||||
|
||||
def test_anthropic_with_type_message_field(self):
|
||||
"""Test Anthropic format with 'type': 'message' field (smart_decision_maker style)."""
|
||||
all_msgs = [
|
||||
{"role": "system", "content": "You are helpful."},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": "toolu_abc",
|
||||
"name": "search",
|
||||
"input": {"q": "test"},
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"type": "message", # Extra field from smart_decision_maker
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_abc",
|
||||
"content": "Found results",
|
||||
}
|
||||
],
|
||||
},
|
||||
{"role": "user", "content": "Thanks!"},
|
||||
]
|
||||
# Recent messages start at index 2 (the tool_result with 'type': 'message')
|
||||
recent = [all_msgs[2], all_msgs[3]]
|
||||
start_index = 2
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
# Should prepend the tool_use message
|
||||
assert len(result) == 3
|
||||
assert result[0]["role"] == "assistant"
|
||||
assert result[0]["content"][0]["type"] == "tool_use"
|
||||
|
||||
def test_handles_no_tool_messages(self):
|
||||
"""Test messages without tool calls."""
|
||||
all_msgs = [
|
||||
{"role": "user", "content": "Hello"},
|
||||
{"role": "assistant", "content": "Hi there!"},
|
||||
]
|
||||
recent = all_msgs
|
||||
start_index = 0
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
assert result == all_msgs
|
||||
|
||||
def test_handles_empty_messages(self):
|
||||
"""Test empty message list."""
|
||||
result = _ensure_tool_pairs_intact([], [], 0)
|
||||
assert result == []
|
||||
|
||||
def test_mixed_text_and_tool_content(self):
|
||||
"""Test Anthropic message with mixed text and tool_use content."""
|
||||
all_msgs = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "text", "text": "I'll help you with that."},
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": "toolu_mixed",
|
||||
"name": "search",
|
||||
"input": {"q": "test"},
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_mixed",
|
||||
"content": "Found results",
|
||||
}
|
||||
],
|
||||
},
|
||||
{"role": "assistant", "content": "Here are the results..."},
|
||||
]
|
||||
# Start from tool_result
|
||||
recent = [all_msgs[1], all_msgs[2]]
|
||||
start_index = 1
|
||||
|
||||
result = _ensure_tool_pairs_intact(recent, all_msgs, start_index)
|
||||
|
||||
# Should prepend the assistant message with tool_use
|
||||
assert len(result) == 3
|
||||
assert result[0]["content"][0]["type"] == "text"
|
||||
assert result[0]["content"][1]["type"] == "tool_use"
|
||||
|
||||
|
||||
class TestCompressContext:
|
||||
"""Test the async compress_context function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_compression_needed(self):
|
||||
"""Test messages under limit return without compression."""
|
||||
messages = [
|
||||
{"role": "system", "content": "You are helpful."},
|
||||
{"role": "user", "content": "Hello!"},
|
||||
]
|
||||
|
||||
result = await compress_context(messages, target_tokens=100000)
|
||||
|
||||
assert isinstance(result, CompressResult)
|
||||
assert result.was_compacted is False
|
||||
assert len(result.messages) == 2
|
||||
assert result.error is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_truncation_without_client(self):
|
||||
"""Test that truncation works without LLM client."""
|
||||
long_content = "x" * 50000
|
||||
messages = [
|
||||
{"role": "system", "content": "System"},
|
||||
{"role": "user", "content": long_content},
|
||||
{"role": "assistant", "content": "Response"},
|
||||
]
|
||||
|
||||
result = await compress_context(
|
||||
messages, target_tokens=1000, client=None, reserve=100
|
||||
)
|
||||
|
||||
assert result.was_compacted is True
|
||||
# Should have truncated without summarization
|
||||
assert result.messages_summarized == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_with_mocked_llm_client(self):
|
||||
"""Test summarization with mocked LLM client."""
|
||||
# Create many messages to trigger summarization
|
||||
messages = [{"role": "system", "content": "System prompt"}]
|
||||
for i in range(30):
|
||||
messages.append({"role": "user", "content": f"User message {i} " * 100})
|
||||
messages.append(
|
||||
{"role": "assistant", "content": f"Assistant response {i} " * 100}
|
||||
)
|
||||
|
||||
# Mock the AsyncOpenAI client
|
||||
mock_client = AsyncMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.choices = [MagicMock()]
|
||||
mock_response.choices[0].message.content = "Summary of conversation"
|
||||
mock_client.with_options.return_value.chat.completions.create = AsyncMock(
|
||||
return_value=mock_response
|
||||
)
|
||||
|
||||
result = await compress_context(
|
||||
messages,
|
||||
target_tokens=5000,
|
||||
client=mock_client,
|
||||
keep_recent=5,
|
||||
reserve=500,
|
||||
)
|
||||
|
||||
assert result.was_compacted is True
|
||||
# Should have attempted summarization
|
||||
assert mock_client.with_options.called or result.messages_summarized > 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preserves_tool_pairs(self):
|
||||
"""Test that tool call/response pairs stay together."""
|
||||
messages = [
|
||||
{"role": "system", "content": "System"},
|
||||
{"role": "user", "content": "Do something"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{"id": "call_1", "type": "function", "function": {"name": "func"}}
|
||||
],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "call_1", "content": "Result " * 1000},
|
||||
{"role": "assistant", "content": "Done!"},
|
||||
]
|
||||
|
||||
result = await compress_context(
|
||||
messages, target_tokens=500, client=None, reserve=50
|
||||
)
|
||||
|
||||
# Check that if tool response exists, its call exists too
|
||||
tool_call_ids = set()
|
||||
tool_response_ids = set()
|
||||
for msg in result.messages:
|
||||
if "tool_calls" in msg:
|
||||
for tc in msg["tool_calls"]:
|
||||
tool_call_ids.add(tc["id"])
|
||||
if msg.get("role") == "tool":
|
||||
tool_response_ids.add(msg.get("tool_call_id"))
|
||||
|
||||
# All tool responses should have their calls
|
||||
assert tool_response_ids <= tool_call_ids
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_error_when_cannot_compress(self):
|
||||
"""Test that error is returned when compression fails."""
|
||||
# Single huge message that can't be compressed enough
|
||||
messages = [
|
||||
{"role": "user", "content": "x" * 100000},
|
||||
]
|
||||
|
||||
result = await compress_context(
|
||||
messages, target_tokens=100, client=None, reserve=50
|
||||
)
|
||||
|
||||
# Should have an error since we can't get below 100 tokens
|
||||
assert result.error is not None
|
||||
assert result.was_compacted is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_messages(self):
|
||||
"""Test that empty messages list returns early without error."""
|
||||
result = await compress_context([], target_tokens=1000)
|
||||
|
||||
assert result.messages == []
|
||||
assert result.token_count == 0
|
||||
assert result.was_compacted is False
|
||||
assert result.error is None
|
||||
|
||||
|
||||
class TestRemoveOrphanToolResponses:
|
||||
"""Test _remove_orphan_tool_responses helper function."""
|
||||
|
||||
def test_removes_openai_orphan(self):
|
||||
"""Test removal of orphan OpenAI tool response."""
|
||||
from backend.util.prompt import _remove_orphan_tool_responses
|
||||
|
||||
messages = [
|
||||
{"role": "tool", "tool_call_id": "call_orphan", "content": "result"},
|
||||
{"role": "user", "content": "Hello"},
|
||||
]
|
||||
orphan_ids = {"call_orphan"}
|
||||
|
||||
result = _remove_orphan_tool_responses(messages, orphan_ids)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0]["role"] == "user"
|
||||
|
||||
def test_keeps_valid_openai_tool(self):
|
||||
"""Test that valid OpenAI tool responses are kept."""
|
||||
from backend.util.prompt import _remove_orphan_tool_responses
|
||||
|
||||
messages = [
|
||||
{"role": "tool", "tool_call_id": "call_valid", "content": "result"},
|
||||
]
|
||||
orphan_ids = {"call_other"}
|
||||
|
||||
result = _remove_orphan_tool_responses(messages, orphan_ids)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0]["tool_call_id"] == "call_valid"
|
||||
|
||||
def test_filters_anthropic_mixed_blocks(self):
|
||||
"""Test filtering individual orphan blocks from Anthropic message with mixed valid/orphan."""
|
||||
from backend.util.prompt import _remove_orphan_tool_responses
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_valid",
|
||||
"content": "valid result",
|
||||
},
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_orphan",
|
||||
"content": "orphan result",
|
||||
},
|
||||
],
|
||||
},
|
||||
]
|
||||
orphan_ids = {"toolu_orphan"}
|
||||
|
||||
result = _remove_orphan_tool_responses(messages, orphan_ids)
|
||||
|
||||
assert len(result) == 1
|
||||
# Should only have the valid tool_result, orphan filtered out
|
||||
assert len(result[0]["content"]) == 1
|
||||
assert result[0]["content"][0]["tool_use_id"] == "toolu_valid"
|
||||
|
||||
def test_removes_anthropic_all_orphan(self):
|
||||
"""Test removal of Anthropic message when all tool_results are orphans."""
|
||||
from backend.util.prompt import _remove_orphan_tool_responses
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_orphan1",
|
||||
"content": "result1",
|
||||
},
|
||||
{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_orphan2",
|
||||
"content": "result2",
|
||||
},
|
||||
],
|
||||
},
|
||||
]
|
||||
orphan_ids = {"toolu_orphan1", "toolu_orphan2"}
|
||||
|
||||
result = _remove_orphan_tool_responses(messages, orphan_ids)
|
||||
|
||||
# Message should be completely removed since no content left
|
||||
assert len(result) == 0
|
||||
|
||||
def test_preserves_non_tool_messages(self):
|
||||
"""Test that non-tool messages are preserved."""
|
||||
from backend.util.prompt import _remove_orphan_tool_responses
|
||||
|
||||
messages = [
|
||||
{"role": "user", "content": "Hello"},
|
||||
{"role": "assistant", "content": "Hi there!"},
|
||||
]
|
||||
orphan_ids = {"some_id"}
|
||||
|
||||
result = _remove_orphan_tool_responses(messages, orphan_ids)
|
||||
|
||||
assert result == messages
|
||||
|
||||
|
||||
class TestCompressResultDataclass:
|
||||
"""Test CompressResult dataclass."""
|
||||
|
||||
def test_default_values(self):
|
||||
"""Test default values are set correctly."""
|
||||
result = CompressResult(
|
||||
messages=[{"role": "user", "content": "test"}],
|
||||
token_count=10,
|
||||
was_compacted=False,
|
||||
)
|
||||
|
||||
assert result.error is None
|
||||
assert result.original_token_count == 0 # Defaults to 0, not None
|
||||
assert result.messages_summarized == 0
|
||||
assert result.messages_dropped == 0
|
||||
|
||||
def test_all_fields(self):
|
||||
"""Test all fields can be set."""
|
||||
result = CompressResult(
|
||||
messages=[{"role": "user", "content": "test"}],
|
||||
token_count=100,
|
||||
was_compacted=True,
|
||||
error="Some error",
|
||||
original_token_count=500,
|
||||
messages_summarized=10,
|
||||
messages_dropped=5,
|
||||
)
|
||||
|
||||
assert result.token_count == 100
|
||||
assert result.was_compacted is True
|
||||
assert result.error == "Some error"
|
||||
assert result.original_token_count == 500
|
||||
assert result.messages_summarized == 10
|
||||
assert result.messages_dropped == 5
|
||||
|
||||
@@ -9,7 +9,8 @@
|
||||
"sub_heading": "Creator agent subheading",
|
||||
"description": "Creator agent description",
|
||||
"runs": 50,
|
||||
"rating": 4.0
|
||||
"rating": 4.0,
|
||||
"agent_graph_id": "test-graph-2"
|
||||
}
|
||||
],
|
||||
"pagination": {
|
||||
|
||||
@@ -9,7 +9,8 @@
|
||||
"sub_heading": "Category agent subheading",
|
||||
"description": "Category agent description",
|
||||
"runs": 60,
|
||||
"rating": 4.1
|
||||
"rating": 4.1,
|
||||
"agent_graph_id": "test-graph-category"
|
||||
}
|
||||
],
|
||||
"pagination": {
|
||||
|
||||
@@ -9,7 +9,8 @@
|
||||
"sub_heading": "Agent 0 subheading",
|
||||
"description": "Agent 0 description",
|
||||
"runs": 0,
|
||||
"rating": 4.0
|
||||
"rating": 4.0,
|
||||
"agent_graph_id": "test-graph-2"
|
||||
},
|
||||
{
|
||||
"slug": "agent-1",
|
||||
@@ -20,7 +21,8 @@
|
||||
"sub_heading": "Agent 1 subheading",
|
||||
"description": "Agent 1 description",
|
||||
"runs": 10,
|
||||
"rating": 4.0
|
||||
"rating": 4.0,
|
||||
"agent_graph_id": "test-graph-2"
|
||||
},
|
||||
{
|
||||
"slug": "agent-2",
|
||||
@@ -31,7 +33,8 @@
|
||||
"sub_heading": "Agent 2 subheading",
|
||||
"description": "Agent 2 description",
|
||||
"runs": 20,
|
||||
"rating": 4.0
|
||||
"rating": 4.0,
|
||||
"agent_graph_id": "test-graph-2"
|
||||
},
|
||||
{
|
||||
"slug": "agent-3",
|
||||
@@ -42,7 +45,8 @@
|
||||
"sub_heading": "Agent 3 subheading",
|
||||
"description": "Agent 3 description",
|
||||
"runs": 30,
|
||||
"rating": 4.0
|
||||
"rating": 4.0,
|
||||
"agent_graph_id": "test-graph-2"
|
||||
},
|
||||
{
|
||||
"slug": "agent-4",
|
||||
@@ -53,7 +57,8 @@
|
||||
"sub_heading": "Agent 4 subheading",
|
||||
"description": "Agent 4 description",
|
||||
"runs": 40,
|
||||
"rating": 4.0
|
||||
"rating": 4.0,
|
||||
"agent_graph_id": "test-graph-2"
|
||||
}
|
||||
],
|
||||
"pagination": {
|
||||
|
||||
@@ -9,7 +9,8 @@
|
||||
"sub_heading": "Search agent subheading",
|
||||
"description": "Specific search term description",
|
||||
"runs": 75,
|
||||
"rating": 4.2
|
||||
"rating": 4.2,
|
||||
"agent_graph_id": "test-graph-search"
|
||||
}
|
||||
],
|
||||
"pagination": {
|
||||
|
||||
@@ -9,7 +9,8 @@
|
||||
"sub_heading": "Top agent subheading",
|
||||
"description": "Top agent description",
|
||||
"runs": 1000,
|
||||
"rating": 5.0
|
||||
"rating": 5.0,
|
||||
"agent_graph_id": "test-graph-3"
|
||||
}
|
||||
],
|
||||
"pagination": {
|
||||
|
||||
@@ -9,7 +9,8 @@
|
||||
"sub_heading": "Featured agent subheading",
|
||||
"description": "Featured agent description",
|
||||
"runs": 100,
|
||||
"rating": 4.5
|
||||
"rating": 4.5,
|
||||
"agent_graph_id": "test-graph-1"
|
||||
}
|
||||
],
|
||||
"pagination": {
|
||||
|
||||
@@ -31,6 +31,10 @@
|
||||
"has_sensitive_action": false,
|
||||
"trigger_setup_info": null,
|
||||
"new_output": false,
|
||||
"execution_count": 0,
|
||||
"success_rate": null,
|
||||
"avg_correctness_score": null,
|
||||
"recent_executions": [],
|
||||
"can_access_graph": true,
|
||||
"is_latest_version": true,
|
||||
"is_favorite": false,
|
||||
@@ -72,6 +76,10 @@
|
||||
"has_sensitive_action": false,
|
||||
"trigger_setup_info": null,
|
||||
"new_output": false,
|
||||
"execution_count": 0,
|
||||
"success_rate": null,
|
||||
"avg_correctness_score": null,
|
||||
"recent_executions": [],
|
||||
"can_access_graph": false,
|
||||
"is_latest_version": true,
|
||||
"is_favorite": false,
|
||||
|
||||
@@ -57,7 +57,8 @@ class TestDecomposeGoal:
|
||||
|
||||
result = await core.decompose_goal("Build a chatbot")
|
||||
|
||||
mock_external.assert_called_once_with("Build a chatbot", "")
|
||||
# library_agents defaults to None
|
||||
mock_external.assert_called_once_with("Build a chatbot", "", None)
|
||||
assert result == expected_result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -74,7 +75,8 @@ class TestDecomposeGoal:
|
||||
|
||||
await core.decompose_goal("Build a chatbot", "Use Python")
|
||||
|
||||
mock_external.assert_called_once_with("Build a chatbot", "Use Python")
|
||||
# library_agents defaults to None
|
||||
mock_external.assert_called_once_with("Build a chatbot", "Use Python", None)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_none_on_service_failure(self):
|
||||
@@ -109,7 +111,8 @@ class TestGenerateAgent:
|
||||
instructions = {"type": "instructions", "steps": ["Step 1"]}
|
||||
result = await core.generate_agent(instructions)
|
||||
|
||||
mock_external.assert_called_once_with(instructions)
|
||||
# library_agents defaults to None
|
||||
mock_external.assert_called_once_with(instructions, None)
|
||||
# Result should have id, version, is_active added if not present
|
||||
assert result is not None
|
||||
assert result["name"] == "Test Agent"
|
||||
@@ -174,7 +177,8 @@ class TestGenerateAgentPatch:
|
||||
current_agent = {"nodes": [], "links": []}
|
||||
result = await core.generate_agent_patch("Add a node", current_agent)
|
||||
|
||||
mock_external.assert_called_once_with("Add a node", current_agent)
|
||||
# library_agents defaults to None
|
||||
mock_external.assert_called_once_with("Add a node", current_agent, None)
|
||||
assert result == expected_result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -0,0 +1,857 @@
|
||||
"""
|
||||
Tests for library agent fetching functionality in agent generator.
|
||||
|
||||
This test suite verifies the search-based library agent fetching,
|
||||
including the combination of library and marketplace agents.
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.api.features.chat.tools.agent_generator import core
|
||||
|
||||
|
||||
class TestGetLibraryAgentsForGeneration:
|
||||
"""Test get_library_agents_for_generation function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fetches_agents_with_search_term(self):
|
||||
"""Test that search_term is passed to the library db."""
|
||||
# Create a mock agent with proper attribute values
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.graph_id = "agent-123"
|
||||
mock_agent.graph_version = 1
|
||||
mock_agent.name = "Email Agent"
|
||||
mock_agent.description = "Sends emails"
|
||||
mock_agent.input_schema = {"properties": {}}
|
||||
mock_agent.output_schema = {"properties": {}}
|
||||
mock_agent.recent_executions = []
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = [mock_agent]
|
||||
|
||||
with patch.object(
|
||||
core.library_db,
|
||||
"list_library_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
) as mock_list:
|
||||
result = await core.get_library_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="send email",
|
||||
)
|
||||
|
||||
mock_list.assert_called_once_with(
|
||||
user_id="user-123",
|
||||
search_term="send email",
|
||||
page=1,
|
||||
page_size=15,
|
||||
include_executions=True,
|
||||
)
|
||||
|
||||
# Verify result format
|
||||
assert len(result) == 1
|
||||
assert result[0]["graph_id"] == "agent-123"
|
||||
assert result[0]["name"] == "Email Agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_excludes_specified_graph_id(self):
|
||||
"""Test that agents with excluded graph_id are filtered out."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = [
|
||||
MagicMock(
|
||||
graph_id="agent-123",
|
||||
graph_version=1,
|
||||
name="Agent 1",
|
||||
description="First agent",
|
||||
input_schema={},
|
||||
output_schema={},
|
||||
recent_executions=[],
|
||||
),
|
||||
MagicMock(
|
||||
graph_id="agent-456",
|
||||
graph_version=1,
|
||||
name="Agent 2",
|
||||
description="Second agent",
|
||||
input_schema={},
|
||||
output_schema={},
|
||||
recent_executions=[],
|
||||
),
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core.library_db,
|
||||
"list_library_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
):
|
||||
result = await core.get_library_agents_for_generation(
|
||||
user_id="user-123",
|
||||
exclude_graph_id="agent-123",
|
||||
)
|
||||
|
||||
# Verify the excluded agent is not in results
|
||||
assert len(result) == 1
|
||||
assert result[0]["graph_id"] == "agent-456"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_respects_max_results(self):
|
||||
"""Test that max_results parameter limits the page_size."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = []
|
||||
|
||||
with patch.object(
|
||||
core.library_db,
|
||||
"list_library_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
) as mock_list:
|
||||
await core.get_library_agents_for_generation(
|
||||
user_id="user-123",
|
||||
max_results=5,
|
||||
)
|
||||
|
||||
mock_list.assert_called_once_with(
|
||||
user_id="user-123",
|
||||
search_term=None,
|
||||
page=1,
|
||||
page_size=5,
|
||||
include_executions=True,
|
||||
)
|
||||
|
||||
|
||||
class TestSearchMarketplaceAgentsForGeneration:
|
||||
"""Test search_marketplace_agents_for_generation function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_searches_marketplace_with_query(self):
|
||||
"""Test that marketplace is searched with the query."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = [
|
||||
MagicMock(
|
||||
agent_name="Public Agent",
|
||||
description="A public agent",
|
||||
sub_heading="Does something useful",
|
||||
creator="creator-1",
|
||||
agent_graph_id="graph-123",
|
||||
)
|
||||
]
|
||||
|
||||
mock_graph = MagicMock()
|
||||
mock_graph.id = "graph-123"
|
||||
mock_graph.version = 1
|
||||
mock_graph.input_schema = {"type": "object"}
|
||||
mock_graph.output_schema = {"type": "object"}
|
||||
|
||||
with (
|
||||
patch(
|
||||
"backend.api.features.store.db.get_store_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
) as mock_search,
|
||||
patch(
|
||||
"backend.api.features.chat.tools.agent_generator.core.get_store_listed_graphs",
|
||||
new_callable=AsyncMock,
|
||||
return_value={"graph-123": mock_graph},
|
||||
),
|
||||
):
|
||||
result = await core.search_marketplace_agents_for_generation(
|
||||
search_query="automation",
|
||||
max_results=10,
|
||||
)
|
||||
|
||||
mock_search.assert_called_once_with(
|
||||
search_query="automation",
|
||||
page=1,
|
||||
page_size=10,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0]["name"] == "Public Agent"
|
||||
assert result[0]["graph_id"] == "graph-123"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handles_marketplace_error_gracefully(self):
|
||||
"""Test that marketplace errors don't crash the function."""
|
||||
with patch(
|
||||
"backend.api.features.store.db.get_store_agents",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=Exception("Marketplace unavailable"),
|
||||
):
|
||||
result = await core.search_marketplace_agents_for_generation(
|
||||
search_query="test"
|
||||
)
|
||||
|
||||
# Should return empty list, not raise exception
|
||||
assert result == []
|
||||
|
||||
|
||||
class TestGetAllRelevantAgentsForGeneration:
|
||||
"""Test get_all_relevant_agents_for_generation function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_combines_library_and_marketplace_agents(self):
|
||||
"""Test that agents from both sources are combined."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "lib-123",
|
||||
"graph_version": 1,
|
||||
"name": "Library Agent",
|
||||
"description": "From library",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
marketplace_agents = [
|
||||
{
|
||||
"graph_id": "market-456",
|
||||
"graph_version": 1,
|
||||
"name": "Market Agent",
|
||||
"description": "From marketplace",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_library_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=library_agents,
|
||||
):
|
||||
with patch.object(
|
||||
core,
|
||||
"search_marketplace_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=marketplace_agents,
|
||||
):
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="test query",
|
||||
include_marketplace=True,
|
||||
)
|
||||
|
||||
# Library agents should come first
|
||||
assert len(result) == 2
|
||||
assert result[0]["name"] == "Library Agent"
|
||||
assert result[1]["name"] == "Market Agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deduplicates_by_graph_id(self):
|
||||
"""Test that marketplace agents with same graph_id as library are excluded."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "shared-123",
|
||||
"graph_version": 1,
|
||||
"name": "Shared Agent",
|
||||
"description": "From library",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
marketplace_agents = [
|
||||
{
|
||||
"graph_id": "shared-123", # Same graph_id, should be deduplicated
|
||||
"graph_version": 1,
|
||||
"name": "Shared Agent",
|
||||
"description": "From marketplace",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
},
|
||||
{
|
||||
"graph_id": "unique-456",
|
||||
"graph_version": 1,
|
||||
"name": "Unique Agent",
|
||||
"description": "Only in marketplace",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
},
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_library_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=library_agents,
|
||||
):
|
||||
with patch.object(
|
||||
core,
|
||||
"search_marketplace_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=marketplace_agents,
|
||||
):
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="test",
|
||||
include_marketplace=True,
|
||||
)
|
||||
|
||||
# Shared Agent from marketplace should be excluded by graph_id
|
||||
assert len(result) == 2
|
||||
names = [a["name"] for a in result]
|
||||
assert "Shared Agent" in names
|
||||
assert "Unique Agent" in names
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_marketplace_when_disabled(self):
|
||||
"""Test that marketplace is not searched when include_marketplace=False."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "lib-123",
|
||||
"graph_version": 1,
|
||||
"name": "Library Agent",
|
||||
"description": "From library",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_library_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=library_agents,
|
||||
):
|
||||
with patch.object(
|
||||
core,
|
||||
"search_marketplace_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_marketplace:
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="test",
|
||||
include_marketplace=False,
|
||||
)
|
||||
|
||||
# Marketplace should not be called
|
||||
mock_marketplace.assert_not_called()
|
||||
assert len(result) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_marketplace_when_no_search_query(self):
|
||||
"""Test that marketplace is not searched without a search query."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "lib-123",
|
||||
"graph_version": 1,
|
||||
"name": "Library Agent",
|
||||
"description": "From library",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_library_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=library_agents,
|
||||
):
|
||||
with patch.object(
|
||||
core,
|
||||
"search_marketplace_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_marketplace:
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query=None, # No search query
|
||||
include_marketplace=True,
|
||||
)
|
||||
|
||||
# Marketplace should not be called without search query
|
||||
mock_marketplace.assert_not_called()
|
||||
assert len(result) == 1
|
||||
|
||||
|
||||
class TestExtractSearchTermsFromSteps:
|
||||
"""Test extract_search_terms_from_steps function."""
|
||||
|
||||
def test_extracts_terms_from_instructions_type(self):
|
||||
"""Test extraction from valid instructions decomposition result."""
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{
|
||||
"description": "Send an email notification",
|
||||
"block_name": "GmailSendBlock",
|
||||
},
|
||||
{"description": "Fetch weather data", "action": "Get weather API"},
|
||||
],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
assert "Send an email notification" in result
|
||||
assert "GmailSendBlock" in result
|
||||
assert "Fetch weather data" in result
|
||||
assert "Get weather API" in result
|
||||
|
||||
def test_returns_empty_for_non_instructions_type(self):
|
||||
"""Test that non-instructions types return empty list."""
|
||||
decomposition_result = {
|
||||
"type": "clarifying_questions",
|
||||
"questions": [{"question": "What email?"}],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
assert result == []
|
||||
|
||||
def test_deduplicates_terms_case_insensitively(self):
|
||||
"""Test that duplicate terms are removed (case-insensitive)."""
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{"description": "Send Email", "name": "send email"},
|
||||
{"description": "Other task"},
|
||||
],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
# Should only have one "send email" variant
|
||||
email_terms = [t for t in result if "email" in t.lower()]
|
||||
assert len(email_terms) == 1
|
||||
|
||||
def test_filters_short_terms(self):
|
||||
"""Test that terms with 3 or fewer characters are filtered out."""
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{"description": "ab", "action": "xyz"}, # Both too short
|
||||
{"description": "Valid term here"},
|
||||
],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
assert "ab" not in result
|
||||
assert "xyz" not in result
|
||||
assert "Valid term here" in result
|
||||
|
||||
def test_handles_empty_steps(self):
|
||||
"""Test handling of empty steps list."""
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [],
|
||||
}
|
||||
|
||||
result = core.extract_search_terms_from_steps(decomposition_result)
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
class TestEnrichLibraryAgentsFromSteps:
|
||||
"""Test enrich_library_agents_from_steps function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enriches_with_additional_agents(self):
|
||||
"""Test that additional agents are found based on steps."""
|
||||
existing_agents = [
|
||||
{
|
||||
"graph_id": "existing-123",
|
||||
"graph_version": 1,
|
||||
"name": "Existing Agent",
|
||||
"description": "Already fetched",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
additional_agents = [
|
||||
{
|
||||
"graph_id": "new-456",
|
||||
"graph_version": 1,
|
||||
"name": "Email Agent",
|
||||
"description": "For sending emails",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{"description": "Send email notification"},
|
||||
],
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_all_relevant_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=additional_agents,
|
||||
):
|
||||
result = await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should have both existing and new agents
|
||||
assert len(result) == 2
|
||||
names = [a["name"] for a in result]
|
||||
assert "Existing Agent" in names
|
||||
assert "Email Agent" in names
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deduplicates_by_graph_id(self):
|
||||
"""Test that agents with same graph_id are not duplicated."""
|
||||
existing_agents = [
|
||||
{
|
||||
"graph_id": "agent-123",
|
||||
"graph_version": 1,
|
||||
"name": "Existing Agent",
|
||||
"description": "Already fetched",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
# Additional search returns same agent
|
||||
additional_agents = [
|
||||
{
|
||||
"graph_id": "agent-123", # Same ID
|
||||
"graph_version": 1,
|
||||
"name": "Existing Agent Copy",
|
||||
"description": "Same agent different name",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [{"description": "Some action"}],
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_all_relevant_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=additional_agents,
|
||||
):
|
||||
result = await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should not duplicate
|
||||
assert len(result) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deduplicates_by_name(self):
|
||||
"""Test that agents with same name are not duplicated."""
|
||||
existing_agents = [
|
||||
{
|
||||
"graph_id": "agent-123",
|
||||
"graph_version": 1,
|
||||
"name": "Email Agent",
|
||||
"description": "Already fetched",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
# Additional search returns agent with same name but different ID
|
||||
additional_agents = [
|
||||
{
|
||||
"graph_id": "agent-456", # Different ID
|
||||
"graph_version": 1,
|
||||
"name": "Email Agent", # Same name
|
||||
"description": "Different agent same name",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [{"description": "Send email"}],
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_all_relevant_agents_for_generation",
|
||||
new_callable=AsyncMock,
|
||||
return_value=additional_agents,
|
||||
):
|
||||
result = await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should not duplicate by name
|
||||
assert len(result) == 1
|
||||
assert result[0].get("graph_id") == "agent-123" # Original kept
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_existing_when_no_steps(self):
|
||||
"""Test that existing agents are returned when no search terms extracted."""
|
||||
existing_agents = [
|
||||
{
|
||||
"graph_id": "existing-123",
|
||||
"graph_version": 1,
|
||||
"name": "Existing Agent",
|
||||
"description": "Already fetched",
|
||||
"input_schema": {},
|
||||
"output_schema": {},
|
||||
}
|
||||
]
|
||||
|
||||
decomposition_result = {
|
||||
"type": "clarifying_questions", # Not instructions type
|
||||
"questions": [],
|
||||
}
|
||||
|
||||
result = await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should return existing unchanged
|
||||
assert result == existing_agents
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_limits_search_terms_to_three(self):
|
||||
"""Test that only first 3 search terms are used."""
|
||||
existing_agents = []
|
||||
|
||||
decomposition_result = {
|
||||
"type": "instructions",
|
||||
"steps": [
|
||||
{"description": "First action"},
|
||||
{"description": "Second action"},
|
||||
{"description": "Third action"},
|
||||
{"description": "Fourth action"},
|
||||
{"description": "Fifth action"},
|
||||
],
|
||||
}
|
||||
|
||||
call_count = 0
|
||||
|
||||
async def mock_get_agents(*args, **kwargs):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
return []
|
||||
|
||||
with patch.object(
|
||||
core,
|
||||
"get_all_relevant_agents_for_generation",
|
||||
side_effect=mock_get_agents,
|
||||
):
|
||||
await core.enrich_library_agents_from_steps(
|
||||
user_id="user-123",
|
||||
decomposition_result=decomposition_result,
|
||||
existing_agents=existing_agents,
|
||||
)
|
||||
|
||||
# Should only make 3 calls (limited to first 3 terms)
|
||||
assert call_count == 3
|
||||
|
||||
|
||||
class TestExtractUuidsFromText:
|
||||
"""Test extract_uuids_from_text function."""
|
||||
|
||||
def test_extracts_single_uuid(self):
|
||||
"""Test extraction of a single UUID from text."""
|
||||
text = "Use my agent 46631191-e8a8-486f-ad90-84f89738321d for this task"
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert len(result) == 1
|
||||
assert "46631191-e8a8-486f-ad90-84f89738321d" in result
|
||||
|
||||
def test_extracts_multiple_uuids(self):
|
||||
"""Test extraction of multiple UUIDs from text."""
|
||||
text = (
|
||||
"Combine agents 11111111-1111-4111-8111-111111111111 "
|
||||
"and 22222222-2222-4222-9222-222222222222"
|
||||
)
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert len(result) == 2
|
||||
assert "11111111-1111-4111-8111-111111111111" in result
|
||||
assert "22222222-2222-4222-9222-222222222222" in result
|
||||
|
||||
def test_deduplicates_uuids(self):
|
||||
"""Test that duplicate UUIDs are deduplicated."""
|
||||
text = (
|
||||
"Use 46631191-e8a8-486f-ad90-84f89738321d twice: "
|
||||
"46631191-e8a8-486f-ad90-84f89738321d"
|
||||
)
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert len(result) == 1
|
||||
|
||||
def test_normalizes_to_lowercase(self):
|
||||
"""Test that UUIDs are normalized to lowercase."""
|
||||
text = "Use 46631191-E8A8-486F-AD90-84F89738321D"
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert result[0] == "46631191-e8a8-486f-ad90-84f89738321d"
|
||||
|
||||
def test_returns_empty_for_no_uuids(self):
|
||||
"""Test that empty list is returned when no UUIDs found."""
|
||||
text = "Create an email agent that sends notifications"
|
||||
result = core.extract_uuids_from_text(text)
|
||||
assert result == []
|
||||
|
||||
def test_ignores_invalid_uuids(self):
|
||||
"""Test that invalid UUID-like strings are ignored."""
|
||||
text = "Not a valid UUID: 12345678-1234-1234-1234-123456789abc"
|
||||
result = core.extract_uuids_from_text(text)
|
||||
# UUID v4 requires specific patterns (4 in third group, 8/9/a/b in fourth)
|
||||
assert len(result) == 0
|
||||
|
||||
|
||||
class TestGetLibraryAgentById:
|
||||
"""Test get_library_agent_by_id function (and its alias get_library_agent_by_graph_id)."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_agent_when_found_by_graph_id(self):
|
||||
"""Test that agent is returned when found by graph_id."""
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.graph_id = "agent-123"
|
||||
mock_agent.graph_version = 1
|
||||
mock_agent.name = "Test Agent"
|
||||
mock_agent.description = "Test description"
|
||||
mock_agent.input_schema = {"properties": {}}
|
||||
mock_agent.output_schema = {"properties": {}}
|
||||
|
||||
with patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_agent,
|
||||
):
|
||||
result = await core.get_library_agent_by_id("user-123", "agent-123")
|
||||
|
||||
assert result is not None
|
||||
assert result["graph_id"] == "agent-123"
|
||||
assert result["name"] == "Test Agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_falls_back_to_library_agent_id(self):
|
||||
"""Test that lookup falls back to library agent ID when graph_id not found."""
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.graph_id = "graph-456" # Different from the lookup ID
|
||||
mock_agent.graph_version = 1
|
||||
mock_agent.name = "Library Agent"
|
||||
mock_agent.description = "Found by library ID"
|
||||
mock_agent.input_schema = {"properties": {}}
|
||||
mock_agent.output_schema = {"properties": {}}
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None, # Not found by graph_id
|
||||
),
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_agent, # Found by library ID
|
||||
),
|
||||
):
|
||||
result = await core.get_library_agent_by_id("user-123", "library-id-123")
|
||||
|
||||
assert result is not None
|
||||
assert result["graph_id"] == "graph-456"
|
||||
assert result["name"] == "Library Agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_none_when_not_found_by_either_method(self):
|
||||
"""Test that None is returned when agent not found by either method."""
|
||||
with (
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None,
|
||||
),
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=core.NotFoundError("Not found"),
|
||||
),
|
||||
):
|
||||
result = await core.get_library_agent_by_id("user-123", "nonexistent")
|
||||
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_none_on_exception(self):
|
||||
"""Test that None is returned when exception occurs in both lookups."""
|
||||
with (
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=Exception("Database error"),
|
||||
),
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=Exception("Database error"),
|
||||
),
|
||||
):
|
||||
result = await core.get_library_agent_by_id("user-123", "agent-123")
|
||||
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alias_works(self):
|
||||
"""Test that get_library_agent_by_graph_id is an alias for get_library_agent_by_id."""
|
||||
assert core.get_library_agent_by_graph_id is core.get_library_agent_by_id
|
||||
|
||||
|
||||
class TestGetAllRelevantAgentsWithUuids:
|
||||
"""Test UUID extraction in get_all_relevant_agents_for_generation."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fetches_explicitly_mentioned_agents(self):
|
||||
"""Test that agents mentioned by UUID are fetched directly."""
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.graph_id = "46631191-e8a8-486f-ad90-84f89738321d"
|
||||
mock_agent.graph_version = 1
|
||||
mock_agent.name = "Mentioned Agent"
|
||||
mock_agent.description = "Explicitly mentioned"
|
||||
mock_agent.input_schema = {}
|
||||
mock_agent.output_schema = {}
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.agents = []
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"get_library_agent_by_graph_id",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_agent,
|
||||
),
|
||||
patch.object(
|
||||
core.library_db,
|
||||
"list_library_agents",
|
||||
new_callable=AsyncMock,
|
||||
return_value=mock_response,
|
||||
),
|
||||
):
|
||||
result = await core.get_all_relevant_agents_for_generation(
|
||||
user_id="user-123",
|
||||
search_query="Use agent 46631191-e8a8-486f-ad90-84f89738321d",
|
||||
include_marketplace=False,
|
||||
)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].get("graph_id") == "46631191-e8a8-486f-ad90-84f89738321d"
|
||||
assert result[0].get("name") == "Mentioned Agent"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -102,7 +102,7 @@ class TestDecomposeGoalExternal:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_with_context(self):
|
||||
"""Test decomposition with additional context."""
|
||||
"""Test decomposition with additional context enriched into description."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
@@ -119,9 +119,12 @@ class TestDecomposeGoalExternal:
|
||||
"Build a chatbot", context="Use Python"
|
||||
)
|
||||
|
||||
expected_description = (
|
||||
"Build a chatbot\n\nAdditional context from user:\nUse Python"
|
||||
)
|
||||
mock_client.post.assert_called_once_with(
|
||||
"/api/decompose-description",
|
||||
json={"description": "Build a chatbot", "user_instruction": "Use Python"},
|
||||
json={"description": expected_description},
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -433,5 +436,139 @@ class TestGetBlocksExternal:
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestLibraryAgentsPassthrough:
|
||||
"""Test that library_agents are passed correctly in all requests."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Reset client singleton before each test."""
|
||||
service._settings = None
|
||||
service._client = None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_passes_library_agents(self):
|
||||
"""Test that library_agents are included in decompose goal payload."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "agent-123",
|
||||
"graph_version": 1,
|
||||
"name": "Email Sender",
|
||||
"description": "Sends emails",
|
||||
"input_schema": {"properties": {"to": {"type": "string"}}},
|
||||
"output_schema": {"properties": {"sent": {"type": "boolean"}}},
|
||||
},
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
"type": "instructions",
|
||||
"steps": ["Step 1"],
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
await service.decompose_goal_external(
|
||||
"Send an email",
|
||||
library_agents=library_agents,
|
||||
)
|
||||
|
||||
# Verify library_agents was passed in the payload
|
||||
call_args = mock_client.post.call_args
|
||||
assert call_args[1]["json"]["library_agents"] == library_agents
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_agent_passes_library_agents(self):
|
||||
"""Test that library_agents are included in generate agent payload."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "agent-456",
|
||||
"graph_version": 2,
|
||||
"name": "Data Fetcher",
|
||||
"description": "Fetches data from API",
|
||||
"input_schema": {"properties": {"url": {"type": "string"}}},
|
||||
"output_schema": {"properties": {"data": {"type": "object"}}},
|
||||
},
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
"agent_json": {"name": "Test Agent", "nodes": []},
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
await service.generate_agent_external(
|
||||
{"steps": ["Step 1"]},
|
||||
library_agents=library_agents,
|
||||
)
|
||||
|
||||
# Verify library_agents was passed in the payload
|
||||
call_args = mock_client.post.call_args
|
||||
assert call_args[1]["json"]["library_agents"] == library_agents
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_agent_patch_passes_library_agents(self):
|
||||
"""Test that library_agents are included in patch generation payload."""
|
||||
library_agents = [
|
||||
{
|
||||
"graph_id": "agent-789",
|
||||
"graph_version": 1,
|
||||
"name": "Slack Notifier",
|
||||
"description": "Sends Slack messages",
|
||||
"input_schema": {"properties": {"message": {"type": "string"}}},
|
||||
"output_schema": {"properties": {"success": {"type": "boolean"}}},
|
||||
},
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
"agent_json": {"name": "Updated Agent", "nodes": []},
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
await service.generate_agent_patch_external(
|
||||
"Add error handling",
|
||||
{"name": "Original Agent", "nodes": []},
|
||||
library_agents=library_agents,
|
||||
)
|
||||
|
||||
# Verify library_agents was passed in the payload
|
||||
call_args = mock_client.post.call_args
|
||||
assert call_args[1]["json"]["library_agents"] == library_agents
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decompose_goal_without_library_agents(self):
|
||||
"""Test that decompose goal works without library_agents."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.json.return_value = {
|
||||
"success": True,
|
||||
"type": "instructions",
|
||||
"steps": ["Step 1"],
|
||||
}
|
||||
mock_response.raise_for_status = MagicMock()
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.post.return_value = mock_response
|
||||
|
||||
with patch.object(service, "_get_client", return_value=mock_client):
|
||||
await service.decompose_goal_external("Build a workflow")
|
||||
|
||||
# Verify library_agents was NOT passed when not provided
|
||||
call_args = mock_client.post.call_args
|
||||
assert "library_agents" not in call_args[1]["json"]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
|
||||
@@ -857,7 +857,7 @@ export const CustomNode = React.memo(
|
||||
})();
|
||||
|
||||
const hasAdvancedFields =
|
||||
data.inputSchema &&
|
||||
data.inputSchema?.properties &&
|
||||
Object.entries(data.inputSchema.properties).some(([key, value]) => {
|
||||
return (
|
||||
value.advanced === true && !data.inputSchema.required?.includes(key)
|
||||
|
||||
@@ -7981,6 +7981,25 @@
|
||||
]
|
||||
},
|
||||
"new_output": { "type": "boolean", "title": "New Output" },
|
||||
"execution_count": {
|
||||
"type": "integer",
|
||||
"title": "Execution Count",
|
||||
"default": 0
|
||||
},
|
||||
"success_rate": {
|
||||
"anyOf": [{ "type": "number" }, { "type": "null" }],
|
||||
"title": "Success Rate"
|
||||
},
|
||||
"avg_correctness_score": {
|
||||
"anyOf": [{ "type": "number" }, { "type": "null" }],
|
||||
"title": "Avg Correctness Score"
|
||||
},
|
||||
"recent_executions": {
|
||||
"items": { "$ref": "#/components/schemas/RecentExecution" },
|
||||
"type": "array",
|
||||
"title": "Recent Executions",
|
||||
"description": "List of recent executions with status, score, and summary"
|
||||
},
|
||||
"can_access_graph": {
|
||||
"type": "boolean",
|
||||
"title": "Can Access Graph"
|
||||
@@ -9374,6 +9393,23 @@
|
||||
"required": ["providers", "pagination"],
|
||||
"title": "ProviderResponse"
|
||||
},
|
||||
"RecentExecution": {
|
||||
"properties": {
|
||||
"status": { "type": "string", "title": "Status" },
|
||||
"correctness_score": {
|
||||
"anyOf": [{ "type": "number" }, { "type": "null" }],
|
||||
"title": "Correctness Score"
|
||||
},
|
||||
"activity_summary": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Activity Summary"
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
"required": ["status"],
|
||||
"title": "RecentExecution",
|
||||
"description": "Summary of a recent execution for quality assessment.\n\nUsed by the LLM to understand the agent's recent performance with specific examples\nrather than just aggregate statistics."
|
||||
},
|
||||
"RefundRequest": {
|
||||
"properties": {
|
||||
"id": { "type": "string", "title": "Id" },
|
||||
@@ -9797,7 +9833,8 @@
|
||||
"sub_heading": { "type": "string", "title": "Sub Heading" },
|
||||
"description": { "type": "string", "title": "Description" },
|
||||
"runs": { "type": "integer", "title": "Runs" },
|
||||
"rating": { "type": "number", "title": "Rating" }
|
||||
"rating": { "type": "number", "title": "Rating" },
|
||||
"agent_graph_id": { "type": "string", "title": "Agent Graph Id" }
|
||||
},
|
||||
"type": "object",
|
||||
"required": [
|
||||
@@ -9809,7 +9846,8 @@
|
||||
"sub_heading",
|
||||
"description",
|
||||
"runs",
|
||||
"rating"
|
||||
"rating",
|
||||
"agent_graph_id"
|
||||
],
|
||||
"title": "StoreAgent"
|
||||
},
|
||||
|
||||
@@ -104,7 +104,31 @@ export function FileInput(props: Props) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const getFileLabelFromValue = (val: string) => {
|
||||
const getFileLabelFromValue = (val: unknown): string => {
|
||||
// Handle object format from external API: { name, type, size, data }
|
||||
if (val && typeof val === "object") {
|
||||
const obj = val as Record<string, unknown>;
|
||||
if (typeof obj.name === "string") {
|
||||
return getFileLabel(
|
||||
obj.name,
|
||||
typeof obj.type === "string" ? obj.type : "",
|
||||
);
|
||||
}
|
||||
if (typeof obj.type === "string") {
|
||||
const mimeParts = obj.type.split("/");
|
||||
if (mimeParts.length > 1) {
|
||||
return `${mimeParts[1].toUpperCase()} file`;
|
||||
}
|
||||
return `${obj.type} file`;
|
||||
}
|
||||
return "File";
|
||||
}
|
||||
|
||||
// Handle string values (data URIs or file paths)
|
||||
if (typeof val !== "string") {
|
||||
return "File";
|
||||
}
|
||||
|
||||
if (val.startsWith("data:")) {
|
||||
const matches = val.match(/^data:([^;]+);/);
|
||||
if (matches?.[1]) {
|
||||
|
||||
@@ -156,11 +156,19 @@ export function ChatMessage({
|
||||
}
|
||||
|
||||
if (isClarificationNeeded && message.type === "clarification_needed") {
|
||||
const hasUserReplyAfter =
|
||||
index >= 0 &&
|
||||
messages
|
||||
.slice(index + 1)
|
||||
.some((m) => m.type === "message" && m.role === "user");
|
||||
|
||||
return (
|
||||
<ClarificationQuestionsWidget
|
||||
questions={message.questions}
|
||||
message={message.message}
|
||||
sessionId={message.sessionId}
|
||||
onSubmitAnswers={handleClarificationAnswers}
|
||||
isAnswered={hasUserReplyAfter}
|
||||
className={className}
|
||||
/>
|
||||
);
|
||||
|
||||
@@ -6,7 +6,7 @@ import { Input } from "@/components/atoms/Input/Input";
|
||||
import { Text } from "@/components/atoms/Text/Text";
|
||||
import { cn } from "@/lib/utils";
|
||||
import { CheckCircleIcon, QuestionIcon } from "@phosphor-icons/react";
|
||||
import { useState } from "react";
|
||||
import { useState, useEffect, useRef } from "react";
|
||||
|
||||
export interface ClarifyingQuestion {
|
||||
question: string;
|
||||
@@ -17,39 +17,96 @@ export interface ClarifyingQuestion {
|
||||
interface Props {
|
||||
questions: ClarifyingQuestion[];
|
||||
message: string;
|
||||
sessionId?: string;
|
||||
onSubmitAnswers: (answers: Record<string, string>) => void;
|
||||
onCancel?: () => void;
|
||||
isAnswered?: boolean;
|
||||
className?: string;
|
||||
}
|
||||
|
||||
function getStorageKey(sessionId?: string): string | null {
|
||||
if (!sessionId) return null;
|
||||
return `clarification_answers_${sessionId}`;
|
||||
}
|
||||
|
||||
export function ClarificationQuestionsWidget({
|
||||
questions,
|
||||
message,
|
||||
sessionId,
|
||||
onSubmitAnswers,
|
||||
onCancel,
|
||||
isAnswered = false,
|
||||
className,
|
||||
}: Props) {
|
||||
const [answers, setAnswers] = useState<Record<string, string>>({});
|
||||
const [isSubmitted, setIsSubmitted] = useState(false);
|
||||
const lastSessionIdRef = useRef<string | undefined>(undefined);
|
||||
|
||||
useEffect(() => {
|
||||
const storageKey = getStorageKey(sessionId);
|
||||
if (!storageKey) {
|
||||
setAnswers({});
|
||||
setIsSubmitted(false);
|
||||
lastSessionIdRef.current = sessionId;
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const saved = localStorage.getItem(storageKey);
|
||||
if (saved) {
|
||||
const parsed = JSON.parse(saved) as Record<string, string>;
|
||||
setAnswers(parsed);
|
||||
} else {
|
||||
setAnswers({});
|
||||
}
|
||||
setIsSubmitted(false);
|
||||
} catch {
|
||||
setAnswers({});
|
||||
setIsSubmitted(false);
|
||||
}
|
||||
lastSessionIdRef.current = sessionId;
|
||||
}, [sessionId]);
|
||||
|
||||
useEffect(() => {
|
||||
if (lastSessionIdRef.current !== sessionId) {
|
||||
return;
|
||||
}
|
||||
const storageKey = getStorageKey(sessionId);
|
||||
if (!storageKey) return;
|
||||
|
||||
const hasAnswers = Object.values(answers).some((v) => v.trim());
|
||||
try {
|
||||
if (hasAnswers) {
|
||||
localStorage.setItem(storageKey, JSON.stringify(answers));
|
||||
} else {
|
||||
localStorage.removeItem(storageKey);
|
||||
}
|
||||
} catch {}
|
||||
}, [answers, sessionId]);
|
||||
|
||||
function handleAnswerChange(keyword: string, value: string) {
|
||||
setAnswers((prev) => ({ ...prev, [keyword]: value }));
|
||||
}
|
||||
|
||||
function handleSubmit() {
|
||||
// Check if all questions are answered
|
||||
const allAnswered = questions.every((q) => answers[q.keyword]?.trim());
|
||||
if (!allAnswered) {
|
||||
return;
|
||||
}
|
||||
setIsSubmitted(true);
|
||||
onSubmitAnswers(answers);
|
||||
|
||||
const storageKey = getStorageKey(sessionId);
|
||||
try {
|
||||
if (storageKey) {
|
||||
localStorage.removeItem(storageKey);
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
|
||||
const allAnswered = questions.every((q) => answers[q.keyword]?.trim());
|
||||
|
||||
// Show submitted state after answers are submitted
|
||||
if (isSubmitted) {
|
||||
if (isAnswered || isSubmitted) {
|
||||
return (
|
||||
<div
|
||||
className={cn(
|
||||
|
||||
@@ -30,9 +30,9 @@ export function getErrorMessage(result: unknown): string {
|
||||
}
|
||||
if (typeof result === "object" && result !== null) {
|
||||
const response = result as Record<string, unknown>;
|
||||
if (response.error) return stripInternalReasoning(String(response.error));
|
||||
if (response.message)
|
||||
return stripInternalReasoning(String(response.message));
|
||||
if (response.error) return stripInternalReasoning(String(response.error));
|
||||
}
|
||||
return "An error occurred";
|
||||
}
|
||||
@@ -363,8 +363,8 @@ export function formatToolResponse(result: unknown, toolName: string): string {
|
||||
|
||||
case "error":
|
||||
const errorMsg =
|
||||
(response.error as string) || response.message || "An error occurred";
|
||||
return `Error: ${errorMsg}`;
|
||||
(response.message as string) || response.error || "An error occurred";
|
||||
return stripInternalReasoning(String(errorMsg));
|
||||
|
||||
case "no_results":
|
||||
const suggestions = (response.suggestions as string[]) || [];
|
||||
|
||||
@@ -4,6 +4,28 @@
|
||||
|
||||
This guide walks through creating a simple question-answer AI agent using AutoGPT's visual builder. This is a basic example that can be expanded into more complex agents.
|
||||
|
||||
## **Prerequisites**
|
||||
|
||||
### **Cloud-Hosted AutoGPT**
|
||||
If you're using the cloud-hosted version at [agpt.co](https://agpt.co), you're ready to go! AI blocks come with **built-in credits** — no API keys required to get started. If you'd prefer to use your own API keys, you can add them via **Profile → Integrations**.
|
||||
|
||||
### **Self-Hosted (Docker)**
|
||||
If you're running AutoGPT locally with Docker, you'll need to add your own API keys to `autogpt_platform/backend/.env`:
|
||||
|
||||
```bash
|
||||
# Create or edit backend/.env
|
||||
OPENAI_API_KEY=sk-your-key-here
|
||||
ANTHROPIC_API_KEY=sk-ant-your-key-here
|
||||
# Add other provider keys as needed
|
||||
```
|
||||
|
||||
After adding keys, restart the services:
|
||||
```bash
|
||||
docker compose down && docker compose up -d
|
||||
```
|
||||
|
||||
**Note:** The Calculator example below doesn't require any API credentials — it's a good way to test your setup before adding AI blocks.
|
||||
|
||||
## **Example Agent: Q&A (with AI)**
|
||||
|
||||
A step-by-step guide to creating a simple Q&A agent using input and output blocks.
|
||||
|
||||
Reference in New Issue
Block a user