style: move helper after function in agent_output.py too

This commit is contained in:
Otto
2026-02-17 08:47:08 +00:00
parent 55dcf9359a
commit 92ddb57460

View File

@@ -192,90 +192,6 @@ class AgentOutputTool(BaseTool):
def requires_auth(self) -> bool:
return True
async def _wait_for_execution_completion(
self,
user_id: str,
graph_id: str,
execution_id: str,
timeout_seconds: int,
) -> GraphExecution | None:
"""
Wait for an execution to reach a terminal status using Redis pubsub.
Args:
user_id: User ID
graph_id: Graph ID
execution_id: Execution ID to wait for
timeout_seconds: Max seconds to wait
Returns:
The execution with current status, or None on error
"""
# First check current status - maybe it's already done
execution = await execution_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
if not execution:
return None
# If already in terminal state, return immediately
if execution.status in TERMINAL_STATUSES:
logger.debug(
f"Execution {execution_id} already in terminal state: {execution.status}"
)
return execution
logger.info(
f"Waiting up to {timeout_seconds}s for execution {execution_id} "
f"(current status: {execution.status})"
)
# Subscribe to execution updates via Redis pubsub
event_bus = AsyncRedisExecutionEventBus()
channel_key = f"{user_id}/{graph_id}/{execution_id}"
try:
deadline = asyncio.get_event_loop().time() + timeout_seconds
async for event in event_bus.listen_events(channel_key):
# Check if we've exceeded the timeout
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
logger.info(f"Timeout waiting for execution {execution_id}")
break
# Only process GraphExecutionEvents (not NodeExecutionEvents)
if isinstance(event, GraphExecutionEvent):
logger.debug(f"Received execution update: {event.status}")
if event.status in TERMINAL_STATUSES:
# Fetch full execution with outputs
return await execution_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
except asyncio.TimeoutError:
logger.info(f"Timeout waiting for execution {execution_id}")
except Exception as e:
logger.error(f"Error waiting for execution: {e}", exc_info=True)
finally:
# Clean up pubsub connection
try:
if hasattr(event_bus, "_pubsub") and event_bus._pubsub:
await event_bus._pubsub.close()
except Exception:
pass
# Return current state on timeout
return await execution_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
async def _resolve_agent(
self,
user_id: str,
@@ -616,3 +532,87 @@ class AgentOutputTool(BaseTool):
)
return self._build_response(agent, execution, available_executions, session_id)
async def _wait_for_execution_completion(
self,
user_id: str,
graph_id: str,
execution_id: str,
timeout_seconds: int,
) -> GraphExecution | None:
"""
Wait for an execution to reach a terminal status using Redis pubsub.
Args:
user_id: User ID
graph_id: Graph ID
execution_id: Execution ID to wait for
timeout_seconds: Max seconds to wait
Returns:
The execution with current status, or None on error
"""
# First check current status - maybe it's already done
execution = await execution_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
if not execution:
return None
# If already in terminal state, return immediately
if execution.status in TERMINAL_STATUSES:
logger.debug(
f"Execution {execution_id} already in terminal state: {execution.status}"
)
return execution
logger.info(
f"Waiting up to {timeout_seconds}s for execution {execution_id} "
f"(current status: {execution.status})"
)
# Subscribe to execution updates via Redis pubsub
event_bus = AsyncRedisExecutionEventBus()
channel_key = f"{user_id}/{graph_id}/{execution_id}"
try:
deadline = asyncio.get_event_loop().time() + timeout_seconds
async for event in event_bus.listen_events(channel_key):
# Check if we've exceeded the timeout
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
logger.info(f"Timeout waiting for execution {execution_id}")
break
# Only process GraphExecutionEvents (not NodeExecutionEvents)
if isinstance(event, GraphExecutionEvent):
logger.debug(f"Received execution update: {event.status}")
if event.status in TERMINAL_STATUSES:
# Fetch full execution with outputs
return await execution_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
except asyncio.TimeoutError:
logger.info(f"Timeout waiting for execution {execution_id}")
except Exception as e:
logger.error(f"Error waiting for execution: {e}", exc_info=True)
finally:
# Clean up pubsub connection
try:
if hasattr(event_bus, "_pubsub") and event_bus._pubsub:
await event_bus._pubsub.close()
except Exception:
pass
# Return current state on timeout
return await execution_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)