Compare commits

...

1 Commits

Author SHA1 Message Date
Otto (AGPT)
1b5abf97fb fix(backend): Use db_manager for workspace in add_graph_execution
get_or_create_workspace was called directly via UserWorkspace.prisma(),
bypassing the db_manager fallback pattern. When the global Prisma client
isn't connected (e.g. CoPilot/external API context), this raises
ClientNotConnectedError.

Use the same if prisma.is_connected() fallback pattern as all other DB
calls in the function.

Resolves Sentry AUTOGPT-SERVER-83T and related issues.
2026-03-06 06:34:39 +00:00
2 changed files with 45 additions and 41 deletions

View File

@@ -15,6 +15,7 @@ from backend.data import graph as graph_db
from backend.data import human_review as human_review_db
from backend.data import onboarding as onboarding_db
from backend.data import user as user_db
from backend.data import workspace as workspace_db
# Import dynamic field utilities from centralized location
from backend.data.block import BlockInput, BlockOutputEntry
@@ -32,7 +33,6 @@ from backend.data.execution import (
from backend.data.graph import GraphModel, Node
from backend.data.model import USER_TIMEZONE_NOT_SET, CredentialsMetaInput, GraphInput
from backend.data.rabbitmq import Exchange, ExchangeType, Queue, RabbitMQConfig
from backend.data.workspace import get_or_create_workspace
from backend.util.clients import (
get_async_execution_event_bus,
get_async_execution_queue,
@@ -294,9 +294,9 @@ async def _validate_node_input_credentials(
if field_is_optional:
continue # Don't add error, will be marked for skip after loop
else:
credential_errors[node.id][
field_name
] = "These credentials are required"
credential_errors[node.id][field_name] = (
"These credentials are required"
)
continue
credentials_meta = credentials_meta_type.model_validate(field_value)
@@ -315,15 +315,15 @@ async def _validate_node_input_credentials(
except Exception as e:
# Handle any errors fetching credentials
# If credentials were explicitly configured but unavailable, it's an error
credential_errors[node.id][
field_name
] = f"Credentials not available: {e}"
credential_errors[node.id][field_name] = (
f"Credentials not available: {e}"
)
continue
if not credentials:
credential_errors[node.id][
field_name
] = f"Unknown credentials #{credentials_meta.id}"
credential_errors[node.id][field_name] = (
f"Unknown credentials #{credentials_meta.id}"
)
continue
if (
@@ -336,9 +336,9 @@ async def _validate_node_input_credentials(
f"{credentials_meta.type}<>{credentials.type};"
f"{credentials_meta.provider}<>{credentials.provider}"
)
credential_errors[node.id][
field_name
] = "Invalid credentials: type/provider mismatch"
credential_errors[node.id][field_name] = (
"Invalid credentials: type/provider mismatch"
)
continue
# If node has optional credentials and any are missing, allow running without.
@@ -349,8 +349,7 @@ async def _validate_node_input_credentials(
and node.id not in credential_errors
):
logger.info(
f"Node #{node.id}: optional credentials not configured, "
"running without"
f"Node #{node.id}: optional credentials not configured, running without"
)
return credential_errors, nodes_to_skip
@@ -412,9 +411,10 @@ async def validate_graph_with_credentials(
)
# Get credential input/availability/validation errors and nodes to skip
node_credential_input_errors, nodes_to_skip = (
await _validate_node_input_credentials(graph, user_id, nodes_input_masks)
)
(
node_credential_input_errors,
nodes_to_skip,
) = await _validate_node_input_credentials(graph, user_id, nodes_input_masks)
# Merge credential errors with structural errors
for node_id, field_errors in node_credential_input_errors.items():
@@ -562,13 +562,14 @@ async def validate_and_construct_node_execution_input(
nodes_input_masks or {},
)
starting_nodes_input, nodes_to_skip = (
await _construct_starting_node_execution_input(
graph=graph,
user_id=user_id,
graph_inputs=graph_inputs,
nodes_input_masks=nodes_input_masks,
)
(
starting_nodes_input,
nodes_to_skip,
) = await _construct_starting_node_execution_input(
graph=graph,
user_id=user_id,
graph_inputs=graph_inputs,
nodes_input_masks=nodes_input_masks,
)
return graph, starting_nodes_input, nodes_input_masks, nodes_to_skip
@@ -632,8 +633,7 @@ def create_execution_queue_config() -> RabbitMQConfig:
# Solution: Disable consumer timeout entirely - let graphs run indefinitely
# Safety: Heartbeat mechanism now handles dead consumer detection instead
# Use case: Graph executions that take hours to complete (AI model training, etc.)
"x-consumer-timeout": GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS
* 1000,
"x-consumer-timeout": GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS * 1000,
},
)
cancel_queue = Queue(
@@ -831,8 +831,9 @@ async def add_graph_execution(
udb = user_db
gdb = graph_db
odb = onboarding_db
wdb = workspace_db
else:
edb = udb = gdb = odb = get_database_manager_async_client()
edb = udb = gdb = odb = wdb = get_database_manager_async_client()
# Get or create the graph execution
if graph_exec_id:
@@ -859,16 +860,19 @@ async def add_graph_execution(
)
# Create new execution
graph, starting_nodes_input, compiled_nodes_input_masks, nodes_to_skip = (
await validate_and_construct_node_execution_input(
graph_id=graph_id,
user_id=user_id,
graph_inputs=inputs or {},
graph_version=graph_version,
graph_credentials_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
is_sub_graph=parent_exec_id is not None,
)
(
graph,
starting_nodes_input,
compiled_nodes_input_masks,
nodes_to_skip,
) = await validate_and_construct_node_execution_input(
graph_id=graph_id,
user_id=user_id,
graph_inputs=inputs or {},
graph_version=graph_version,
graph_credentials_inputs=graph_credentials_inputs,
nodes_input_masks=nodes_input_masks,
is_sub_graph=parent_exec_id is not None,
)
graph_exec = await edb.create_graph_execution(
@@ -892,7 +896,7 @@ async def add_graph_execution(
if execution_context is None:
user = await udb.get_user_by_id(user_id)
settings = await gdb.get_graph_settings(user_id=user_id, graph_id=graph_id)
workspace = await get_or_create_workspace(user_id)
workspace = await wdb.get_or_create_workspace(user_id)
execution_context = ExecutionContext(
# Execution identity

View File

@@ -371,7 +371,7 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
mock_workspace = mocker.MagicMock()
mock_workspace.id = "test-workspace-id"
mocker.patch(
"backend.executor.utils.get_or_create_workspace",
"backend.executor.utils.workspace_db.get_or_create_workspace",
new=mocker.AsyncMock(return_value=mock_workspace),
)
@@ -652,7 +652,7 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
mock_workspace = mocker.MagicMock()
mock_workspace.id = "test-workspace-id"
mocker.patch(
"backend.executor.utils.get_or_create_workspace",
"backend.executor.utils.workspace_db.get_or_create_workspace",
new=mocker.AsyncMock(return_value=mock_workspace),
)