pr comments

This commit is contained in:
Swifty
2026-01-08 14:43:07 +01:00
parent 1c8cba9c5f
commit 402bec4595
2 changed files with 29 additions and 32 deletions

View File

@@ -964,6 +964,12 @@ async def update_node_execution_status(
execution_data: BlockInput | None = None,
stats: dict[str, Any] | None = None,
) -> NodeExecutionResult:
"""
Update a node execution's status with validation of allowed transitions.
⚠️ Internal executor use only - no user_id check. Callers (executor/manager.py)
are responsible for validating user authorization before invoking this function.
"""
if status == ExecutionStatus.QUEUED and execution_data is None:
raise ValueError("Execution data must be provided when queuing an execution.")
@@ -974,23 +980,27 @@ async def update_node_execution_status(
f"Invalid status transition: {status} has no valid source statuses"
)
where_clause: Any = {
"id": node_exec_id,
"executionStatus": {"in": [s.value for s in allowed_from]},
}
if res := await AgentNodeExecution.prisma().update(
where=where_clause,
# Fetch current execution to validate status transition
current = await AgentNodeExecution.prisma().find_unique(
where={"id": node_exec_id}, include=EXECUTION_RESULT_INCLUDE
)
if not current:
raise ValueError(f"Execution {node_exec_id} not found.")
# Validate current status allows transition to the new status
if current.executionStatus not in allowed_from:
# Return current state without updating if transition is not allowed
return NodeExecutionResult.from_db(current)
# Perform the update with only the unique identifier
res = await AgentNodeExecution.prisma().update(
where={"id": node_exec_id},
data=_get_update_status_data(status, execution_data, stats),
include=EXECUTION_RESULT_INCLUDE,
):
return NodeExecutionResult.from_db(res)
if res := await AgentNodeExecution.prisma().find_unique(
where={"id": node_exec_id}, include=EXECUTION_RESULT_INCLUDE
):
return NodeExecutionResult.from_db(res)
raise ValueError(f"Execution {node_exec_id} not found.")
)
if not res:
raise ValueError(f"Failed to update execution {node_exec_id}.")
return NodeExecutionResult.from_db(res)
def _get_update_status_data(

View File

@@ -96,6 +96,7 @@ async def reset_user_onboarding(user_id: str):
async def update_user_onboarding(user_id: str, data: UserOnboardingUpdate):
update: UserOnboardingUpdateInput = {}
# get_user_onboarding guarantees the record exists via upsert
onboarding = await get_user_onboarding(user_id)
if data.walletShown:
update["walletShown"] = data.walletShown
@@ -114,26 +115,12 @@ async def update_user_onboarding(user_id: str, data: UserOnboardingUpdate):
if data.onboardingAgentExecutionId is not None:
update["onboardingAgentExecutionId"] = data.onboardingAgentExecutionId
# Build create_input manually to avoid type issues with Prisma update types
create_input = UserOnboardingCreateInput(
userId=user_id,
walletShown=data.walletShown if data.walletShown else False,
notified=(
list(set(data.notified + onboarding.notified))
if data.notified is not None
else []
),
usageReason=data.usageReason,
integrations=data.integrations if data.integrations is not None else [],
otherIntegrations=data.otherIntegrations,
selectedStoreListingVersionId=data.selectedStoreListingVersionId,
agentInput=SafeJson(data.agentInput) if data.agentInput is not None else None,
onboardingAgentExecutionId=data.onboardingAgentExecutionId,
)
# The create branch is never taken since get_user_onboarding ensures the record exists,
# but upsert requires a create payload so we provide a minimal one
return await UserOnboarding.prisma().upsert(
where={"userId": user_id},
data=UserOnboardingUpsertInput(
create=create_input,
create=UserOnboardingCreateInput(userId=user_id),
update=update,
),
)