mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Merge branch 'master' of https://github.com/Significant-Gravitas/AutoGPT into ntindle/secrt-1218-add-ability-to-reject-approved-agents-in-admin-dashboard
This commit is contained in:
@@ -1208,6 +1208,9 @@ class ExecutionManager(AppProcess):
|
||||
)
|
||||
return
|
||||
|
||||
# Check if channel is closed and force reconnection if needed
|
||||
if not self.cancel_client.is_ready:
|
||||
self.cancel_client.disconnect()
|
||||
self.cancel_client.connect()
|
||||
cancel_channel = self.cancel_client.get_channel()
|
||||
cancel_channel.basic_consume(
|
||||
@@ -1237,6 +1240,9 @@ class ExecutionManager(AppProcess):
|
||||
)
|
||||
return
|
||||
|
||||
# Check if channel is closed and force reconnection if needed
|
||||
if not self.run_client.is_ready:
|
||||
self.run_client.disconnect()
|
||||
self.run_client.connect()
|
||||
run_channel = self.run_client.get_channel()
|
||||
run_channel.basic_qos(prefetch_count=self.pool_size)
|
||||
|
||||
@@ -269,7 +269,9 @@ class Scheduler(AppService):
|
||||
|
||||
self.scheduler = BackgroundScheduler(
|
||||
executors={
|
||||
"default": ThreadPoolExecutor(max_workers=10), # Max 10 concurrent jobs
|
||||
"default": ThreadPoolExecutor(
|
||||
max_workers=self.db_pool_size()
|
||||
), # Match DB pool size to prevent resource contention
|
||||
},
|
||||
job_defaults={
|
||||
"coalesce": True, # Skip redundant missed jobs - just run the latest
|
||||
|
||||
@@ -548,7 +548,7 @@ async def validate_graph_with_credentials(
|
||||
return node_input_errors
|
||||
|
||||
|
||||
async def _construct_node_execution_input(
|
||||
async def _construct_starting_node_execution_input(
|
||||
graph: GraphModel,
|
||||
user_id: str,
|
||||
graph_inputs: BlockInput,
|
||||
@@ -622,7 +622,7 @@ async def validate_and_construct_node_execution_input(
|
||||
graph_version: Optional[int] = None,
|
||||
graph_credentials_inputs: Optional[dict[str, CredentialsMetaInput]] = None,
|
||||
nodes_input_masks: Optional[dict[str, dict[str, JsonValue]]] = None,
|
||||
) -> tuple[GraphModel, list[tuple[str, BlockInput]]]:
|
||||
) -> tuple[GraphModel, list[tuple[str, BlockInput]], dict[str, dict[str, JsonValue]]]:
|
||||
"""
|
||||
Public wrapper that handles graph fetching, credential mapping, and validation+construction.
|
||||
This centralizes the logic used by both scheduler validation and actual execution.
|
||||
@@ -666,14 +666,14 @@ async def validate_and_construct_node_execution_input(
|
||||
nodes_input_masks or {},
|
||||
)
|
||||
|
||||
starting_nodes_input = await _construct_node_execution_input(
|
||||
starting_nodes_input = await _construct_starting_node_execution_input(
|
||||
graph=graph,
|
||||
user_id=user_id,
|
||||
graph_inputs=graph_inputs,
|
||||
nodes_input_masks=nodes_input_masks,
|
||||
)
|
||||
|
||||
return graph, starting_nodes_input
|
||||
return graph, starting_nodes_input, nodes_input_masks
|
||||
|
||||
|
||||
def _merge_nodes_input_masks(
|
||||
@@ -856,13 +856,15 @@ async def add_graph_execution(
|
||||
else:
|
||||
edb = get_database_manager_async_client()
|
||||
|
||||
graph, starting_nodes_input = await validate_and_construct_node_execution_input(
|
||||
graph_id=graph_id,
|
||||
user_id=user_id,
|
||||
graph_inputs=inputs or {},
|
||||
graph_version=graph_version,
|
||||
graph_credentials_inputs=graph_credentials_inputs,
|
||||
nodes_input_masks=nodes_input_masks,
|
||||
graph, starting_nodes_input, nodes_input_masks = (
|
||||
await validate_and_construct_node_execution_input(
|
||||
graph_id=graph_id,
|
||||
user_id=user_id,
|
||||
graph_inputs=inputs or {},
|
||||
graph_version=graph_version,
|
||||
graph_credentials_inputs=graph_credentials_inputs,
|
||||
nodes_input_masks=nodes_input_masks,
|
||||
)
|
||||
)
|
||||
graph_exec = None
|
||||
|
||||
|
||||
@@ -676,7 +676,15 @@ async def update_graph(
|
||||
# Handle deactivation of the previously active version
|
||||
await on_graph_deactivate(current_active_version, user_id=user_id)
|
||||
|
||||
return new_graph_version
|
||||
# Fetch new graph version *with sub-graphs* (needed for credentials input schema)
|
||||
new_graph_version_with_subgraphs = await graph_db.get_graph(
|
||||
graph_id,
|
||||
new_graph_version.version,
|
||||
user_id=user_id,
|
||||
include_subgraphs=True,
|
||||
)
|
||||
assert new_graph_version_with_subgraphs # make type checker happy
|
||||
return new_graph_version_with_subgraphs
|
||||
|
||||
|
||||
@v1_router.put(
|
||||
|
||||
@@ -241,7 +241,11 @@ async def get_library_agent_by_graph_id(
|
||||
)
|
||||
if not agent:
|
||||
return None
|
||||
return library_model.LibraryAgent.from_db(agent)
|
||||
|
||||
assert agent.AgentGraph # make type checker happy
|
||||
# Include sub-graphs so we can make a full credentials input schema
|
||||
sub_graphs = await graph_db.get_sub_graphs(agent.AgentGraph)
|
||||
return library_model.LibraryAgent.from_db(agent, sub_graphs=sub_graphs)
|
||||
except prisma.errors.PrismaError as e:
|
||||
logger.error(f"Database error fetching library agent by graph ID: {e}")
|
||||
raise store_exceptions.DatabaseError("Failed to fetch library agent") from e
|
||||
|
||||
@@ -71,4 +71,10 @@ class GraphValidationError(ValueError):
|
||||
self.node_errors = node_errors or {}
|
||||
|
||||
def __str__(self):
|
||||
return self.message
|
||||
return self.message + "".join(
|
||||
[
|
||||
f"\n {node_id}:"
|
||||
+ "".join([f"\n {k}: {e}" for k, e in errors.items()])
|
||||
for node_id, errors in self.node_errors.items()
|
||||
]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user