fix: address review feedback

- Close Redis event bus connection in finally block (fixes connection leak)
- Handle race condition: re-check DB after subscribing to pubsub
- Handle REVIEW status (human-in-the-loop) as a stop-waiting state
- Handle TERMINATED explicitly in run_agent instead of falling into else-branch
- Include REVIEW in queryable statuses when waiting
This commit is contained in:
Otto
2026-02-18 11:58:12 +00:00
parent aeb3e8a129
commit bf55f7b648
3 changed files with 95 additions and 27 deletions

View File

@@ -268,6 +268,7 @@ class AgentOutputTool(BaseTool):
ExecutionStatus.RUNNING,
ExecutionStatus.QUEUED,
ExecutionStatus.INCOMPLETE,
ExecutionStatus.REVIEW,
]
)
@@ -349,6 +350,11 @@ class AgentOutputTool(BaseTool):
message = f"Execution for agent '{agent.name}' failed"
elif execution.status == ExecutionStatus.TERMINATED:
message = f"Execution for agent '{agent.name}' was terminated"
elif execution.status == ExecutionStatus.REVIEW:
message = (
f"Execution for agent '{agent.name}' is awaiting human review. "
"The user needs to approve it before it can continue."
)
elif execution.status in (
ExecutionStatus.RUNNING,
ExecutionStatus.QUEUED,

View File

@@ -23,6 +23,16 @@ TERMINAL_STATUSES = frozenset(
}
)
# Statuses where execution is paused but not finished (e.g. human-in-the-loop)
PAUSED_STATUSES = frozenset(
{
ExecutionStatus.REVIEW,
}
)
# Statuses that mean "stop waiting" (terminal or paused)
STOP_WAITING_STATUSES = TERMINAL_STATUSES | PAUSED_STATUSES
async def wait_for_execution(
user_id: str,
@@ -31,10 +41,10 @@ async def wait_for_execution(
timeout_seconds: int,
) -> GraphExecution | None:
"""
Wait for an execution to reach a terminal status using Redis pubsub.
Wait for an execution to reach a terminal or paused status using Redis pubsub.
Uses asyncio.wait_for to ensure timeout is respected even when no events
are received.
Handles the race condition between checking status and subscribing by
re-checking the DB after the subscription is established.
Args:
user_id: User ID
@@ -47,7 +57,7 @@ async def wait_for_execution(
"""
exec_db = execution_db()
# First check current status - maybe it's already done
# Quick check maybe it's already done
execution = await exec_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
@@ -56,10 +66,10 @@ async def wait_for_execution(
if not execution:
return None
# If already in terminal state, return immediately
if execution.status in TERMINAL_STATUSES:
if execution.status in STOP_WAITING_STATUSES:
logger.debug(
f"Execution {execution_id} already in terminal state: {execution.status}"
f"Execution {execution_id} already in stop-waiting state: "
f"{execution.status}"
)
return execution
@@ -68,14 +78,12 @@ async def wait_for_execution(
f"(current status: {execution.status})"
)
# Subscribe to execution updates via Redis pubsub
event_bus = AsyncRedisExecutionEventBus()
channel_key = f"{user_id}/{graph_id}/{execution_id}"
try:
# Use wait_for to enforce timeout on the entire listen operation
result = await asyncio.wait_for(
_listen_for_terminal_status(
_subscribe_and_wait(
event_bus, channel_key, user_id, execution_id, exec_db
),
timeout=timeout_seconds,
@@ -85,6 +93,8 @@ async def wait_for_execution(
logger.info(f"Timeout waiting for execution {execution_id}")
except Exception as e:
logger.error(f"Error waiting for execution: {e}", exc_info=True)
finally:
await event_bus.close()
# Return current state on timeout/error
return await exec_db.get_graph_execution(
@@ -94,7 +104,7 @@ async def wait_for_execution(
)
async def _listen_for_terminal_status(
async def _subscribe_and_wait(
event_bus: AsyncRedisExecutionEventBus,
channel_key: str,
user_id: str,
@@ -102,24 +112,54 @@ async def _listen_for_terminal_status(
exec_db: Any,
) -> GraphExecution | None:
"""
Listen for execution events until a terminal status is reached.
Subscribe to execution events and wait for a terminal/paused status.
This is a helper that gets wrapped in asyncio.wait_for for timeout handling.
To avoid the race condition where the execution completes between the
initial DB check and the Redis subscription, we:
1. Start listening (which subscribes internally)
2. Re-check the DB after subscription is active
3. If still running, wait for pubsub events
"""
async for event in event_bus.listen_events(channel_key):
# Only process GraphExecutionEvents (not NodeExecutionEvents)
if isinstance(event, GraphExecutionEvent):
logger.debug(f"Received execution update: {event.status}")
if event.status in TERMINAL_STATUSES:
# Fetch full execution with outputs
return await exec_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
listen_iter = event_bus.listen_events(channel_key).__aiter__()
# Should not reach here normally (generator should yield indefinitely)
return None
# Start a background task to consume pubsub events
result_event: GraphExecutionEvent | None = None
done = asyncio.Event()
async def _consume():
nonlocal result_event
async for event in listen_iter:
if isinstance(event, GraphExecutionEvent):
logger.debug(f"Received execution update: {event.status}")
if event.status in STOP_WAITING_STATUSES:
result_event = event
done.set()
return
consume_task = asyncio.create_task(_consume())
# Give the subscription a moment to establish, then re-check DB
await asyncio.sleep(0.1)
execution = await exec_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
if execution and execution.status in STOP_WAITING_STATUSES:
consume_task.cancel()
return execution
# Wait for the pubsub consumer to find a terminal event
await done.wait()
consume_task.cancel()
# Fetch full execution
return await exec_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
def get_execution_outputs(execution: GraphExecution | None) -> dict[str, Any] | None:

View File

@@ -510,12 +510,34 @@ class RunAgentTool(BaseTool):
),
session_id=session_id,
)
elif completed and completed.status == ExecutionStatus.TERMINATED:
return ErrorResponse(
message=(
f"Agent '{library_agent.name}' execution was terminated. "
f"View details at {library_agent_link}."
),
session_id=session_id,
)
elif completed and completed.status == ExecutionStatus.REVIEW:
return ExecutionStartedResponse(
message=(
f"Agent '{library_agent.name}' is awaiting human review. "
f"Check at {library_agent_link}."
),
session_id=session_id,
execution_id=execution.id,
graph_id=library_agent.graph_id,
graph_name=library_agent.name,
library_agent_id=library_agent.id,
library_agent_link=library_agent_link,
)
else:
status = completed.status.value if completed else "unknown"
return ExecutionStartedResponse(
message=(
f"Agent '{library_agent.name}' is still {status} after "
f"{wait_for_result}s. Check results later at {library_agent_link}. "
f"{wait_for_result}s. Check results later at "
f"{library_agent_link}. "
f"Use view_agent_output with wait_if_running to check again."
),
session_id=session_id,