feat(backend): Make agent graph execution retriable and its failure visible (#10518)

Make agent graph execution durable by making it retriable. When it still
fails after retrying, the error should be made visible in the UI.

<img width="900" height="495" alt="image"
src="https://github.com/user-attachments/assets/70e3e117-31e7-4704-8bdf-1802c6afc70b"
/>
<img width="900" height="407" alt="image"
src="https://github.com/user-attachments/assets/78ca6c28-6cc2-4aff-bfa9-9f94b7f89f77"
/>


### Changes 🏗️

* Make _on_graph_execution retriable
* Increase retry count for failing db-manager RPC
* Add test coverage for RPC failure retry

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Allow graph execution retry
This commit is contained in:
Zamil Majdy
2025-08-01 19:44:43 +08:00
committed by GitHub
parent e632549175
commit 8331dabf6a
9 changed files with 128 additions and 30 deletions

View File

@@ -664,6 +664,7 @@ class Executor:
execution_stats.cost += cost
@classmethod
@func_retry
@time_measured
def _on_graph_execution(
cls,

View File

@@ -105,6 +105,7 @@ EXCEPTION_MAPPING = {
e.__name__: e
for e in [
ValueError,
RuntimeError,
TimeoutError,
ConnectionError,
*[
@@ -286,7 +287,7 @@ def get_service_client(
return retry(
reraise=True,
stop=stop_after_attempt(api_comm_retry),
wait=wait_exponential_jitter(max=4.0),
wait=wait_exponential_jitter(max=5.0),
retry=retry_if_not_exception_type(
(
# Don't retry these specific exceptions that won't be fixed by retrying

View File

@@ -19,6 +19,7 @@ TEST_SERVICE_PORT = 8765
class ServiceTest(AppService):
def __init__(self):
super().__init__()
self.fail_count = 0
def cleanup(self):
pass
@@ -42,6 +43,19 @@ class ServiceTest(AppService):
return self.run_and_wait(add_async(a, b))
@expose
def failing_add(self, a: int, b: int) -> int:
    """Add two integers, but only after two failed calls.

    Every call increments ``self.fail_count``. The first two calls raise
    ``RuntimeError``; from the third call on, the sum ``a + b`` is returned.
    Used by the retry tests to simulate a transient failure.
    """
    self.fail_count += 1
    # Succeed once more than two calls have been recorded.
    if self.fail_count > 2:
        return a + b
    raise RuntimeError("Database connection failed")
@expose
def always_failing_add(self, a: int, b: int) -> int:
    """Unconditionally raise ``RuntimeError``; the arguments are never used.

    Gives the tests a call that fails on every attempt.
    """
    failure = RuntimeError("Database connection failed")
    raise failure
class ServiceTestClient(AppServiceClient):
@classmethod
@@ -51,6 +65,8 @@ class ServiceTestClient(AppServiceClient):
add = ServiceTest.add
subtract = ServiceTest.subtract
fun_with_async = ServiceTest.fun_with_async
failing_add = ServiceTest.failing_add
always_failing_add = ServiceTest.always_failing_add
add_async = endpoint_to_async(ServiceTest.add)
subtract_async = endpoint_to_async(ServiceTest.subtract)
@@ -313,3 +329,25 @@ def test_cached_property_behavior():
resource3 = obj.expensive_resource
assert creation_count == 2 # New creation
assert resource1 != resource3
def test_service_with_runtime_error_retries(server):
    """A service method raising RuntimeError is retried until it succeeds."""
    with ServiceTest():
        # Client built with retries switched on.
        retrying_client = get_service_client(ServiceTestClient, request_retry=True)

        # failing_add raises on its first two calls and returns the sum on
        # the third, so a retrying client should end up with the result.
        total = retrying_client.failing_add(5, 3)
        assert total == 8
def test_service_no_retry_when_disabled(server):
    """Test that a client created with ``request_retry=False`` does not retry.

    Two checks are made:

    * ``always_failing_add`` must surface its ``RuntimeError`` to the caller.
    * ``failing_add`` must also surface its ``RuntimeError``. That method only
      fails on its first two calls, so if the client were retrying, a later
      attempt would succeed and no exception would reach the test. The first
      check alone cannot tell the two situations apart, because a method that
      fails on every attempt raises either way.
    """
    with ServiceTest():
        # Get client with retry disabled
        client = get_service_client(ServiceTestClient, request_retry=False)

        # This should fail immediately without retry
        with pytest.raises(RuntimeError, match="Database connection failed"):
            client.always_failing_add(5, 3)

        # A transient failure must not be recovered when retry is disabled:
        # the very first failure of failing_add has to propagate.
        with pytest.raises(RuntimeError, match="Database connection failed"):
            client.failing_add(5, 3)

View File

@@ -68,7 +68,7 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="The default timeout in seconds, for Pyro client connections.",
)
pyro_client_comm_retry: int = Field(
default=3,
default=5,
description="The default number of retries for Pyro client connections.",
)
rpc_client_call_timeout: int = Field(

View File

@@ -116,6 +116,7 @@ const FlowEditor: React.FC<{
isRunning,
isStopping,
isScheduling,
graphExecutionError,
nodes,
setNodes,
edges,
@@ -764,6 +765,7 @@ const FlowEditor: React.FC<{
ref={runnerUIRef}
graph={savedAgent}
nodes={nodes}
graphExecutionError={graphExecutionError}
createRunSchedule={createRunSchedule}
saveAndRun={saveAndRun}
/>

View File

@@ -19,6 +19,7 @@ import RunnerOutputUI, {
interface RunnerUIWrapperProps {
graph: GraphMeta;
nodes: Node<CustomNodeData>[];
graphExecutionError?: string | null;
saveAndRun: (
inputs: Record<string, any>,
credentialsInputs: Record<string, CredentialsMetaInput>,
@@ -38,7 +39,10 @@ export interface RunnerUIWrapperRef {
}
const RunnerUIWrapper = forwardRef<RunnerUIWrapperRef, RunnerUIWrapperProps>(
({ graph, nodes, saveAndRun, createRunSchedule }, ref) => {
(
{ graph, nodes, graphExecutionError, saveAndRun, createRunSchedule },
ref,
) => {
const [isRunInputDialogOpen, setIsRunInputDialogOpen] = useState(false);
const [isRunnerOutputOpen, setIsRunnerOutputOpen] = useState(false);
@@ -103,6 +107,7 @@ const RunnerUIWrapper = forwardRef<RunnerUIWrapperRef, RunnerUIWrapperProps>(
isOpen={isRunnerOutputOpen}
doClose={() => setIsRunnerOutputOpen(false)}
outputs={graphOutputs}
graphExecutionError={graphExecutionError}
/>
</>
);

View File

@@ -235,6 +235,15 @@ export default function AgentRunDetailsView({
</div>
))}
</div>
{run.status === "FAILED" && (
<div className="mt-4 rounded-md border border-red-200 bg-red-50 p-3 dark:border-red-800 dark:bg-red-900/20">
<p className="text-sm text-red-800 dark:text-red-200">
<strong>Error:</strong>{" "}
{run.stats?.error ||
"The execution failed due to an internal error. You can re-run the agent to retry."}
</p>
</div>
)}
</CardContent>
</Card>

View File

@@ -25,6 +25,7 @@ interface OutputModalProps {
isOpen: boolean;
doClose: () => void;
outputs: OutputNodeInfo[];
graphExecutionError?: string | null;
}
const formatOutput = (output: any): string => {
@@ -47,7 +48,12 @@ const formatOutput = (output: any): string => {
return String(output);
};
export function RunnerOutputUI({ isOpen, doClose, outputs }: OutputModalProps) {
export function RunnerOutputUI({
isOpen,
doClose,
outputs,
graphExecutionError,
}: OutputModalProps) {
const { toast } = useToast();
const copyOutput = (name: string, output: any) => {
@@ -80,6 +86,13 @@ export function RunnerOutputUI({ isOpen, doClose, outputs }: OutputModalProps) {
<div className="flex-grow overflow-y-auto px-2 py-2">
<ScrollArea className="h-full overflow-auto pr-4">
<div className="space-y-4">
{graphExecutionError && (
<div className="rounded-md border border-red-200 bg-red-50 p-3 dark:border-red-800 dark:bg-red-900/20">
<p className="text-sm text-red-800 dark:text-red-200">
<strong>Error:</strong> {graphExecutionError}
</p>
</div>
)}
{outputs && outputs.length > 0 ? (
outputs.map((output, i) => (
<div key={i} className="space-y-1">

View File

@@ -59,6 +59,9 @@ export default function useAgentGraph(
const [isStopping, setIsStopping] = useState(false);
const [activeExecutionID, setActiveExecutionID] =
useState<GraphExecutionID | null>(null);
const [graphExecutionError, setGraphExecutionError] = useState<string | null>(
null,
);
const [xyNodes, setXYNodes] = useState<CustomNode[]>([]);
const [xyEdges, setXYEdges] = useState<CustomEdge[]>([]);
const { state, completeStep, incrementRuns } = useOnboarding();
@@ -461,6 +464,15 @@ export default function useAgentGraph(
flowID,
flowExecutionID,
);
// Set graph execution error from the initial fetch
if (execution.status === "FAILED") {
setGraphExecutionError(
execution.stats?.error ||
"The execution failed due to an internal error. You can re-run the agent to retry.",
);
}
if (
(execution.status === "QUEUED" || execution.status === "RUNNING") &&
!isRunning
@@ -479,32 +491,48 @@ export default function useAgentGraph(
if (graphExec.id != flowExecutionID) {
return;
}
if (
graphExec.status === "FAILED" &&
graphExec?.stats?.error
?.toLowerCase()
?.includes("insufficient balance")
) {
// Show no credits toast if user has low credits
toast({
variant: "destructive",
title: "Credits low",
description: (
<div>
Agent execution failed due to insufficient credits.
<br />
Go to the{" "}
<NextLink
className="text-purple-300"
href="/marketplace/credits"
>
Credits
</NextLink>{" "}
page to top up.
</div>
),
duration: 5000,
});
// Update graph execution error state and show toast
if (graphExec.status === "FAILED") {
const errorMessage =
graphExec.stats?.error ||
"The execution failed due to an internal error. You can re-run the agent to retry.";
setGraphExecutionError(errorMessage);
if (
graphExec.stats?.error
?.toLowerCase()
.includes("insufficient balance")
) {
// Show no credits toast if user has low credits
toast({
variant: "destructive",
title: "Credits low",
description: (
<div>
Agent execution failed due to insufficient credits.
<br />
Go to the{" "}
<NextLink
className="text-purple-300"
href="/profile/credits"
>
Credits
</NextLink>{" "}
page to top up.
</div>
),
duration: 5000,
});
} else {
// Show general graph execution error
toast({
variant: "destructive",
title: "Agent execution failed",
description: errorMessage,
duration: 8000,
});
}
}
if (
graphExec.status === "COMPLETED" ||
@@ -999,6 +1027,7 @@ export default function useAgentGraph(
isRunning,
isStopping,
isScheduling,
graphExecutionError,
nodes: xyNodes,
setNodes: setXYNodes,
edges: xyEdges,