mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-03-17 03:00:27 -04:00
Compare commits
1 Commits
feat/githu
...
otto/prism
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b5abf97fb |
@@ -15,6 +15,7 @@ from backend.data import graph as graph_db
|
||||
from backend.data import human_review as human_review_db
|
||||
from backend.data import onboarding as onboarding_db
|
||||
from backend.data import user as user_db
|
||||
from backend.data import workspace as workspace_db
|
||||
|
||||
# Import dynamic field utilities from centralized location
|
||||
from backend.data.block import BlockInput, BlockOutputEntry
|
||||
@@ -32,7 +33,6 @@ from backend.data.execution import (
|
||||
from backend.data.graph import GraphModel, Node
|
||||
from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput, GraphInput
|
||||
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
|
||||
from backend.data.workspace import get_or_create_workspace
|
||||
from backend.util.clients import (
|
||||
get_async_execution_event_bus,
|
||||
get_async_execution_queue,
|
||||
@@ -294,9 +294,9 @@ async def _validate_node_input_credentials(
|
||||
if field_is_optional:
|
||||
continue # Don't add error, will be marked for skip after loop
|
||||
else:
|
||||
credential_errors[node.id][
|
||||
field_name
|
||||
] = "These credentials are required"
|
||||
credential_errors[node.id][field_name] = (
|
||||
"These credentials are required"
|
||||
)
|
||||
continue
|
||||
|
||||
credentials_meta = credentials_meta_type.model_validate(field_value)
|
||||
@@ -315,15 +315,15 @@ async def _validate_node_input_credentials(
|
||||
except Exception as e:
|
||||
# Handle any errors fetching credentials
|
||||
# If credentials were explicitly configured but unavailable, it's an error
|
||||
credential_errors[node.id][
|
||||
field_name
|
||||
] = f"Credentials not available: {e}"
|
||||
credential_errors[node.id][field_name] = (
|
||||
f"Credentials not available: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
if not credentials:
|
||||
credential_errors[node.id][
|
||||
field_name
|
||||
] = f"Unknown credentials #{credentials_meta.id}"
|
||||
credential_errors[node.id][field_name] = (
|
||||
f"Unknown credentials #{credentials_meta.id}"
|
||||
)
|
||||
continue
|
||||
|
||||
if (
|
||||
@@ -336,9 +336,9 @@ async def _validate_node_input_credentials(
|
||||
f"{credentials_meta.type}<>{credentials.type};"
|
||||
f"{credentials_meta.provider}<>{credentials.provider}"
|
||||
)
|
||||
credential_errors[node.id][
|
||||
field_name
|
||||
] = "Invalid credentials: type/provider mismatch"
|
||||
credential_errors[node.id][field_name] = (
|
||||
"Invalid credentials: type/provider mismatch"
|
||||
)
|
||||
continue
|
||||
|
||||
# If node has optional credentials and any are missing, allow running without.
|
||||
@@ -349,8 +349,7 @@ async def _validate_node_input_credentials(
|
||||
and node.id not in credential_errors
|
||||
):
|
||||
logger.info(
|
||||
f"Node #{node.id}: optional credentials not configured, "
|
||||
"running without"
|
||||
f"Node #{node.id}: optional credentials not configured, running without"
|
||||
)
|
||||
|
||||
return credential_errors, nodes_to_skip
|
||||
@@ -412,9 +411,10 @@ async def validate_graph_with_credentials(
|
||||
)
|
||||
|
||||
# Get credential input/availability/validation errors and nodes to skip
|
||||
node_credential_input_errors, nodes_to_skip = (
|
||||
await _validate_node_input_credentials(graph, user_id, nodes_input_masks)
|
||||
)
|
||||
(
|
||||
node_credential_input_errors,
|
||||
nodes_to_skip,
|
||||
) = await _validate_node_input_credentials(graph, user_id, nodes_input_masks)
|
||||
|
||||
# Merge credential errors with structural errors
|
||||
for node_id, field_errors in node_credential_input_errors.items():
|
||||
@@ -562,13 +562,14 @@ async def validate_and_construct_node_execution_input(
|
||||
nodes_input_masks or {},
|
||||
)
|
||||
|
||||
starting_nodes_input, nodes_to_skip = (
|
||||
await _construct_starting_node_execution_input(
|
||||
graph=graph,
|
||||
user_id=user_id,
|
||||
graph_inputs=graph_inputs,
|
||||
nodes_input_masks=nodes_input_masks,
|
||||
)
|
||||
(
|
||||
starting_nodes_input,
|
||||
nodes_to_skip,
|
||||
) = await _construct_starting_node_execution_input(
|
||||
graph=graph,
|
||||
user_id=user_id,
|
||||
graph_inputs=graph_inputs,
|
||||
nodes_input_masks=nodes_input_masks,
|
||||
)
|
||||
|
||||
return graph, starting_nodes_input, nodes_input_masks, nodes_to_skip
|
||||
@@ -632,8 +633,7 @@ def create_execution_queue_config() -> RabbitMQConfig:
|
||||
# Solution: Disable consumer timeout entirely - let graphs run indefinitely
|
||||
# Safety: Heartbeat mechanism now handles dead consumer detection instead
|
||||
# Use case: Graph executions that take hours to complete (AI model training, etc.)
|
||||
"x-consumer-timeout": GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS
|
||||
* 1000,
|
||||
"x-consumer-timeout": GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS * 1000,
|
||||
},
|
||||
)
|
||||
cancel_queue = Queue(
|
||||
@@ -831,8 +831,9 @@ async def add_graph_execution(
|
||||
udb = user_db
|
||||
gdb = graph_db
|
||||
odb = onboarding_db
|
||||
wdb = workspace_db
|
||||
else:
|
||||
edb = udb = gdb = odb = get_database_manager_async_client()
|
||||
edb = udb = gdb = odb = wdb = get_database_manager_async_client()
|
||||
|
||||
# Get or create the graph execution
|
||||
if graph_exec_id:
|
||||
@@ -859,16 +860,19 @@ async def add_graph_execution(
|
||||
)
|
||||
|
||||
# Create new execution
|
||||
graph, starting_nodes_input, compiled_nodes_input_masks, nodes_to_skip = (
|
||||
await validate_and_construct_node_execution_input(
|
||||
graph_id=graph_id,
|
||||
user_id=user_id,
|
||||
graph_inputs=inputs or {},
|
||||
graph_version=graph_version,
|
||||
graph_credentials_inputs=graph_credentials_inputs,
|
||||
nodes_input_masks=nodes_input_masks,
|
||||
is_sub_graph=parent_exec_id is not None,
|
||||
)
|
||||
(
|
||||
graph,
|
||||
starting_nodes_input,
|
||||
compiled_nodes_input_masks,
|
||||
nodes_to_skip,
|
||||
) = await validate_and_construct_node_execution_input(
|
||||
graph_id=graph_id,
|
||||
user_id=user_id,
|
||||
graph_inputs=inputs or {},
|
||||
graph_version=graph_version,
|
||||
graph_credentials_inputs=graph_credentials_inputs,
|
||||
nodes_input_masks=nodes_input_masks,
|
||||
is_sub_graph=parent_exec_id is not None,
|
||||
)
|
||||
|
||||
graph_exec = await edb.create_graph_execution(
|
||||
@@ -892,7 +896,7 @@ async def add_graph_execution(
|
||||
if execution_context is None:
|
||||
user = await udb.get_user_by_id(user_id)
|
||||
settings = await gdb.get_graph_settings(user_id=user_id, graph_id=graph_id)
|
||||
workspace = await get_or_create_workspace(user_id)
|
||||
workspace = await wdb.get_or_create_workspace(user_id)
|
||||
|
||||
execution_context = ExecutionContext(
|
||||
# Execution identity
|
||||
|
||||
@@ -371,7 +371,7 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
|
||||
mock_workspace = mocker.MagicMock()
|
||||
mock_workspace.id = "test-workspace-id"
|
||||
mocker.patch(
|
||||
"backend.executor.utils.get_or_create_workspace",
|
||||
"backend.executor.utils.workspace_db.get_or_create_workspace",
|
||||
new=mocker.AsyncMock(return_value=mock_workspace),
|
||||
)
|
||||
|
||||
@@ -652,7 +652,7 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
|
||||
mock_workspace = mocker.MagicMock()
|
||||
mock_workspace.id = "test-workspace-id"
|
||||
mocker.patch(
|
||||
"backend.executor.utils.get_or_create_workspace",
|
||||
"backend.executor.utils.workspace_db.get_or_create_workspace",
|
||||
new=mocker.AsyncMock(return_value=mock_workspace),
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user