Mirror of https://github.com/Significant-Gravitas/AutoGPT.git (synced 2026-02-18 02:32:04 -05:00)
style: move helper after function it helps
@@ -458,96 +458,6 @@ class RunAgentTool(BaseTool):
             trigger_info=trigger_info,
         )
 
-    async def _wait_for_execution_completion(
-        self,
-        user_id: str,
-        graph_id: str,
-        execution_id: str,
-        timeout_seconds: int,
-    ) -> tuple[ExecutionStatus, dict[str, Any] | None]:
-        """
-        Wait for an execution to reach a terminal status using Redis pubsub.
-
-        Args:
-            user_id: User ID
-            graph_id: Graph ID
-            execution_id: Execution ID to wait for
-            timeout_seconds: Max seconds to wait
-
-        Returns:
-            Tuple of (final_status, outputs_dict_or_None)
-        """
-        # First check current status - maybe it's already done
-        execution = await execution_db.get_graph_execution(
-            user_id=user_id,
-            execution_id=execution_id,
-            include_node_executions=False,
-        )
-        if not execution:
-            return ExecutionStatus.FAILED, None
-
-        # If already in terminal state, return immediately
-        if execution.status in TERMINAL_STATUSES:
-            logger.debug(
-                f"Execution {execution_id} already in terminal state: {execution.status}"
-            )
-            return execution.status, execution.outputs
-
-        logger.info(
-            f"Waiting up to {timeout_seconds}s for execution {execution_id} "
-            f"(current status: {execution.status})"
-        )
-
-        # Subscribe to execution updates via Redis pubsub
-        event_bus = AsyncRedisExecutionEventBus()
-        channel_key = f"{user_id}/{graph_id}/{execution_id}"
-
-        try:
-            deadline = asyncio.get_event_loop().time() + timeout_seconds
-
-            async for event in event_bus.listen_events(channel_key):
-                # Check if we've exceeded the timeout
-                remaining = deadline - asyncio.get_event_loop().time()
-                if remaining <= 0:
-                    logger.info(f"Timeout waiting for execution {execution_id}")
-                    break
-
-                # Only process GraphExecutionEvents (not NodeExecutionEvents)
-                if isinstance(event, GraphExecutionEvent):
-                    logger.debug(f"Received execution update: {event.status}")
-                    if event.status in TERMINAL_STATUSES:
-                        # Fetch full execution with outputs
-                        final_exec = await execution_db.get_graph_execution(
-                            user_id=user_id,
-                            execution_id=execution_id,
-                            include_node_executions=False,
-                        )
-                        if final_exec:
-                            return final_exec.status, final_exec.outputs
-                        return event.status, None
-
-        except asyncio.TimeoutError:
-            logger.info(f"Timeout waiting for execution {execution_id}")
-        except Exception as e:
-            logger.error(f"Error waiting for execution: {e}", exc_info=True)
-        finally:
-            # Clean up pubsub connection
-            try:
-                if hasattr(event_bus, "_pubsub") and event_bus._pubsub:
-                    await event_bus._pubsub.close()
-            except Exception:
-                pass
-
-        # Return current state on timeout
-        execution = await execution_db.get_graph_execution(
-            user_id=user_id,
-            execution_id=execution_id,
-            include_node_executions=False,
-        )
-        if execution:
-            return execution.status, execution.outputs
-        return ExecutionStatus.QUEUED, None
-
     async def _run_agent(
         self,
         user_id: str,
@@ -660,6 +570,96 @@ class RunAgentTool(BaseTool):
             library_agent_link=library_agent_link,
         )
 
+    async def _wait_for_execution_completion(
+        self,
+        user_id: str,
+        graph_id: str,
+        execution_id: str,
+        timeout_seconds: int,
+    ) -> tuple[ExecutionStatus, dict[str, Any] | None]:
+        """
+        Wait for an execution to reach a terminal status using Redis pubsub.
+
+        Args:
+            user_id: User ID
+            graph_id: Graph ID
+            execution_id: Execution ID to wait for
+            timeout_seconds: Max seconds to wait
+
+        Returns:
+            Tuple of (final_status, outputs_dict_or_None)
+        """
+        # First check current status - maybe it's already done
+        execution = await execution_db.get_graph_execution(
+            user_id=user_id,
+            execution_id=execution_id,
+            include_node_executions=False,
+        )
+        if not execution:
+            return ExecutionStatus.FAILED, None
+
+        # If already in terminal state, return immediately
+        if execution.status in TERMINAL_STATUSES:
+            logger.debug(
+                f"Execution {execution_id} already in terminal state: {execution.status}"
+            )
+            return execution.status, execution.outputs
+
+        logger.info(
+            f"Waiting up to {timeout_seconds}s for execution {execution_id} "
+            f"(current status: {execution.status})"
+        )
+
+        # Subscribe to execution updates via Redis pubsub
+        event_bus = AsyncRedisExecutionEventBus()
+        channel_key = f"{user_id}/{graph_id}/{execution_id}"
+
+        try:
+            deadline = asyncio.get_event_loop().time() + timeout_seconds
+
+            async for event in event_bus.listen_events(channel_key):
+                # Check if we've exceeded the timeout
+                remaining = deadline - asyncio.get_event_loop().time()
+                if remaining <= 0:
+                    logger.info(f"Timeout waiting for execution {execution_id}")
+                    break
+
+                # Only process GraphExecutionEvents (not NodeExecutionEvents)
+                if isinstance(event, GraphExecutionEvent):
+                    logger.debug(f"Received execution update: {event.status}")
+                    if event.status in TERMINAL_STATUSES:
+                        # Fetch full execution with outputs
+                        final_exec = await execution_db.get_graph_execution(
+                            user_id=user_id,
+                            execution_id=execution_id,
+                            include_node_executions=False,
+                        )
+                        if final_exec:
+                            return final_exec.status, final_exec.outputs
+                        return event.status, None
+
+        except asyncio.TimeoutError:
+            logger.info(f"Timeout waiting for execution {execution_id}")
+        except Exception as e:
+            logger.error(f"Error waiting for execution: {e}", exc_info=True)
+        finally:
+            # Clean up pubsub connection
+            try:
+                if hasattr(event_bus, "_pubsub") and event_bus._pubsub:
+                    await event_bus._pubsub.close()
+            except Exception:
+                pass
+
+        # Return current state on timeout
+        execution = await execution_db.get_graph_execution(
+            user_id=user_id,
+            execution_id=execution_id,
+            include_node_executions=False,
+        )
+        if execution:
+            return execution.status, execution.outputs
+        return ExecutionStatus.QUEUED, None
+
     async def _schedule_agent(
         self,
         user_id: str,
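
For context on the helper being moved (its body is unchanged by this commit): it reads the execution once from the database, then listens on the per-execution Redis pubsub channel ({user_id}/{graph_id}/{execution_id}) until a terminal status arrives or a wall-clock deadline passes, and finally re-reads the execution as a fallback. The following is a minimal, self-contained sketch of that wait-with-deadline pattern in plain asyncio, not the AutoGPT backend code; fake_event_stream, Status, and wait_for_terminal are hypothetical stand-ins for the real event bus, status enum, and helper.

# Minimal sketch (illustrative only) of the pattern used by
# _wait_for_execution_completion: consume an async stream of status updates
# until a terminal status arrives or the deadline passes.
import asyncio
from enum import Enum
from typing import AsyncIterator


class Status(str, Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


TERMINAL = {Status.COMPLETED, Status.FAILED}


async def fake_event_stream() -> AsyncIterator[Status]:
    # Stand-in for event_bus.listen_events(channel_key): emits updates over time.
    for status in (Status.RUNNING, Status.RUNNING, Status.COMPLETED):
        await asyncio.sleep(0.1)
        yield status


async def wait_for_terminal(timeout_seconds: float) -> Status:
    deadline = asyncio.get_running_loop().time() + timeout_seconds
    async for status in fake_event_stream():
        # As in the helper above, the deadline is only checked when an event
        # arrives, so a completely silent stream can overshoot the timeout.
        if asyncio.get_running_loop().time() >= deadline:
            break
        if status in TERMINAL:
            return status
    # The real helper falls back to re-reading the execution from the database.
    return Status.QUEUED


if __name__ == "__main__":
    print(asyncio.run(wait_for_terminal(timeout_seconds=5.0)))

If the timeout had to be enforced even when no events arrive, the listen loop could be wrapped in asyncio.timeout() (Python 3.11+) or asyncio.wait_for(); the helper shown in the diff instead relies on its final database re-read as the timeout fallback.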