Compare commits

..

2 Commits

Author SHA1 Message Date
Otto
a286b1d06e refactor: address review feedback
- Move helpers after search_agents function
- Simplify list_all logic by setting query='' early
- Update find_library_agent description to 'Search for or list'
2026-02-17 18:09:40 +00:00
Otto
2b0654b9e5 fix(copilot): handle 'all' keyword in find_library_agent tool
When users ask CoPilot to 'show all my agents', the LLM was passing
the literal string 'all' as a search query, which matched no agents.

Changes:
- Make query parameter optional in FindLibraryAgentTool
- Add _LIST_ALL_KEYWORDS set for special keywords ('all', '*', 'everything', 'any', '')
- When query matches a list-all keyword, pass None to list_library_agents
- Update response messages to reflect 'list all' vs 'search' behavior

Fixes SECRT-2002
2026-02-17 18:09:09 +00:00
5 changed files with 140 additions and 282 deletions

View File

@@ -5,7 +5,7 @@ import re
from datetime import datetime, timedelta, timezone
from typing import Any
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, field_validator
from backend.api.features.library.model import LibraryAgent
from backend.copilot.model import ChatSession
@@ -13,7 +13,6 @@ from backend.data.db_accessors import execution_db, library_db
from backend.data.execution import ExecutionStatus, GraphExecution, GraphExecutionMeta
from .base import BaseTool
from .execution_utils import TERMINAL_STATUSES, wait_for_execution
from .models import (
AgentOutputResponse,
ErrorResponse,
@@ -34,7 +33,6 @@ class AgentOutputInput(BaseModel):
store_slug: str = ""
execution_id: str = ""
run_time: str = "latest"
wait_if_running: int = Field(default=0, ge=0, le=300)
@field_validator(
"agent_name",
@@ -118,11 +116,6 @@ class AgentOutputTool(BaseTool):
Select which run to retrieve using:
- execution_id: Specific execution ID
- run_time: 'latest' (default), 'yesterday', 'last week', or ISO date 'YYYY-MM-DD'
Wait for completion (optional):
- wait_if_running: Max seconds to wait if execution is still running (0-300).
If the execution is running/queued, waits up to this many seconds for completion.
Returns current status on timeout. If already finished, returns immediately.
"""
@property
@@ -152,13 +145,6 @@ class AgentOutputTool(BaseTool):
"Time filter: 'latest', 'yesterday', 'last week', or 'YYYY-MM-DD'"
),
},
"wait_if_running": {
"type": "integer",
"description": (
"Max seconds to wait if execution is still running (0-300). "
"If running, waits for completion. Returns current state on timeout."
),
},
},
"required": [],
}
@@ -238,14 +224,10 @@ class AgentOutputTool(BaseTool):
execution_id: str | None,
time_start: datetime | None,
time_end: datetime | None,
include_running: bool = False,
) -> tuple[GraphExecution | None, list[GraphExecutionMeta], str | None]:
"""
Fetch execution(s) based on filters.
Returns (single_execution, available_executions_meta, error_message).
Args:
include_running: If True, also look for running/queued executions (for waiting)
"""
exec_db = execution_db()
@@ -260,22 +242,11 @@ class AgentOutputTool(BaseTool):
return None, [], f"Execution '{execution_id}' not found"
return execution, [], None
# Determine which statuses to query
statuses = [ExecutionStatus.COMPLETED]
if include_running:
statuses.extend(
[
ExecutionStatus.RUNNING,
ExecutionStatus.QUEUED,
ExecutionStatus.INCOMPLETE,
]
)
# Get executions with time filters
# Get completed executions with time filters
executions = await exec_db.get_graph_executions(
graph_id=graph_id,
user_id=user_id,
statuses=statuses,
statuses=[ExecutionStatus.COMPLETED],
created_time_gte=time_start,
created_time_lte=time_end,
limit=10,
@@ -342,28 +313,10 @@ class AgentOutputTool(BaseTool):
for e in available_executions[:5]
]
# Build appropriate message based on execution status
if execution.status == ExecutionStatus.COMPLETED:
message = f"Found execution outputs for agent '{agent.name}'"
elif execution.status == ExecutionStatus.FAILED:
message = f"Execution for agent '{agent.name}' failed"
elif execution.status == ExecutionStatus.TERMINATED:
message = f"Execution for agent '{agent.name}' was terminated"
elif execution.status in (
ExecutionStatus.RUNNING,
ExecutionStatus.QUEUED,
ExecutionStatus.INCOMPLETE,
):
message = (
f"Execution for agent '{agent.name}' is still {execution.status.value}. "
"Results may be incomplete. Use wait_if_running to wait for completion."
)
else:
message = f"Found execution for agent '{agent.name}' (status: {execution.status.value})"
message = f"Found execution outputs for agent '{agent.name}'"
if len(available_executions) > 1:
message += (
f" Showing latest of {len(available_executions)} matching executions."
f". Showing latest of {len(available_executions)} matching executions."
)
return AgentOutputResponse(
@@ -478,17 +431,13 @@ class AgentOutputTool(BaseTool):
# Parse time expression
time_start, time_end = parse_time_expression(input_data.run_time)
# Check if we should wait for running executions
wait_timeout = input_data.wait_if_running
# Fetch execution(s) - include running if we're going to wait
# Fetch execution(s)
execution, available_executions, exec_error = await self._get_execution(
user_id=user_id,
graph_id=agent.graph_id,
execution_id=input_data.execution_id or None,
time_start=time_start,
time_end=time_end,
include_running=wait_timeout > 0,
)
if exec_error:
@@ -497,17 +446,4 @@ class AgentOutputTool(BaseTool):
session_id=session_id,
)
# If we have an execution that's still running and we should wait
if execution and wait_timeout > 0 and execution.status not in TERMINAL_STATUSES:
logger.info(
f"Execution {execution.id} is {execution.status}, "
f"waiting up to {wait_timeout}s for completion"
)
execution = await wait_for_execution(
user_id=user_id,
graph_id=agent.graph_id,
execution_id=execution.id,
timeout_seconds=wait_timeout,
)
return self._build_response(agent, execution, available_executions, session_id)

View File

@@ -24,12 +24,20 @@ _UUID_PATTERN = re.compile(
re.IGNORECASE,
)
# Keywords that should be treated as "list all" rather than a literal search
_LIST_ALL_KEYWORDS = frozenset({"all", "*", "everything", "any", ""})
def _is_uuid(text: str) -> bool:
"""Check if text is a valid UUID v4."""
return bool(_UUID_PATTERN.match(text.strip()))
def _is_list_all_query(query: str) -> bool:
"""Check if query should list all agents rather than search."""
return query.lower().strip() in _LIST_ALL_KEYWORDS
async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | None:
"""Fetch a library agent by ID (library agent ID or graph_id).
@@ -101,6 +109,7 @@ async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | N
return None
async def search_agents(
query: str,
source: SearchSource,
@@ -111,7 +120,8 @@ async def search_agents(
Search for agents in marketplace or user library.
Args:
query: Search query string
query: Search query string. For library searches, empty or special keywords
like "all" will list all agents without filtering.
source: "marketplace" or "library"
session_id: Chat session ID
user_id: User ID (required for library search)
@@ -119,7 +129,13 @@ async def search_agents(
Returns:
AgentsFoundResponse, NoResultsResponse, or ErrorResponse
"""
if not query:
# For library searches, treat special keywords as "list all" (empty query)
list_all = source == "library" and _is_list_all_query(query)
if list_all:
query = ""
# For marketplace, we always need a search term
if source == "marketplace" and not query:
return ErrorResponse(
message="Please provide a search query", session_id=session_id
)
@@ -159,10 +175,14 @@ async def search_agents(
logger.info(f"Found agent by direct ID lookup: {agent.name}")
if not agents:
logger.info(f"Searching user library for: {query}")
search_term = query or None # None means list all
logger.info(
f"{'Listing all agents in' if list_all else 'Searching'} "
f"user library{'' if list_all else f' for: {query}'}"
)
results = await library_db().list_library_agents(
user_id=user_id, # type: ignore[arg-type]
search_term=query,
search_term=search_term,
page_size=10,
)
for agent in results.agents:
@@ -206,21 +226,34 @@ async def search_agents(
"Check your library at /library",
]
)
no_results_msg = (
f"No agents found matching '{query}'. Let the user know they can try different keywords or browse the marketplace. Also let them know you can create a custom agent for them based on their needs."
if source == "marketplace"
else f"No agents matching '{query}' found in your library. Let the user know you can create a custom agent for them based on their needs."
)
if source == "marketplace":
no_results_msg = (
f"No agents found matching '{query}'. Let the user know they can "
"try different keywords or browse the marketplace. Also let them "
"know you can create a custom agent for them based on their needs."
)
elif list_all:
no_results_msg = (
"The user's library is empty. Let them know they can browse the "
"marketplace to find agents, or you can create a custom agent "
"for them based on their needs."
)
else:
no_results_msg = (
f"No agents matching '{query}' found in your library. Let the user "
"know you can create a custom agent for them based on their needs."
)
return NoResultsResponse(
message=no_results_msg, session_id=session_id, suggestions=suggestions
)
title = f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} "
title += (
f"for '{query}'"
if source == "marketplace"
else f"in your library for '{query}'"
)
agent_count_str = f"{len(agents)} agent{'s' if len(agents) != 1 else ''}"
if source == "marketplace":
title = f"Found {agent_count_str} for '{query}'"
elif list_all:
title = f"Found {agent_count_str} in your library"
else:
title = f"Found {agent_count_str} in your library for '{query}'"
message = (
"Now you have found some options for the user to choose from. "
@@ -238,3 +271,82 @@ async def search_agents(
count=len(agents),
session_id=session_id,
)
def _is_uuid(text: str) -> bool:
"""Check if text is a valid UUID v4."""
return bool(_UUID_PATTERN.match(text.strip()))
def _is_list_all_query(query: str) -> bool:
"""Check if query should list all agents rather than search."""
return query.lower().strip() in _LIST_ALL_KEYWORDS
async def _get_library_agent_by_id(user_id: str, agent_id: str) -> AgentInfo | None:
"""Fetch a library agent by ID (library agent ID or graph_id).
Tries multiple lookup strategies:
1. First by graph_id (AgentGraph primary key)
2. Then by library agent ID (LibraryAgent primary key)
Args:
user_id: The user ID
agent_id: The ID to look up (can be graph_id or library agent ID)
Returns:
AgentInfo if found, None otherwise
"""
try:
agent = await library_db.get_library_agent_by_graph_id(user_id, agent_id)
if agent:
logger.debug(f"Found library agent by graph_id: {agent.name}")
return AgentInfo(
id=agent.id,
name=agent.name,
description=agent.description or "",
source="library",
in_library=True,
creator=agent.creator_name,
status=agent.status.value,
can_access_graph=agent.can_access_graph,
has_external_trigger=agent.has_external_trigger,
new_output=agent.new_output,
graph_id=agent.graph_id,
)
except DatabaseError:
raise
except Exception as e:
logger.warning(
f"Could not fetch library agent by graph_id {agent_id}: {e}",
exc_info=True,
)
try:
agent = await library_db.get_library_agent(agent_id, user_id)
if agent:
logger.debug(f"Found library agent by library_id: {agent.name}")
return AgentInfo(
id=agent.id,
name=agent.name,
description=agent.description or "",
source="library",
in_library=True,
creator=agent.creator_name,
status=agent.status.value,
can_access_graph=agent.can_access_graph,
has_external_trigger=agent.has_external_trigger,
new_output=agent.new_output,
graph_id=agent.graph_id,
)
except NotFoundError:
logger.debug(f"Library agent not found by library_id: {agent_id}")
except DatabaseError:
raise
except Exception as e:
logger.warning(
f"Could not fetch library agent by library_id {agent_id}: {e}",
exc_info=True,
)
return None

View File

@@ -1,129 +0,0 @@
"""Shared utilities for execution waiting and status handling."""
import asyncio
import logging
from typing import Any
from backend.data.db_accessors import execution_db
from backend.data.execution import (
AsyncRedisExecutionEventBus,
ExecutionStatus,
GraphExecution,
GraphExecutionEvent,
)
logger = logging.getLogger(__name__)
# Terminal statuses that indicate execution is complete
TERMINAL_STATUSES = frozenset(
{
ExecutionStatus.COMPLETED,
ExecutionStatus.FAILED,
ExecutionStatus.TERMINATED,
}
)
async def wait_for_execution(
user_id: str,
graph_id: str,
execution_id: str,
timeout_seconds: int,
) -> GraphExecution | None:
"""
Wait for an execution to reach a terminal status using Redis pubsub.
Uses asyncio.wait_for to ensure timeout is respected even when no events
are received.
Args:
user_id: User ID
graph_id: Graph ID
execution_id: Execution ID to wait for
timeout_seconds: Max seconds to wait
Returns:
The execution with current status, or None if not found
"""
exec_db = execution_db()
# First check current status - maybe it's already done
execution = await exec_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
if not execution:
return None
# If already in terminal state, return immediately
if execution.status in TERMINAL_STATUSES:
logger.debug(
f"Execution {execution_id} already in terminal state: {execution.status}"
)
return execution
logger.info(
f"Waiting up to {timeout_seconds}s for execution {execution_id} "
f"(current status: {execution.status})"
)
# Subscribe to execution updates via Redis pubsub
event_bus = AsyncRedisExecutionEventBus()
channel_key = f"{user_id}/{graph_id}/{execution_id}"
try:
# Use wait_for to enforce timeout on the entire listen operation
result = await asyncio.wait_for(
_listen_for_terminal_status(
event_bus, channel_key, user_id, execution_id, exec_db
),
timeout=timeout_seconds,
)
return result
except asyncio.TimeoutError:
logger.info(f"Timeout waiting for execution {execution_id}")
except Exception as e:
logger.error(f"Error waiting for execution: {e}", exc_info=True)
# Return current state on timeout/error
return await exec_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
async def _listen_for_terminal_status(
event_bus: AsyncRedisExecutionEventBus,
channel_key: str,
user_id: str,
execution_id: str,
exec_db: Any,
) -> GraphExecution | None:
"""
Listen for execution events until a terminal status is reached.
This is a helper that gets wrapped in asyncio.wait_for for timeout handling.
"""
async for event in event_bus.listen_events(channel_key):
# Only process GraphExecutionEvents (not NodeExecutionEvents)
if isinstance(event, GraphExecutionEvent):
logger.debug(f"Received execution update: {event.status}")
if event.status in TERMINAL_STATUSES:
# Fetch full execution with outputs
return await exec_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
# Should not reach here normally (generator should yield indefinitely)
return None
def get_execution_outputs(execution: GraphExecution | None) -> dict[str, Any] | None:
"""Extract outputs from an execution, or return None."""
if execution is None:
return None
return execution.outputs

View File

@@ -19,7 +19,7 @@ class FindLibraryAgentTool(BaseTool):
@property
def description(self) -> str:
return (
"Search for agents in the user's library. Use this to find agents "
"Search for or list agents in the user's library. Use this to find agents "
"the user has already added to their library, including agents they "
"created or added from the marketplace."
)
@@ -31,10 +31,13 @@ class FindLibraryAgentTool(BaseTool):
"properties": {
"query": {
"type": "string",
"description": "Search query to find agents by name or description.",
"description": (
"Optional search query to filter agents by name or description. "
"Leave empty or omit to list all agents in the library."
),
},
},
"required": ["query"],
"required": [],
}
@property

View File

@@ -9,7 +9,6 @@ from backend.copilot.config import ChatConfig
from backend.copilot.model import ChatSession
from backend.copilot.tracking import track_agent_run_success, track_agent_scheduled
from backend.data.db_accessors import graph_db, library_db, user_db
from backend.data.execution import ExecutionStatus
from backend.data.graph import GraphModel
from backend.data.model import CredentialsMetaInput
from backend.executor import utils as execution_utils
@@ -25,7 +24,6 @@ from .helpers import get_inputs_from_schema
from .models import (
AgentDetails,
AgentDetailsResponse,
AgentOutputResponse,
ErrorResponse,
ExecutionOptions,
ExecutionStartedResponse,
@@ -35,7 +33,6 @@ from .models import (
ToolResponseBase,
UserReadiness,
)
from .execution_utils import get_execution_outputs, wait_for_execution
from .utils import (
build_missing_credentials_from_graph,
extract_credentials_from_schema,
@@ -69,7 +66,6 @@ class RunAgentInput(BaseModel):
schedule_name: str = ""
cron: str = ""
timezone: str = "UTC"
wait_for_result: int = Field(default=0, ge=0, le=300)
@field_validator(
"username_agent_slug",
@@ -151,14 +147,6 @@ class RunAgentTool(BaseTool):
"type": "string",
"description": "IANA timezone for schedule (default: UTC)",
},
"wait_for_result": {
"type": "integer",
"description": (
"Max seconds to wait for execution to complete (0-300). "
"If >0, blocks until the execution finishes or times out. "
"Returns execution outputs when complete."
),
},
},
"required": [],
}
@@ -353,7 +341,6 @@ class RunAgentTool(BaseTool):
graph=graph,
graph_credentials=graph_credentials,
inputs=params.inputs,
wait_for_result=params.wait_for_result,
)
except NotFoundError as e:
@@ -437,9 +424,8 @@ class RunAgentTool(BaseTool):
graph: GraphModel,
graph_credentials: dict[str, CredentialsMetaInput],
inputs: dict[str, Any],
wait_for_result: int = 0,
) -> ToolResponseBase:
"""Execute an agent immediately, optionally waiting for completion."""
"""Execute an agent immediately."""
session_id = session.session_id
# Check rate limits
@@ -476,56 +462,6 @@ class RunAgentTool(BaseTool):
)
library_agent_link = f"/library/agents/{library_agent.id}"
# If wait_for_result is requested, wait for execution to complete
if wait_for_result > 0:
logger.info(
f"Waiting up to {wait_for_result}s for execution {execution.id}"
)
completed = await wait_for_execution(
user_id=user_id,
graph_id=library_agent.graph_id,
execution_id=execution.id,
timeout_seconds=wait_for_result,
)
if completed and completed.status == ExecutionStatus.COMPLETED:
outputs = get_execution_outputs(completed)
return AgentOutputResponse(
message=(
f"Agent '{library_agent.name}' completed successfully. "
f"View at {library_agent_link}."
),
session_id=session_id,
execution_id=execution.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
outputs=outputs or {},
)
elif completed and completed.status == ExecutionStatus.FAILED:
return ErrorResponse(
message=(
f"Agent '{library_agent.name}' execution failed. "
f"View details at {library_agent_link}."
),
session_id=session_id,
)
else:
status = completed.status.value if completed else "unknown"
return ExecutionStartedResponse(
message=(
f"Agent '{library_agent.name}' is still {status} after "
f"{wait_for_result}s. Check results later at {library_agent_link}. "
f"Use view_agent_output with wait_if_running to check again."
),
session_id=session_id,
execution_id=execution.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
library_agent_id=library_agent.id,
library_agent_link=library_agent_link,
)
return ExecutionStartedResponse(
message=(
f"Agent '{library_agent.name}' execution started successfully. "