Compare commits

...

5 Commits

Author SHA1 Message Date
Otto
901b5e8b75 refactor: address review feedback
- Use pydantic Field(ge=0, le=300) instead of custom validators
- Extract shared wait logic to execution_utils.py
- Use asyncio.wait_for for proper timeout handling
- Remove duplicated code in agent_output.py and run_agent.py

Note: Direct DB access will need adjustment for #12057 compatibility
2026-02-17 13:39:59 +00:00
Otto
92ddb57460 style: move helper after function in agent_output.py too 2026-02-17 08:47:08 +00:00
Otto
55dcf9359a style: move helper after the function it helps 2026-02-17 08:45:49 +00:00
Otto
08c44ba872 feat(copilot): add wait_for_result to run_agent tool
Adds wait_for_result parameter (0-300 seconds) to run_agent that blocks
until execution completes or times out.

- Uses Redis pubsub subscription via AsyncRedisExecutionEventBus (no polling)
- Returns immediately if execution finishes within timeout
- Returns current state + partial outputs on timeout
- Outputs included in ExecutionStartedResponse when wait is used

This allows LLMs to run agents and get results in a single tool call:
run_agent(username_agent_slug='user/agent', wait_for_result=60)
2026-02-17 08:44:25 +00:00
Otto
44e341dccc feat(copilot): add wait_if_running to view_agent_output tool
Adds ability to wait for execution completion instead of just returning
current state. Uses Redis pubsub subscription for real-time updates.

Changes:
- Add wait_if_running parameter (0-300 seconds) to AgentOutputInput
- Add _wait_for_execution_completion method using AsyncRedisExecutionEventBus
- Subscribe to execution updates via Redis pubsub channel
- Return immediately if execution already in terminal state
- Return current state on timeout
- Update response messages to indicate running/incomplete status

SECRT-2003
2026-02-17 08:04:50 +00:00
4 changed files with 263 additions and 7 deletions

View File

@@ -5,7 +5,7 @@ import re
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Any from typing import Any
from pydantic import BaseModel, field_validator from pydantic import BaseModel, Field, field_validator
from backend.api.features.chat.model import ChatSession from backend.api.features.chat.model import ChatSession
from backend.api.features.library import db as library_db from backend.api.features.library import db as library_db
@@ -14,6 +14,7 @@ from backend.data import execution as execution_db
from backend.data.execution import ExecutionStatus, GraphExecution, GraphExecutionMeta from backend.data.execution import ExecutionStatus, GraphExecution, GraphExecutionMeta
from .base import BaseTool from .base import BaseTool
from .execution_utils import TERMINAL_STATUSES, wait_for_execution
from .models import ( from .models import (
AgentOutputResponse, AgentOutputResponse,
ErrorResponse, ErrorResponse,
@@ -34,6 +35,7 @@ class AgentOutputInput(BaseModel):
store_slug: str = "" store_slug: str = ""
execution_id: str = "" execution_id: str = ""
run_time: str = "latest" run_time: str = "latest"
wait_if_running: int = Field(default=0, ge=0, le=300)
@field_validator( @field_validator(
"agent_name", "agent_name",
@@ -117,6 +119,11 @@ class AgentOutputTool(BaseTool):
Select which run to retrieve using: Select which run to retrieve using:
- execution_id: Specific execution ID - execution_id: Specific execution ID
- run_time: 'latest' (default), 'yesterday', 'last week', or ISO date 'YYYY-MM-DD' - run_time: 'latest' (default), 'yesterday', 'last week', or ISO date 'YYYY-MM-DD'
Wait for completion (optional):
- wait_if_running: Max seconds to wait if execution is still running (0-300).
If the execution is running/queued, waits up to this many seconds for it to complete.
Returns current status on timeout. If already finished, returns immediately.
""" """
@property @property
@@ -146,6 +153,13 @@ class AgentOutputTool(BaseTool):
"Time filter: 'latest', 'yesterday', 'last week', or 'YYYY-MM-DD'" "Time filter: 'latest', 'yesterday', 'last week', or 'YYYY-MM-DD'"
), ),
}, },
"wait_if_running": {
"type": "integer",
"description": (
"Max seconds to wait if execution is still running (0-300). "
"If running, waits for completion. Returns current state on timeout."
),
},
}, },
"required": [], "required": [],
} }
@@ -223,10 +237,14 @@ class AgentOutputTool(BaseTool):
execution_id: str | None, execution_id: str | None,
time_start: datetime | None, time_start: datetime | None,
time_end: datetime | None, time_end: datetime | None,
include_running: bool = False,
) -> tuple[GraphExecution | None, list[GraphExecutionMeta], str | None]: ) -> tuple[GraphExecution | None, list[GraphExecutionMeta], str | None]:
""" """
Fetch execution(s) based on filters. Fetch execution(s) based on filters.
Returns (single_execution, available_executions_meta, error_message). Returns (single_execution, available_executions_meta, error_message).
Args:
include_running: If True, also look for running/queued executions (for waiting)
""" """
# If specific execution_id provided, fetch it directly # If specific execution_id provided, fetch it directly
if execution_id: if execution_id:
@@ -239,11 +257,22 @@ class AgentOutputTool(BaseTool):
return None, [], f"Execution '{execution_id}' not found" return None, [], f"Execution '{execution_id}' not found"
return execution, [], None return execution, [], None
# Get completed executions with time filters # Determine which statuses to query
statuses = [ExecutionStatus.COMPLETED]
if include_running:
statuses.extend(
[
ExecutionStatus.RUNNING,
ExecutionStatus.QUEUED,
ExecutionStatus.INCOMPLETE,
]
)
# Get executions with time filters
executions = await execution_db.get_graph_executions( executions = await execution_db.get_graph_executions(
graph_id=graph_id, graph_id=graph_id,
user_id=user_id, user_id=user_id,
statuses=[ExecutionStatus.COMPLETED], statuses=statuses,
created_time_gte=time_start, created_time_gte=time_start,
created_time_lte=time_end, created_time_lte=time_end,
limit=10, limit=10,
@@ -310,10 +339,28 @@ class AgentOutputTool(BaseTool):
for e in available_executions[:5] for e in available_executions[:5]
] ]
message = f"Found execution outputs for agent '{agent.name}'" # Build appropriate message based on execution status
if execution.status == ExecutionStatus.COMPLETED:
message = f"Found execution outputs for agent '{agent.name}'"
elif execution.status == ExecutionStatus.FAILED:
message = f"Execution for agent '{agent.name}' failed"
elif execution.status == ExecutionStatus.TERMINATED:
message = f"Execution for agent '{agent.name}' was terminated"
elif execution.status in (
ExecutionStatus.RUNNING,
ExecutionStatus.QUEUED,
ExecutionStatus.INCOMPLETE,
):
message = (
f"Execution for agent '{agent.name}' is still {execution.status.value}. "
"Results may be incomplete. Use wait_if_running to wait for completion."
)
else:
message = f"Found execution for agent '{agent.name}' (status: {execution.status.value})"
if len(available_executions) > 1: if len(available_executions) > 1:
message += ( message += (
f". Showing latest of {len(available_executions)} matching executions." f" Showing latest of {len(available_executions)} matching executions."
) )
return AgentOutputResponse( return AgentOutputResponse(
@@ -428,13 +475,17 @@ class AgentOutputTool(BaseTool):
# Parse time expression # Parse time expression
time_start, time_end = parse_time_expression(input_data.run_time) time_start, time_end = parse_time_expression(input_data.run_time)
# Fetch execution(s) # Check if we should wait for running executions
wait_timeout = input_data.wait_if_running
# Fetch execution(s) - include running if we're going to wait
execution, available_executions, exec_error = await self._get_execution( execution, available_executions, exec_error = await self._get_execution(
user_id=user_id, user_id=user_id,
graph_id=agent.graph_id, graph_id=agent.graph_id,
execution_id=input_data.execution_id or None, execution_id=input_data.execution_id or None,
time_start=time_start, time_start=time_start,
time_end=time_end, time_end=time_end,
include_running=wait_timeout > 0,
) )
if exec_error: if exec_error:
@@ -443,4 +494,17 @@ class AgentOutputTool(BaseTool):
session_id=session_id, session_id=session_id,
) )
# If we have an execution that's still running and we should wait
if execution and wait_timeout > 0 and execution.status not in TERMINAL_STATUSES:
logger.info(
f"Execution {execution.id} is {execution.status}, "
f"waiting up to {wait_timeout}s for completion"
)
execution = await wait_for_execution(
user_id=user_id,
graph_id=agent.graph_id,
execution_id=execution.id,
timeout_seconds=wait_timeout,
)
return self._build_response(agent, execution, available_executions, session_id) return self._build_response(agent, execution, available_executions, session_id)

View File

@@ -0,0 +1,124 @@
"""Shared utilities for execution waiting and status handling."""
import asyncio
import logging
from typing import Any
from backend.data import execution as execution_db
from backend.data.execution import (
AsyncRedisExecutionEventBus,
ExecutionStatus,
GraphExecution,
GraphExecutionEvent,
)
logger = logging.getLogger(__name__)

# Statuses treated as terminal: once an execution reports one of these,
# the wait helpers in this module stop listening and return.
TERMINAL_STATUSES = frozenset(
    (
        ExecutionStatus.COMPLETED,
        ExecutionStatus.FAILED,
        ExecutionStatus.TERMINATED,
    )
)
async def wait_for_execution(
    user_id: str,
    graph_id: str,
    execution_id: str,
    timeout_seconds: int,
) -> GraphExecution | None:
    """
    Block until an execution reaches a terminal status or the timeout elapses.

    The current state is read from the database first. A missing execution
    yields None and an already-terminal execution is returned right away.
    Otherwise the function listens on the event bus channel
    ``{user_id}/{graph_id}/{execution_id}`` under ``asyncio.wait_for`` so the
    timeout is enforced even when no event ever arrives. On timeout, or on
    any error raised while listening, the state is re-read from the database
    and returned as-is.

    NOTE(review): the status check happens before the listener starts, so a
    terminal event published inside that window would presumably be missed
    and the call would then run for the full timeout before the final
    database read picks up the finished state. Confirm whether the event bus
    replays events.

    Args:
        user_id: User ID
        graph_id: Graph ID (used only to build the channel key)
        execution_id: Execution ID to wait for
        timeout_seconds: Max seconds to wait

    Returns:
        The execution with its current status, or None if not found
    """

    async def _load_current() -> GraphExecution | None:
        # Single place for the lookup used both before and after listening.
        return await execution_db.get_graph_execution(
            user_id=user_id,
            execution_id=execution_id,
            include_node_executions=False,
        )

    current = await _load_current()
    if not current:
        return None

    # Nothing to wait for when the execution has already finished.
    if current.status in TERMINAL_STATUSES:
        logger.debug(
            f"Execution {execution_id} already in terminal state: {current.status}"
        )
        return current

    logger.info(
        f"Waiting up to {timeout_seconds}s for execution {execution_id} "
        f"(current status: {current.status})"
    )

    bus = AsyncRedisExecutionEventBus()
    channel = f"{user_id}/{graph_id}/{execution_id}"

    try:
        # wait_for bounds the whole listen operation, not individual events.
        return await asyncio.wait_for(
            _listen_for_terminal_status(bus, channel, user_id, execution_id),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        logger.info(f"Timeout waiting for execution {execution_id}")
    except Exception as e:
        # Best effort: log with traceback and fall through to the DB read.
        logger.exception(f"Error waiting for execution: {e}")

    # Timeout or listener error: report whatever state is stored right now.
    return await _load_current()
async def _listen_for_terminal_status(
    event_bus: AsyncRedisExecutionEventBus,
    channel_key: str,
    user_id: str,
    execution_id: str,
) -> GraphExecution | None:
    """
    Consume events from ``channel_key`` until one reports a terminal status.

    Has no timeout of its own; the caller is expected to wrap it in
    ``asyncio.wait_for``. When a graph-level event carries a status from
    ``TERMINAL_STATUSES``, the execution is re-read from the database and
    returned. Returns None only if the event stream ends without such an
    event.
    """
    async for update in event_bus.listen_events(channel_key):
        # Events that are not GraphExecutionEvents (e.g. node-level) are ignored.
        if not isinstance(update, GraphExecutionEvent):
            continue

        logger.debug(f"Received execution update: {update.status}")
        if update.status not in TERMINAL_STATUSES:
            continue

        # The event only signals completion; load the stored execution.
        return await execution_db.get_graph_execution(
            user_id=user_id,
            execution_id=execution_id,
            include_node_executions=False,
        )

    # Only reached if the event stream is exhausted.
    return None
def get_execution_outputs(execution: GraphExecution | None) -> dict[str, Any] | None:
    """Return ``execution.outputs``, or None when no execution was given."""
    return None if execution is None else execution.outputs

View File

@@ -192,6 +192,7 @@ class ExecutionStartedResponse(ToolResponseBase):
library_agent_id: str | None = None library_agent_id: str | None = None
library_agent_link: str | None = None library_agent_link: str | None = None
status: str = "QUEUED" status: str = "QUEUED"
outputs: dict[str, Any] | None = None # Populated when wait_for_result is used
# Auth/error models # Auth/error models

View File

@@ -12,6 +12,7 @@ from backend.api.features.chat.tracking import (
track_agent_scheduled, track_agent_scheduled,
) )
from backend.api.features.library import db as library_db from backend.api.features.library import db as library_db
from backend.data.execution import ExecutionStatus
from backend.data.graph import GraphModel from backend.data.graph import GraphModel
from backend.data.model import CredentialsMetaInput from backend.data.model import CredentialsMetaInput
from backend.data.user import get_user_by_id from backend.data.user import get_user_by_id
@@ -24,6 +25,7 @@ from backend.util.timezone_utils import (
) )
from .base import BaseTool from .base import BaseTool
from .execution_utils import get_execution_outputs, wait_for_execution
from .helpers import get_inputs_from_schema from .helpers import get_inputs_from_schema
from .models import ( from .models import (
AgentDetails, AgentDetails,
@@ -70,6 +72,7 @@ class RunAgentInput(BaseModel):
schedule_name: str = "" schedule_name: str = ""
cron: str = "" cron: str = ""
timezone: str = "UTC" timezone: str = "UTC"
wait_for_result: int = Field(default=0, ge=0, le=300)
@field_validator( @field_validator(
"username_agent_slug", "username_agent_slug",
@@ -151,6 +154,14 @@ class RunAgentTool(BaseTool):
"type": "string", "type": "string",
"description": "IANA timezone for schedule (default: UTC)", "description": "IANA timezone for schedule (default: UTC)",
}, },
"wait_for_result": {
"type": "integer",
"description": (
"Max seconds to wait for execution to complete (0-300). "
"If >0, blocks until the execution finishes or times out. "
"Returns execution outputs when complete."
),
},
}, },
"required": [], "required": [],
} }
@@ -347,6 +358,7 @@ class RunAgentTool(BaseTool):
graph=graph, graph=graph,
graph_credentials=graph_credentials, graph_credentials=graph_credentials,
inputs=params.inputs, inputs=params.inputs,
wait_for_result=params.wait_for_result,
) )
except NotFoundError as e: except NotFoundError as e:
@@ -430,8 +442,9 @@ class RunAgentTool(BaseTool):
graph: GraphModel, graph: GraphModel,
graph_credentials: dict[str, CredentialsMetaInput], graph_credentials: dict[str, CredentialsMetaInput],
inputs: dict[str, Any], inputs: dict[str, Any],
wait_for_result: int = 0,
) -> ToolResponseBase: ) -> ToolResponseBase:
"""Execute an agent immediately.""" """Execute an agent immediately, optionally waiting for completion."""
session_id = session.session_id session_id = session.session_id
# Check rate limits # Check rate limits
@@ -468,6 +481,60 @@ class RunAgentTool(BaseTool):
) )
library_agent_link = f"/library/agents/{library_agent.id}" library_agent_link = f"/library/agents/{library_agent.id}"
# If wait_for_result is specified, wait for execution to complete
if wait_for_result > 0:
logger.info(
f"Waiting up to {wait_for_result}s for execution {execution.id}"
)
result = await wait_for_execution(
user_id=user_id,
graph_id=library_agent.graph_id,
execution_id=execution.id,
timeout_seconds=wait_for_result,
)
final_status = result.status if result else ExecutionStatus.FAILED
outputs = get_execution_outputs(result)
# Build message based on final status
if final_status == ExecutionStatus.COMPLETED:
message = (
f"Agent '{library_agent.name}' execution completed successfully. "
f"{MSG_DO_NOT_RUN_AGAIN}"
)
elif final_status == ExecutionStatus.FAILED:
message = (
f"Agent '{library_agent.name}' execution failed. "
f"View details at {library_agent_link}. "
f"{MSG_DO_NOT_RUN_AGAIN}"
)
elif final_status == ExecutionStatus.TERMINATED:
message = (
f"Agent '{library_agent.name}' execution was terminated. "
f"View details at {library_agent_link}. "
f"{MSG_DO_NOT_RUN_AGAIN}"
)
else:
message = (
f"Agent '{library_agent.name}' execution is still {final_status.value} "
f"(timed out after {wait_for_result}s). "
f"View at {library_agent_link}. "
f"{MSG_DO_NOT_RUN_AGAIN}"
)
return ExecutionStartedResponse(
message=message,
session_id=session_id,
execution_id=execution.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
library_agent_id=library_agent.id,
library_agent_link=library_agent_link,
status=final_status.value,
outputs=outputs,
)
# Default: return immediately without waiting
return ExecutionStartedResponse( return ExecutionStartedResponse(
message=( message=(
f"Agent '{library_agent.name}' execution started successfully. " f"Agent '{library_agent.name}' execution started successfully. "