feat(platform): Support opening graphs with version and execution id (#9332)

Currently it's only possible to open latest graph from monitor and see
the node execution results only when manually running. This PR adds
ability to open running and finished graphs in builder.

### Changes 🏗️

Builder now handles graph version and execution ID in addition to graph
ID when opening a graph. When an execution ID is provided, node
execution results are fetched and subscribed to in real time. This makes
it possible to open a graph that is already executing and see both
existing node execution data and real-time updates (if it's still
running).

- Use graph version and execution id on the builder page and in
`useAgentGraph`
- Use graph version on the `execute_graph` endpoint
- Use graph version on the websockets to distinguish between versions
- Move `formatEdgeID` to utils; it's used in `useAgentGraph.ts` and in
`Flow.tsx`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Opening finished execution restores node results
- [x] Opening running execution restores results and continues to run
properly
  - [x] Results are separate for each graph across multiple tabs

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit is contained in:
Krzysztof Czerwinski
2025-02-07 11:16:30 +01:00
committed by GitHub
parent c693875951
commit 1a1fe7c0b7
22 changed files with 283 additions and 235 deletions

View File

@@ -221,7 +221,8 @@ def event():
@test.command()
@click.argument("server_address")
@click.argument("graph_id")
def websocket(server_address: str, graph_id: str):
@click.argument("graph_version")
def websocket(server_address: str, graph_id: str, graph_version: int):
"""
Tests the websocket connection.
"""
@@ -237,7 +238,9 @@ def websocket(server_address: str, graph_id: str):
try:
msg = WsMessage(
method=Methods.SUBSCRIBE,
data=ExecutionSubscription(graph_id=graph_id).model_dump(),
data=ExecutionSubscription(
graph_id=graph_id, graph_version=graph_version
).model_dump(),
).model_dump_json()
await websocket.send(msg)
print(f"Sending: {msg}")

View File

@@ -23,6 +23,7 @@ class GraphExecutionEntry(BaseModel):
user_id: str
graph_exec_id: str
graph_id: str
graph_version: int
start_node_execs: list["NodeExecutionEntry"]

View File

@@ -870,6 +870,7 @@ class ExecutionManager(AppService):
graph_exec = GraphExecutionEntry(
user_id=user_id,
graph_id=graph_id,
graph_version=graph_version or 0,
graph_exec_id=graph_exec_id,
start_node_execs=starting_node_execs,
)

View File

@@ -20,24 +20,28 @@ class ConnectionManager:
for subscribers in self.subscriptions.values():
subscribers.discard(websocket)
async def subscribe(self, graph_id: str, websocket: WebSocket):
if graph_id not in self.subscriptions:
self.subscriptions[graph_id] = set()
self.subscriptions[graph_id].add(websocket)
async def subscribe(self, graph_id: str, graph_version: int, websocket: WebSocket):
key = f"{graph_id}_{graph_version}"
if key not in self.subscriptions:
self.subscriptions[key] = set()
self.subscriptions[key].add(websocket)
async def unsubscribe(self, graph_id: str, websocket: WebSocket):
if graph_id in self.subscriptions:
self.subscriptions[graph_id].discard(websocket)
if not self.subscriptions[graph_id]:
del self.subscriptions[graph_id]
async def unsubscribe(
self, graph_id: str, graph_version: int, websocket: WebSocket
):
key = f"{graph_id}_{graph_version}"
if key in self.subscriptions:
self.subscriptions[key].discard(websocket)
if not self.subscriptions[key]:
del self.subscriptions[key]
async def send_execution_result(self, result: execution.ExecutionResult):
graph_id = result.graph_id
if graph_id in self.subscriptions:
key = f"{result.graph_id}_{result.graph_version}"
if key in self.subscriptions:
message = WsMessage(
method=Methods.EXECUTION_EVENT,
channel=graph_id,
channel=key,
data=result.model_dump(),
).model_dump_json()
for connection in self.subscriptions[graph_id]:
for connection in self.subscriptions[key]:
await connection.send_text(message)

View File

@@ -95,17 +95,21 @@ def execute_graph_block(
@v1_router.post(
path="/graphs/{graph_id}/execute",
path="/graphs/{graph_id}/execute/{graph_version}",
tags=["graphs"],
)
def execute_graph(
graph_id: str,
graph_version: int,
node_input: dict[Any, Any],
api_key: APIKey = Depends(require_permission(APIKeyPermission.EXECUTE_GRAPH)),
) -> dict[str, Any]:
try:
graph_exec = execution_manager_client().add_execution(
graph_id, node_input, user_id=api_key.user_id
graph_id,
graph_version=graph_version,
data=node_input,
user_id=api_key.user_id,
)
return {"id": graph_exec.graph_exec_id}
except Exception as e:

View File

@@ -25,12 +25,11 @@ class WsMessage(pydantic.BaseModel):
class ExecutionSubscription(pydantic.BaseModel):
graph_id: str
graph_version: int
class SubscriptionDetails(pydantic.BaseModel):
event_type: str
channel: str
graph_id: str
class ExecuteGraphResponse(pydantic.BaseModel):
graph_exec_id: str
class CreateGraph(pydantic.BaseModel):

View File

@@ -50,6 +50,7 @@ from backend.server.model import (
CreateAPIKeyRequest,
CreateAPIKeyResponse,
CreateGraph,
ExecuteGraphResponse,
RequestTopUp,
SetGraphActiveVersion,
UpdatePermissionsRequest,
@@ -491,7 +492,7 @@ async def set_graph_active_version(
@v1_router.post(
path="/graphs/{graph_id}/execute",
path="/graphs/{graph_id}/execute/{graph_version}",
tags=["graphs"],
dependencies=[Depends(auth_middleware)],
)
@@ -500,12 +501,12 @@ def execute_graph(
node_input: dict[Any, Any],
user_id: Annotated[str, Depends(get_user_id)],
graph_version: Optional[int] = None,
) -> dict[str, Any]: # FIXME: add proper return type
) -> ExecuteGraphResponse:
try:
graph_exec = execution_manager_client().add_execution(
graph_id, node_input, user_id=user_id, graph_version=graph_version
)
return {"id": graph_exec.graph_exec_id}
return ExecuteGraphResponse(graph_exec_id=graph_exec.graph_exec_id)
except Exception as e:
msg = e.__str__().encode().decode("unicode_escape")
raise HTTPException(status_code=400, detail=msg)

View File

@@ -86,13 +86,13 @@ async def handle_subscribe(
)
else:
ex_sub = ExecutionSubscription.model_validate(message.data)
await manager.subscribe(ex_sub.graph_id, websocket)
await manager.subscribe(ex_sub.graph_id, ex_sub.graph_version, websocket)
logger.debug(f"New execution subscription for graph {ex_sub.graph_id}")
await websocket.send_text(
WsMessage(
method=Methods.SUBSCRIBE,
success=True,
channel=ex_sub.graph_id,
channel=f"{ex_sub.graph_id}_{ex_sub.graph_version}",
).model_dump_json()
)
@@ -110,13 +110,13 @@ async def handle_unsubscribe(
)
else:
ex_sub = ExecutionSubscription.model_validate(message.data)
await manager.unsubscribe(ex_sub.graph_id, websocket)
await manager.unsubscribe(ex_sub.graph_id, ex_sub.graph_version, websocket)
logger.debug(f"Removed execution subscription for graph {ex_sub.graph_id}")
await websocket.send_text(
WsMessage(
method=Methods.UNSUBSCRIBE,
success=True,
channel=ex_sub.graph_id,
channel=f"{ex_sub.graph_id}_{ex_sub.graph_version}",
).model_dump_json()
)

View File

@@ -258,7 +258,7 @@ async def block_autogen_agent():
print(response)
result = await wait_execution(
graph_id=test_graph.id,
graph_exec_id=response["id"],
graph_exec_id=response.graph_exec_id,
timeout=1200,
user_id=test_user.id,
)

View File

@@ -160,7 +160,9 @@ async def reddit_marketing_agent():
test_graph.id, input_data, test_user.id
)
print(response)
result = await wait_execution(test_user.id, test_graph.id, response["id"], 120)
result = await wait_execution(
test_user.id, test_graph.id, response.graph_exec_id, 120
)
print(result)

View File

@@ -89,7 +89,9 @@ async def sample_agent():
test_graph.id, input_data, test_user.id
)
print(response)
result = await wait_execution(test_user.id, test_graph.id, response["id"], 10)
result = await wait_execution(
test_user.id, test_graph.id, response.graph_exec_id, 10
)
print(result)

View File

@@ -39,7 +39,7 @@ async def execute_graph(
graph_version=test_graph.version,
node_input=input_data,
)
graph_exec_id = response["id"]
graph_exec_id = response.graph_exec_id
logger.info(f"Created execution with ID: {graph_exec_id}")
# Execution queue should be empty

View File

@@ -34,29 +34,29 @@ def test_disconnect(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.active_connections.add(mock_websocket)
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions["test_graph_1"] = {mock_websocket}
connection_manager.disconnect(mock_websocket)
assert mock_websocket not in connection_manager.active_connections
assert mock_websocket not in connection_manager.subscriptions["test_graph"]
assert mock_websocket not in connection_manager.subscriptions["test_graph_1"]
@pytest.mark.asyncio
async def test_subscribe(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
await connection_manager.subscribe("test_graph", mock_websocket)
assert mock_websocket in connection_manager.subscriptions["test_graph"]
await connection_manager.subscribe("test_graph", 1, mock_websocket)
assert mock_websocket in connection_manager.subscriptions["test_graph_1"]
@pytest.mark.asyncio
async def test_unsubscribe(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions["test_graph_1"] = {mock_websocket}
await connection_manager.unsubscribe("test_graph", mock_websocket)
await connection_manager.unsubscribe("test_graph", 1, mock_websocket)
assert "test_graph" not in connection_manager.subscriptions
@@ -65,7 +65,7 @@ async def test_unsubscribe(
async def test_send_execution_result(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions["test_graph_1"] = {mock_websocket}
result: ExecutionResult = ExecutionResult(
graph_id="test_graph",
graph_version=1,
@@ -87,7 +87,7 @@ async def test_send_execution_result(
mock_websocket.send_text.assert_called_once_with(
WsMessage(
method=Methods.EXECUTION_EVENT,
channel="test_graph",
channel="test_graph_1",
data=result.model_dump(),
).model_dump_json()
)

View File

@@ -30,7 +30,8 @@ async def test_websocket_router_subscribe(
) -> None:
mock_websocket.receive_text.side_effect = [
WsMessage(
method=Methods.SUBSCRIBE, data={"graph_id": "test_graph"}
method=Methods.SUBSCRIBE,
data={"graph_id": "test_graph", "graph_version": 1},
).model_dump_json(),
WebSocketDisconnect(),
]
@@ -40,7 +41,7 @@ async def test_websocket_router_subscribe(
)
mock_manager.connect.assert_called_once_with(mock_websocket)
mock_manager.subscribe.assert_called_once_with("test_graph", mock_websocket)
mock_manager.subscribe.assert_called_once_with("test_graph", 1, mock_websocket)
mock_websocket.send_text.assert_called_once()
assert '"method":"subscribe"' in mock_websocket.send_text.call_args[0][0]
assert '"success":true' in mock_websocket.send_text.call_args[0][0]
@@ -53,7 +54,8 @@ async def test_websocket_router_unsubscribe(
) -> None:
mock_websocket.receive_text.side_effect = [
WsMessage(
method=Methods.UNSUBSCRIBE, data={"graph_id": "test_graph"}
method=Methods.UNSUBSCRIBE,
data={"graph_id": "test_graph", "graph_version": 1},
).model_dump_json(),
WebSocketDisconnect(),
]
@@ -63,7 +65,7 @@ async def test_websocket_router_unsubscribe(
)
mock_manager.connect.assert_called_once_with(mock_websocket)
mock_manager.unsubscribe.assert_called_once_with("test_graph", mock_websocket)
mock_manager.unsubscribe.assert_called_once_with("test_graph", 1, mock_websocket)
mock_websocket.send_text.assert_called_once()
assert '"method":"unsubscribe"' in mock_websocket.send_text.call_args[0][0]
assert '"success":true' in mock_websocket.send_text.call_args[0][0]
@@ -94,13 +96,15 @@ async def test_websocket_router_invalid_method(
async def test_handle_subscribe_success(
mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
message = WsMessage(method=Methods.SUBSCRIBE, data={"graph_id": "test_graph"})
message = WsMessage(
method=Methods.SUBSCRIBE, data={"graph_id": "test_graph", "graph_version": 1}
)
await handle_subscribe(
cast(WebSocket, mock_websocket), cast(ConnectionManager, mock_manager), message
)
mock_manager.subscribe.assert_called_once_with("test_graph", mock_websocket)
mock_manager.subscribe.assert_called_once_with("test_graph", 1, mock_websocket)
mock_websocket.send_text.assert_called_once()
assert '"method":"subscribe"' in mock_websocket.send_text.call_args[0][0]
assert '"success":true' in mock_websocket.send_text.call_args[0][0]
@@ -126,13 +130,15 @@ async def test_handle_subscribe_missing_data(
async def test_handle_unsubscribe_success(
mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
message = WsMessage(method=Methods.UNSUBSCRIBE, data={"graph_id": "test_graph"})
message = WsMessage(
method=Methods.UNSUBSCRIBE, data={"graph_id": "test_graph", "graph_version": 1}
)
await handle_unsubscribe(
cast(WebSocket, mock_websocket), cast(ConnectionManager, mock_manager), message
)
mock_manager.unsubscribe.assert_called_once_with("test_graph", mock_websocket)
mock_manager.unsubscribe.assert_called_once_with("test_graph", 1, mock_websocket)
mock_websocket.send_text.assert_called_once()
assert '"method":"unsubscribe"' in mock_websocket.send_text.call_args[0][0]
assert '"success":true' in mock_websocket.send_text.call_args[0][0]

View File

@@ -10,6 +10,7 @@ export default function Home() {
<FlowEditor
className="flow-container"
flowID={query.get("flowID") ?? undefined}
flowVersion={query.get("flowVersion") ?? undefined}
/>
);
}

View File

@@ -26,7 +26,7 @@ import {
import "@xyflow/react/dist/style.css";
import { CustomNode } from "./CustomNode";
import "./flow.css";
import { BlockUIType, Link } from "@/lib/autogpt-server-api";
import { BlockUIType, formatEdgeID } from "@/lib/autogpt-server-api";
import { getTypeColor, findNewlyAddedBlockCoordinates } from "@/lib/utils";
import { history } from "./history";
import { CustomEdge } from "./CustomEdge";
@@ -70,8 +70,9 @@ export const FlowContext = createContext<FlowContextType | null>(null);
const FlowEditor: React.FC<{
flowID?: string;
flowVersion?: string;
className?: string;
}> = ({ flowID, className }) => {
}> = ({ flowID, flowVersion, className }) => {
const {
addNodes,
addEdges,
@@ -85,6 +86,7 @@ const FlowEditor: React.FC<{
const [visualizeBeads, setVisualizeBeads] = useState<
"no" | "static" | "animate"
>("animate");
const [flowExecutionID, setFlowExecutionID] = useState<string | undefined>();
const {
agentName,
setAgentName,
@@ -107,7 +109,12 @@ const FlowEditor: React.FC<{
setNodes,
edges,
setEdges,
} = useAgentGraph(flowID, visualizeBeads !== "no");
} = useAgentGraph(
flowID,
flowVersion ? parseInt(flowVersion) : undefined,
flowExecutionID,
visualizeBeads !== "no",
);
const router = useRouter();
const pathname = usePathname();
@@ -157,6 +164,7 @@ const FlowEditor: React.FC<{
if (params.get("open_scheduling") === "true") {
setOpenCron(true);
}
setFlowExecutionID(params.get("flowExecutionID") || undefined);
}, [params]);
useEffect(() => {
@@ -267,14 +275,6 @@ const FlowEditor: React.FC<{
[deleteElements, setNodes, nodes, edges, addNodes],
);
const formatEdgeID = useCallback((conn: Link | Connection): string => {
if ("sink_id" in conn) {
return `${conn.source_id}_${conn.source_name}_${conn.sink_id}_${conn.sink_name}`;
} else {
return `${conn.source}_${conn.sourceHandle}_${conn.target}_${conn.targetHandle}`;
}
}, []);
const onConnect: OnConnect = useCallback(
(connection: Connection) => {
// Check if this exact connection already exists

View File

@@ -65,7 +65,7 @@ export const FlowInfo: React.FC<
setNodes,
edges,
setEdges,
} = useAgentGraph(flow.id, false);
} = useAgentGraph(flow.id, flow.version, undefined, false);
const api = useBackendAPI();
const { toast } = useToast();
@@ -224,7 +224,7 @@ export const FlowInfo: React.FC<
)}
<Link
className={buttonVariants({ variant: "default" })}
href={`/build?flowID=${flow.id}`}
href={`/build?flowID=${flow.id}&flowVersion=${flow.version}`}
>
<Pencil2Icon className="mr-2" />
Open in Builder

View File

@@ -109,7 +109,7 @@ export const FlowRunInfo: React.FC<
</Button>
<Link
className={buttonVariants({ variant: "default" })}
href={`/build?flowID=${flow.id}`}
href={`/build?flowID=${flow.id}&flowVersion=${execution.graph_version}&flowExecutionID=${execution.execution_id}`}
>
<Pencil2Icon className="mr-2" /> Open in Builder
</Link>

View File

@@ -4,8 +4,8 @@ import BackendAPI, {
Block,
BlockIOSubSchema,
BlockUIType,
formatEdgeID,
Graph,
Link,
NodeExecutionResult,
} from "@/lib/autogpt-server-api";
import {
@@ -14,7 +14,7 @@ import {
removeEmptyStringsAndNulls,
setNestedProperty,
} from "@/lib/utils";
import { Connection, MarkerType } from "@xyflow/react";
import { MarkerType } from "@xyflow/react";
import Ajv from "ajv";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useRouter, useSearchParams, usePathname } from "next/navigation";
@@ -28,6 +28,8 @@ const ajv = new Ajv({ strict: false, allErrors: true });
export default function useAgentGraph(
flowID?: string,
flowVersion?: number,
flowExecutionID?: string,
passDataToBeads?: boolean,
) {
const { toast } = useToast();
@@ -81,25 +83,6 @@ export default function useAgentGraph(
[],
);
// Connect to WebSocket
useEffect(() => {
api
.connectWebSocket()
.then(() => {
console.debug("WebSocket connected");
api.onWebSocketMessage("execution_event", (data) => {
setUpdateQueue((prev) => [...prev, data]);
});
})
.catch((error) => {
console.error("Failed to connect WebSocket:", error);
});
return () => {
api.disconnectWebSocket();
};
}, [api]);
// Load available blocks & flows
useEffect(() => {
api
@@ -111,16 +94,32 @@ export default function useAgentGraph(
.listGraphs()
.then((flows) => setAvailableFlows(flows))
.catch();
api.connectWebSocket().catch((error) => {
console.error("Failed to connect WebSocket:", error);
});
return () => {
api.disconnectWebSocket();
};
}, [api]);
//TODO to utils? repeated in Flow
const formatEdgeID = useCallback((conn: Link | Connection): string => {
if ("sink_id" in conn) {
return `${conn.source_id}_${conn.source_name}_${conn.sink_id}_${conn.sink_name}`;
} else {
return `${conn.source}_${conn.sourceHandle}_${conn.target}_${conn.targetHandle}`;
// Subscribe to execution events
useEffect(() => {
api.onWebSocketMessage("execution_event", (data) => {
if (data.graph_exec_id != flowExecutionID) {
return;
}
setUpdateQueue((prev) => [...prev, data]);
});
if (flowID && flowVersion) {
api.subscribeToExecution(flowID, flowVersion);
console.debug(
`Subscribed to execution events for ${flowID} v.${flowVersion}`,
);
}
}, []);
}, [api, flowID, flowVersion, flowExecutionID]);
const getOutputType = useCallback(
(nodes: CustomNode[], nodeId: string, handleId: string) => {
@@ -144,80 +143,82 @@ export default function useAgentGraph(
setAgentName(graph.name);
setAgentDescription(graph.description);
setNodes(() => {
const newNodes = graph.nodes
.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
if (!block) return null;
const flow =
block.uiType == BlockUIType.AGENT
? availableFlows.find(
(flow) => flow.id === node.input_default.graph_id,
)
: null;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node?.metadata?.position?.x || 0,
y: node?.metadata?.position?.y || 0,
},
data: {
block_id: block.id,
blockType: flow?.name || block.name,
blockCosts: block.costs,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhook: node.webhook,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
})
.filter((node) => node !== null);
setEdges((_) =>
graph.links.map((link) => ({
id: formatEdgeID(link),
setNodes((prevNodes) => {
const newNodes = graph.nodes.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
const prevNode = prevNodes.find((n) => n.id === node.id);
const flow =
block.uiType == BlockUIType.AGENT
? availableFlows.find(
(flow) => flow.id === node.input_default.graph_id,
)
: null;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node?.metadata?.position?.x || 0,
y: node?.metadata?.position?.y || 0,
},
data: {
edgeColor: getTypeColor(
getOutputType(newNodes, link.source_id, link.source_name!),
),
sourcePos: newNodes.find((node) => node.id === link.source_id)
?.position,
isStatic: link.is_static,
beadUp: 0,
beadDown: 0,
beadData: [],
isOutputOpen: false,
...prevNode?.data,
block_id: block.id,
blockType: flow?.name || block.name,
blockCosts: block.costs,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhook: node.webhook,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
backend_id: node.id,
},
markerEnd: {
type: MarkerType.ArrowClosed,
strokeWidth: 2,
color: getTypeColor(
getOutputType(newNodes, link.source_id, link.source_name!),
),
},
source: link.source_id,
target: link.sink_id,
sourceHandle: link.source_name || undefined,
targetHandle: link.sink_name || undefined,
})),
};
return newNode;
});
setEdges(() =>
graph.links.map((link) => {
return {
id: formatEdgeID(link),
type: "custom",
data: {
edgeColor: getTypeColor(
getOutputType(newNodes, link.source_id, link.source_name!),
),
sourcePos: newNodes.find((node) => node.id === link.source_id)
?.position,
isStatic: link.is_static,
beadUp: 0,
beadDown: 0,
beadData: [],
},
markerEnd: {
type: MarkerType.ArrowClosed,
strokeWidth: 2,
color: getTypeColor(
getOutputType(newNodes, link.source_id, link.source_name!),
),
},
source: link.source_id,
target: link.sink_id,
sourceHandle: link.source_name || undefined,
targetHandle: link.sink_name || undefined,
};
}),
);
return newNodes;
});
@@ -335,14 +336,15 @@ export default function useAgentGraph(
[passDataToBeads, updateEdgeBeads],
);
// Load graph
useEffect(() => {
if (!flowID || availableNodes.length == 0) return;
api.getGraph(flowID).then((graph) => {
api.getGraph(flowID, flowVersion).then((graph) => {
console.debug("Loading graph");
loadGraph(graph);
});
}, [flowID, availableNodes, api, loadGraph]);
}, [flowID, flowVersion, availableNodes, api, loadGraph]);
// Update nodes with execution data
useEffect(() => {
@@ -543,71 +545,22 @@ export default function useAgentGraph(
});
return;
}
api.subscribeToExecution(savedAgent.id);
setSaveRunRequest({ request: "run", state: "running" });
api
.executeGraph(savedAgent.id)
.executeGraph(savedAgent.id, savedAgent.version)
.then((graphExecution) => {
setSaveRunRequest({
request: "run",
state: "running",
activeExecutionID: graphExecution.id,
activeExecutionID: graphExecution.graph_exec_id,
});
// Track execution until completed
const pendingNodeExecutions: Set<string> = new Set();
const cancelExecListener = api.onWebSocketMessage(
"execution_event",
(nodeResult) => {
// We are racing the server here, since we need the ID to filter events
if (nodeResult.graph_exec_id != graphExecution.id) {
return;
}
if (
nodeResult.status === "FAILED" &&
nodeResult.output_data?.error?.[0]
.toLowerCase()
.includes("insufficient balance")
) {
// Show no credits toast if user has low credits
toast({
variant: "destructive",
title: "Credits low",
description: (
<div>
Agent execution failed due to insufficient credits.
<br />
Go to the{" "}
<NextLink
className="text-purple-300"
href="/marketplace/credits"
>
Credits
</NextLink>{" "}
page to top up.
</div>
),
duration: 5000,
});
}
if (
!["COMPLETED", "TERMINATED", "FAILED"].includes(
nodeResult.status,
)
) {
pendingNodeExecutions.add(nodeResult.node_exec_id);
} else {
pendingNodeExecutions.delete(nodeResult.node_exec_id);
}
if (pendingNodeExecutions.size == 0) {
// Assuming the first event is always a QUEUED node, and
// following nodes are QUEUED before all preceding nodes are COMPLETED,
// an empty set means the graph has finished running.
cancelExecListener();
setSaveRunRequest({ request: "none", state: "none" });
}
},
);
// Update URL params
const path = new URLSearchParams(searchParams);
path.set("flowID", savedAgent.id);
path.set("flowVersion", savedAgent.version.toString());
path.set("flowExecutionID", graphExecution.graph_exec_id);
router.push(`${pathname}?${path.toString()}`);
})
.catch((error) => {
const errorMessage =
@@ -648,6 +601,72 @@ export default function useAgentGraph(
validateNodes,
]);
useEffect(() => {
if (!flowID || !flowExecutionID) {
return;
}
const fetchExecutions = async () => {
const results = await api.getGraphExecutionInfo(flowID, flowExecutionID);
setUpdateQueue((prev) => [...prev, ...results]);
// Track execution until completed
const pendingNodeExecutions: Set<string> = new Set();
const cancelExecListener = api.onWebSocketMessage(
"execution_event",
(nodeResult) => {
// We are racing the server here, since we need the ID to filter events
if (nodeResult.graph_exec_id != flowExecutionID) {
return;
}
if (
nodeResult.status === "FAILED" &&
nodeResult.output_data?.error?.[0]
.toLowerCase()
.includes("insufficient balance")
) {
// Show no credits toast if user has low credits
toast({
variant: "destructive",
title: "Credits low",
description: (
<div>
Agent execution failed due to insufficient credits.
<br />
Go to the{" "}
<NextLink
className="text-purple-300"
href="/marketplace/credits"
>
Credits
</NextLink>{" "}
page to top up.
</div>
),
duration: 5000,
});
}
if (
!["COMPLETED", "TERMINATED", "FAILED"].includes(nodeResult.status)
) {
pendingNodeExecutions.add(nodeResult.node_exec_id);
} else {
pendingNodeExecutions.delete(nodeResult.node_exec_id);
}
if (pendingNodeExecutions.size == 0) {
// Assuming the first event is always a QUEUED node, and
// following nodes are QUEUED before all preceding nodes are COMPLETED,
// an empty set means the graph has finished running.
cancelExecListener();
setSaveRunRequest({ request: "none", state: "none" });
}
},
);
};
fetchExecutions();
}, [flowID, flowExecutionID]);
// Check if node ids are synced with saved agent
useEffect(() => {
// Check if all node ids are synced with saved agent (frontend and backend)
@@ -825,6 +844,7 @@ export default function useAgentGraph(
if (!savedAgent) {
const path = new URLSearchParams(searchParams);
path.set("flowID", newSavedAgent.id);
path.set("flowVersion", newSavedAgent.version.toString());
router.push(`${pathname}?${path.toString()}`);
return;
}

View File

@@ -16,7 +16,6 @@ import {
GraphExecution,
Graph,
GraphCreatable,
GraphExecuteResponse,
GraphMeta,
GraphUpdateable,
MyAgentsResponse,
@@ -185,9 +184,10 @@ export default class BackendAPI {
executeGraph(
id: string,
version: number,
inputData: { [key: string]: any } = {},
): Promise<GraphExecuteResponse> {
return this._request("POST", `/graphs/${id}/execute`, inputData);
): Promise<{ graph_exec_id: string }> {
return this._request("POST", `/graphs/${id}/execute/${version}`, inputData);
}
async getGraphExecutionInfo(
@@ -757,8 +757,11 @@ export default class BackendAPI {
return () => this.wsMessageHandlers[method].delete(handler);
}
subscribeToExecution(graphId: string) {
this.sendWebSocketMessage("subscribe", { graph_id: graphId });
subscribeToExecution(graphId: string, graphVersion: number) {
this.sendWebSocketMessage("subscribe", {
graph_id: graphId,
graph_version: graphVersion,
});
}
}
@@ -769,7 +772,7 @@ type GraphCreateRequestBody = {
};
type WebsocketMessageTypeMap = {
subscribe: { graph_id: string };
subscribe: { graph_id: string; graph_version: number };
execution_event: NodeExecutionResult;
heartbeat: "ping" | "pong";
};

View File

@@ -257,14 +257,6 @@ export type GraphUpdateable = Omit<
export type GraphCreatable = Omit<GraphUpdateable, "id"> & { id?: string };
/* Derived from backend/executor/manager.py:ExecutionManager.add_execution */
export type GraphExecuteResponse = {
/** ID of the initiated run */
id: string;
/** List of node executions */
executions: Array<{ id: string; node_id: string }>;
};
/* Mirror of backend/data/execution.py:ExecutionResult */
export type NodeExecutionResult = {
graph_id: string;

View File

@@ -1,4 +1,5 @@
import { Graph, Block, Node, BlockUIType } from "./types";
import { Connection } from "@xyflow/react";
import { Graph, Block, Node, BlockUIType, Link } from "./types";
/** Creates a copy of the graph with all secrets removed */
export function safeCopyGraph(graph: Graph, block_defs: Block[]): Graph {
@@ -44,3 +45,11 @@ export function removeAgentInputBlockValues(graph: Graph, blocks: Block[]) {
nodes: modifiedNodes,
};
}
export function formatEdgeID(conn: Link | Connection): string {
if ("sink_id" in conn) {
return `${conn.source_id}_${conn.source_name}_${conn.sink_id}_${conn.sink_name}`;
} else {
return `${conn.source}_${conn.sourceHandle}_${conn.target}_${conn.targetHandle}`;
}
}