Merge branch 'master' into aarushikansal/execution-manager

This commit is contained in:
Aarushi
2024-09-09 10:18:17 +01:00
committed by GitHub
32 changed files with 724 additions and 368 deletions

View File

@@ -0,0 +1,43 @@
"use client";
import { useEffect } from "react";
import { IconCircleAlert } from "@/components/ui/icons";
import { Button } from "@/components/ui/button";
import Link from "next/link";
export default function Error({
error,
reset,
}: {
error: Error & { digest?: string };
reset: () => void;
}) {
useEffect(() => {
console.error(error);
}, [error]);
return (
<div className="fixed inset-0 flex items-center justify-center bg-background">
<div className="w-full max-w-md px-4 text-center sm:px-6">
<div className="mx-auto flex size-12 items-center justify-center rounded-full bg-muted">
<IconCircleAlert className="size-10" />
</div>
<h1 className="mt-8 text-2xl font-bold tracking-tight text-foreground">
Oops, something went wrong!
</h1>
<p className="mt-4 text-muted-foreground">
We&apos;re sorry, but an unexpected error has occurred. Please try
again later or contact support if the issue persists.
</p>
<div className="mt-6 flex flex-row justify-center gap-4">
<Button onClick={reset} variant="outline">
Retry
</Button>
<Button>
<Link href="/">Go to Homepage</Link>
</Button>
</div>
</div>
</div>
);
}

View File

@@ -1,3 +0,0 @@
export default function ErrorPage() {
return <p>Sorry, something went wrong</p>;
}

View File

@@ -0,0 +1,21 @@
import AgentFlowListSkeleton from "@/components/monitor/skeletons/AgentFlowListSkeleton";
import React from "react";
import FlowRunsListSkeleton from "@/components/monitor/skeletons/FlowRunsListSkeleton";
import FlowRunsStatusSkeleton from "@/components/monitor/skeletons/FlowRunsStatusSkeleton";
export default function MonitorLoadingSkeleton() {
return (
<div className="space-y-4 p-4">
<div className="grid grid-cols-1 gap-4 md:grid-cols-3">
{/* Agents Section */}
<AgentFlowListSkeleton />
{/* Runs Section */}
<FlowRunsListSkeleton />
{/* Stats Section */}
<FlowRunsStatusSkeleton />
</div>
</div>
);
}

View File

@@ -1,5 +1,5 @@
"use client";
import React, { useEffect, useState } from "react";
import React, { useCallback, useEffect, useMemo, useState } from "react";
import AutoGPTServerAPI, {
GraphMeta,
@@ -22,54 +22,57 @@ const Monitor = () => {
const [selectedFlow, setSelectedFlow] = useState<GraphMeta | null>(null);
const [selectedRun, setSelectedRun] = useState<FlowRun | null>(null);
const api = new AutoGPTServerAPI();
const api = useMemo(() => new AutoGPTServerAPI(), []);
useEffect(() => fetchFlowsAndRuns(), []);
const refreshFlowRuns = useCallback(
(flowID: string) => {
// Fetch flow run IDs
api.listGraphRunIDs(flowID).then((runIDs) =>
runIDs.map((runID) => {
let run;
if (
(run = flowRuns.find((fr) => fr.id == runID)) &&
!["waiting", "running"].includes(run.status)
) {
return;
}
// Fetch flow run
api.getGraphExecutionInfo(flowID, runID).then((execInfo) =>
setFlowRuns((flowRuns) => {
if (execInfo.length == 0) return flowRuns;
const flowRunIndex = flowRuns.findIndex((fr) => fr.id == runID);
const flowRun = flowRunFromNodeExecutionResults(execInfo);
if (flowRunIndex > -1) {
flowRuns.splice(flowRunIndex, 1, flowRun);
} else {
flowRuns.push(flowRun);
}
return [...flowRuns];
}),
);
}),
);
},
[api, flowRuns],
);
const fetchFlowsAndRuns = useCallback(() => {
api.listGraphs().then((flows) => {
setFlows(flows);
flows.map((flow) => refreshFlowRuns(flow.id));
});
}, [api, refreshFlowRuns]);
useEffect(() => fetchFlowsAndRuns(), [fetchFlowsAndRuns]);
useEffect(() => {
const intervalId = setInterval(
() => flows.map((f) => refreshFlowRuns(f.id)),
5000,
);
return () => clearInterval(intervalId);
}, []);
function fetchFlowsAndRuns() {
api.listGraphs().then((flows) => {
setFlows(flows);
flows.map((flow) => refreshFlowRuns(flow.id));
});
}
function refreshFlowRuns(flowID: string) {
// Fetch flow run IDs
api.listGraphRunIDs(flowID).then((runIDs) =>
runIDs.map((runID) => {
let run;
if (
(run = flowRuns.find((fr) => fr.id == runID)) &&
!["waiting", "running"].includes(run.status)
) {
return;
}
// Fetch flow run
api.getGraphExecutionInfo(flowID, runID).then((execInfo) =>
setFlowRuns((flowRuns) => {
if (execInfo.length == 0) return flowRuns;
const flowRunIndex = flowRuns.findIndex((fr) => fr.id == runID);
const flowRun = flowRunFromNodeExecutionResults(execInfo);
if (flowRunIndex > -1) {
flowRuns.splice(flowRunIndex, 1, flowRun);
} else {
flowRuns.push(flowRun);
}
return [...flowRuns];
}),
);
}),
);
}
}, [flows, refreshFlowRuns]);
const column1 = "md:col-span-2 xl:col-span-3 xxl:col-span-2";
const column2 = "md:col-span-3 lg:col-span-2 xl:col-span-3 space-y-4";

View File

@@ -1,4 +1,4 @@
import React, { useContext, useEffect, useState } from "react";
import React, { useCallback, useContext, useEffect, useState } from "react";
import {
BaseEdge,
EdgeLabelRenderer,
@@ -65,24 +65,27 @@ export function CustomEdge({
const beadDiameter = 12;
const deltaTime = 16;
function setTargetPositions(beads: Bead[]) {
const distanceBetween = Math.min(
(length - beadDiameter) / (beads.length + 1),
beadDiameter,
);
const setTargetPositions = useCallback(
(beads: Bead[]) => {
const distanceBetween = Math.min(
(length - beadDiameter) / (beads.length + 1),
beadDiameter,
);
return beads.map((bead, index) => {
const distanceFromEnd = beadDiameter * 1.35;
const targetPosition = distanceBetween * index + distanceFromEnd;
const t = getTForDistance(-targetPosition);
return beads.map((bead, index) => {
const distanceFromEnd = beadDiameter * 1.35;
const targetPosition = distanceBetween * index + distanceFromEnd;
const t = getTForDistance(-targetPosition);
return {
...bead,
t: visualizeBeads === "animate" ? bead.t : t,
targetT: t,
} as Bead;
});
}
return {
...bead,
t: visualizeBeads === "animate" ? bead.t : t,
targetT: t,
} as Bead;
});
},
[getTForDistance, length, visualizeBeads],
);
useEffect(() => {
if (data?.beadUp === 0 && data?.beadDown === 0) {
@@ -170,7 +173,7 @@ export function CustomEdge({
}, deltaTime);
return () => clearInterval(interval);
}, [data]);
}, [data, setTargetPositions, visualizeBeads]);
const middle = getPointForT(0.5);

View File

@@ -96,7 +96,7 @@ export function CustomNode({ data, id, width, height }: NodeProps<CustomNode>) {
useEffect(() => {
setIsAnyModalOpen?.(isModalOpen || isOutputModalOpen);
}, [isModalOpen, isOutputModalOpen, data]);
}, [isModalOpen, isOutputModalOpen, data, setIsAnyModalOpen]);
useEffect(() => {
isInitialSetup.current = false;

View File

@@ -121,7 +121,7 @@ const FlowEditor: React.FC<{
localStorage.setItem("shepherd-tour", "yes");
}
}
}, [availableNodes, tutorialStarted]);
}, [availableNodes, tutorialStarted, router, pathname]);
useEffect(() => {
const handleKeyDown = (event: KeyboardEvent) => {
@@ -256,7 +256,7 @@ const FlowEditor: React.FC<{
}
const edgeColor = getTypeColor(
getOutputType(connection.source!, connection.sourceHandle!),
getOutputType(nodes, connection.source!, connection.sourceHandle!),
);
const sourceNode = getNode(connection.source!);
const newEdge: CustomEdge = {
@@ -295,6 +295,7 @@ const FlowEditor: React.FC<{
addEdges,
deleteElements,
clearNodesStatusAndOutput,
nodes,
edges,
formatEdgeID,
getOutputType,
@@ -377,7 +378,7 @@ const FlowEditor: React.FC<{
clearNodesStatusAndOutput();
}
},
[setNodes, clearNodesStatusAndOutput],
[setNodes, clearNodesStatusAndOutput, setEdges],
);
const getNextNodeId = useCallback(() => {
@@ -434,7 +435,6 @@ const FlowEditor: React.FC<{
nodeId,
availableNodes,
addNodes,
setNodes,
deleteElements,
clearNodesStatusAndOutput,
x,

View File

@@ -11,7 +11,10 @@ import {
ChevronUp,
} from "lucide-react";
import { Button } from "@/components/ui/button";
import { AgentDetailResponse } from "@/lib/marketplace-api";
import {
AgentDetailResponse,
InstallationLocation,
} from "@/lib/marketplace-api";
import dynamic from "next/dynamic";
import { Node, Edge } from "@xyflow/react";
import MarketplaceAPI from "@/lib/marketplace-api";
@@ -32,6 +35,7 @@ const Background = dynamic(
import "@xyflow/react/dist/style.css";
import { beautifyString } from "@/lib/utils";
import { makeAnalyticsEvent } from "./actions";
function convertGraphToReactFlow(graph: any): { nodes: Node[]; edges: Edge[] } {
const nodes: Node[] = graph.nodes.map((node: any) => {
@@ -96,8 +100,16 @@ async function installGraph(id: string): Promise<void> {
nodes: agent.graph.nodes,
links: agent.graph.links,
};
await serverAPI.createTemplate(data);
console.log(`Agent installed successfully`);
const result = await serverAPI.createTemplate(data);
makeAnalyticsEvent({
event_name: "agent_installed_from_marketplace",
event_data: {
marketplace_agent_id: id,
installed_agent_id: result.id,
installation_location: InstallationLocation.CLOUD,
},
});
console.log(`Agent installed successfully`, result);
} catch (error) {
console.error(`Error installing agent:`, error);
throw error;

View File

@@ -0,0 +1,9 @@
"use server";
import MarketplaceAPI, { AnalyticsEvent } from "@/lib/marketplace-api";
export async function makeAnalyticsEvent(event: AnalyticsEvent) {
const apiUrl = process.env.AGPT_SERVER_API_URL;
const api = new MarketplaceAPI();
await api.makeAnalyticsEvent(event);
}

View File

@@ -1,5 +1,5 @@
import AutoGPTServerAPI, { GraphMeta } from "@/lib/autogpt-server-api";
import React, { useEffect, useState } from "react";
import React, { useEffect, useMemo, useState } from "react";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Button } from "@/components/ui/button";
import Link from "next/link";
@@ -45,10 +45,10 @@ export const AgentFlowList = ({
className?: string;
}) => {
const [templates, setTemplates] = useState<GraphMeta[]>([]);
const api = new AutoGPTServerAPI();
const api = useMemo(() => new AutoGPTServerAPI(), []);
useEffect(() => {
api.listTemplates().then((templates) => setTemplates(templates));
}, []);
}, [api]);
return (
<Card className={className}>

View File

@@ -1,4 +1,4 @@
import React, { useEffect, useState } from "react";
import React, { useEffect, useMemo, useState } from "react";
import AutoGPTServerAPI, {
Graph,
GraphMeta,
@@ -28,7 +28,7 @@ export const FlowInfo: React.FC<
flowVersion?: number | "all";
}
> = ({ flow, flowRuns, flowVersion, ...props }) => {
const api = new AutoGPTServerAPI();
const api = useMemo(() => new AutoGPTServerAPI(), []);
const [flowVersions, setFlowVersions] = useState<Graph[] | null>(null);
const [selectedVersion, setSelectedFlowVersion] = useState(
@@ -41,7 +41,7 @@ export const FlowInfo: React.FC<
useEffect(() => {
api.getGraphAllVersions(flow.id).then((result) => setFlowVersions(result));
}, [flow.id]);
}, [flow.id, api]);
return (
<Card {...props}>

View File

@@ -0,0 +1,24 @@
export default function AgentsFlowListSkeleton() {
return (
<div className="mx-auto max-w-4xl p-4">
<div className="mb-4 flex items-center justify-between">
<h1 className="text-2xl font-bold">Agents</h1>
<div className="h-10 w-24 animate-pulse rounded bg-gray-200"></div>
</div>
<div className="rounded-lg bg-white p-4 shadow">
<div className="mb-4 grid grid-cols-3 gap-4 font-medium text-gray-500">
<div>Name</div>
<div># of runs</div>
<div>Last run</div>
</div>
{[...Array(3)].map((_, index) => (
<div key={index} className="mb-4 grid grid-cols-3 gap-4">
<div className="h-6 animate-pulse rounded bg-gray-200"></div>
<div className="h-6 animate-pulse rounded bg-gray-200"></div>
<div className="h-6 animate-pulse rounded bg-gray-200"></div>
</div>
))}
</div>
</div>
);
}

View File

@@ -0,0 +1,23 @@
export default function FlowRunsListSkeleton() {
return (
<div className="mx-auto max-w-4xl p-4">
<div className="rounded-lg bg-white p-4 shadow">
<h2 className="mb-4 text-xl font-semibold">Runs</h2>
<div className="mb-4 grid grid-cols-4 gap-4 text-sm font-medium text-gray-500">
<div>Agent</div>
<div>Started</div>
<div>Status</div>
<div>Duration</div>
</div>
{[...Array(4)].map((_, index) => (
<div key={index} className="mb-4 grid grid-cols-4 gap-4">
<div className="h-5 animate-pulse rounded bg-gray-200"></div>
<div className="h-5 animate-pulse rounded bg-gray-200"></div>
<div className="h-5 animate-pulse rounded bg-gray-200"></div>
<div className="h-5 animate-pulse rounded bg-gray-200"></div>
</div>
))}
</div>
</div>
);
}

View File

@@ -0,0 +1,28 @@
export default function FlowRunsStatusSkeleton() {
return (
<div className="mx-auto max-w-4xl p-4">
<div className="rounded-lg bg-white p-4 shadow">
<div className="mb-6 flex items-center justify-between">
<h2 className="text-xl font-semibold">Stats</h2>
<div className="flex space-x-2">
{["2h", "8h", "24h", "7d", "Custom", "All"].map((btn) => (
<div
key={btn}
className="h-8 w-16 animate-pulse rounded bg-gray-200"
></div>
))}
</div>
</div>
{/* Placeholder for the line chart */}
<div className="mb-6 h-64 w-full animate-pulse rounded bg-gray-200"></div>
{/* Placeholders for total runs and total run time */}
<div className="space-y-2">
<div className="h-6 w-1/3 animate-pulse rounded bg-gray-200"></div>
<div className="h-6 w-1/2 animate-pulse rounded bg-gray-200"></div>
</div>
</div>
</div>
);
}

View File

@@ -10,7 +10,7 @@ import {
BlockIONumberSubSchema,
BlockIOBooleanSubSchema,
} from "@/lib/autogpt-server-api/types";
import { FC, useEffect, useState } from "react";
import { FC, useCallback, useEffect, useState } from "react";
import { Button } from "./ui/button";
import { Switch } from "./ui/switch";
import {
@@ -296,7 +296,7 @@ const NodeKeyValueInput: FC<{
className,
displayName,
}) => {
const getPairValues = () => {
const getPairValues = useCallback(() => {
let defaultEntries = new Map<string, any>();
connections
@@ -311,7 +311,7 @@ const NodeKeyValueInput: FC<{
});
return Array.from(defaultEntries, ([key, value]) => ({ key, value }));
};
}, [connections, entries, schema.default, selfKey]);
const [keyValuePairs, setKeyValuePairs] = useState<
{ key: string; value: string | number | null }[]
@@ -319,7 +319,7 @@ const NodeKeyValueInput: FC<{
useEffect(
() => setKeyValuePairs(getPairValues()),
[connections, entries, schema.default],
[connections, entries, schema.default, getPairValues],
);
function updateKeyValuePairs(newPairs: typeof keyValuePairs) {

View File

@@ -19,7 +19,7 @@ const Input = React.forwardRef<HTMLInputElement, InputProps>(
) {
ref.current.value = value;
}
}, [value, type]);
}, [value, type, ref]);
return (
<input
type={type}

View File

@@ -61,8 +61,10 @@ export default function useAgentGraph(
const [nodes, setNodes] = useState<CustomNode[]>([]);
const [edges, setEdges] = useState<CustomEdge[]>([]);
const apiUrl = process.env.NEXT_PUBLIC_AGPT_SERVER_URL!;
const api = useMemo(() => new AutoGPTServerAPI(apiUrl), [apiUrl]);
const api = useMemo(
() => new AutoGPTServerAPI(process.env.NEXT_PUBLIC_AGPT_SERVER_URL!),
[],
);
// Connect to WebSocket
useEffect(() => {
@@ -89,16 +91,227 @@ export default function useAgentGraph(
.getBlocks()
.then((blocks) => setAvailableNodes(blocks))
.catch();
}, [api]);
//TODO to utils? repeated in Flow
const formatEdgeID = useCallback((conn: Link | Connection): string => {
if ("sink_id" in conn) {
return `${conn.source_id}_${conn.source_name}_${conn.sink_id}_${conn.sink_name}`;
} else {
return `${conn.source}_${conn.sourceHandle}_${conn.target}_${conn.targetHandle}`;
}
}, []);
const getOutputType = useCallback(
(nodes: CustomNode[], nodeId: string, handleId: string) => {
const node = nodes.find((n) => n.id === nodeId);
if (!node) return "unknown";
const outputSchema = node.data.outputSchema;
if (!outputSchema) return "unknown";
const outputHandle = outputSchema.properties[handleId];
if (!("type" in outputHandle)) return "unknown";
return outputHandle.type;
},
[],
);
// Load existing graph
const loadGraph = useCallback(
(graph: Graph) => {
setSavedAgent(graph);
setAgentName(graph.name);
setAgentDescription(graph.description);
setNodes(() => {
const newNodes = graph.nodes.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node.metadata.position.x,
y: node.metadata.position.y,
},
data: {
block_id: block.id,
blockType: block.name,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
});
setEdges((_) =>
graph.links.map((link) => ({
id: formatEdgeID(link),
type: "custom",
data: {
edgeColor: getTypeColor(
getOutputType(newNodes, link.source_id, link.source_name!),
),
sourcePos: newNodes.find((node) => node.id === link.source_id)
?.position,
isStatic: link.is_static,
beadUp: 0,
beadDown: 0,
beadData: [],
},
markerEnd: {
type: MarkerType.ArrowClosed,
strokeWidth: 2,
color: getTypeColor(
getOutputType(newNodes, link.source_id, link.source_name!),
),
},
source: link.source_id,
target: link.sink_id,
sourceHandle: link.source_name || undefined,
targetHandle: link.sink_name || undefined,
})),
);
return newNodes;
});
},
[availableNodes, formatEdgeID, getOutputType],
);
const getFrontendId = useCallback(
(backendId: string, nodes: CustomNode[]) => {
const node = nodes.find((node) => node.data.backend_id === backendId);
return node?.id;
},
[],
);
const updateEdgeBeads = useCallback(
(executionData: NodeExecutionResult) => {
setEdges((edges) => {
return edges.map((e) => {
const edge = { ...e, data: { ...e.data } } as CustomEdge;
if (executionData.status === "COMPLETED") {
// Produce output beads
for (let key in executionData.output_data) {
if (
edge.source !== getFrontendId(executionData.node_id, nodes) ||
edge.sourceHandle !== key
) {
continue;
}
edge.data!.beadUp = (edge.data!.beadUp ?? 0) + 1;
// For static edges beadDown is always one less than beadUp
// Because there's no queueing and one bead is always at the connection point
if (edge.data?.isStatic) {
edge.data!.beadDown = (edge.data!.beadUp ?? 0) - 1;
edge.data!.beadData = edge.data!.beadData!.slice(0, -1);
continue;
}
//todo kcze this assumes output at key is always array with one element
edge.data!.beadData = [
executionData.output_data[key][0],
...edge.data!.beadData!,
];
}
} else if (executionData.status === "RUNNING") {
// Consume input beads
for (let key in executionData.input_data) {
if (
edge.target !== getFrontendId(executionData.node_id, nodes) ||
edge.targetHandle !== key
) {
continue;
}
// Skip decreasing bead count if edge doesn't match or if it's static
if (
edge.data!.beadData![edge.data!.beadData!.length - 1] !==
executionData.input_data[key] ||
edge.data?.isStatic
) {
continue;
}
edge.data!.beadDown = (edge.data!.beadDown ?? 0) + 1;
edge.data!.beadData = edge.data!.beadData!.slice(0, -1);
}
}
return edge;
});
});
},
[getFrontendId, nodes],
);
const updateNodesWithExecutionData = useCallback(
(executionData: NodeExecutionResult) => {
if (passDataToBeads) {
updateEdgeBeads(executionData);
}
setNodes((nodes) => {
const nodeId = nodes.find(
(node) => node.data.backend_id === executionData.node_id,
)?.id;
if (!nodeId) {
console.error(
"Node not found for execution data:",
executionData,
"This shouldn't happen and means that the frontend and backend are out of sync.",
);
return nodes;
}
return nodes.map((node) =>
node.id === nodeId
? {
...node,
data: {
...node.data,
status: executionData.status,
executionResults:
Object.keys(executionData.output_data).length > 0
? [
...(node.data.executionResults || []),
{
execId: executionData.node_exec_id,
data: executionData.output_data,
},
]
: node.data.executionResults,
isOutputOpen: true,
},
}
: node,
);
});
},
[passDataToBeads, updateEdgeBeads],
);
useEffect(() => {
if (!flowID || availableNodes.length == 0) return;
(template ? api.getTemplate(flowID) : api.getGraph(flowID)).then((graph) =>
loadGraph(graph),
(template ? api.getTemplate(flowID) : api.getGraph(flowID)).then(
(graph) => {
console.log("Loading graph");
loadGraph(graph);
},
);
}, [flowID, template, availableNodes]);
}, [flowID, template, availableNodes, api, loadGraph]);
// Update nodes with execution data
useEffect(() => {
@@ -119,7 +332,68 @@ export default function useAgentGraph(
});
return [];
});
}, [updateQueue, nodesSyncedWithSavedAgent]);
}, [updateQueue, nodesSyncedWithSavedAgent, updateNodesWithExecutionData]);
const validateNodes = useCallback((): boolean => {
let isValid = true;
nodes.forEach((node) => {
const validate = ajv.compile(node.data.inputSchema);
const errors = {} as { [key: string]: string };
// Validate values against schema using AJV
const valid = validate(node.data.hardcodedValues);
if (!valid) {
// Populate errors if validation fails
validate.errors?.forEach((error) => {
// Skip error if there's an edge connected
const path =
"dataPath" in error
? (error.dataPath as string)
: error.instancePath;
const handle = path.split(/[\/.]/)[0];
if (
node.data.connections.some(
(conn) => conn.target === node.id || conn.targetHandle === handle,
)
) {
return;
}
console.warn("Error", error);
isValid = false;
if (path && error.message) {
const key = path.slice(1);
console.log("Error", key, error.message);
setNestedProperty(
errors,
key,
error.message[0].toUpperCase() + error.message.slice(1),
);
} else if (error.keyword === "required") {
const key = error.params.missingProperty;
setNestedProperty(errors, key, "This field is required");
}
});
}
// Set errors
setNodes((nodes) => {
return nodes.map((n) => {
if (n.id === node.id) {
return {
...n,
data: {
...n.data,
errors,
},
};
}
return n;
});
});
});
return isValid;
}, [nodes]);
// Handle user requests
useEffect(() => {
@@ -224,7 +498,13 @@ export default function useAgentGraph(
.stopGraphExecution(savedAgent.id, saveRunRequest.activeExecutionID)
.then(() => setSaveRunRequest({ request: "none", state: "none" }));
}
}, [saveRunRequest, savedAgent, nodesSyncedWithSavedAgent]);
}, [
api,
saveRunRequest,
savedAgent,
nodesSyncedWithSavedAgent,
validateNodes,
]);
// Check if node ids are synced with saved agent
useEffect(() => {
@@ -241,275 +521,6 @@ export default function useAgentGraph(
setNodesSyncedWithSavedAgent(oneNodeSynced);
}, [savedAgent, nodes]);
const validateNodes = useCallback((): boolean => {
let isValid = true;
nodes.forEach((node) => {
const validate = ajv.compile(node.data.inputSchema);
const errors = {} as { [key: string]: string };
// Validate values against schema using AJV
const valid = validate(node.data.hardcodedValues);
if (!valid) {
// Populate errors if validation fails
validate.errors?.forEach((error) => {
// Skip error if there's an edge connected
const path =
"dataPath" in error
? (error.dataPath as string)
: error.instancePath;
const handle = path.split(/[\/.]/)[0];
if (
node.data.connections.some(
(conn) => conn.target === node.id || conn.targetHandle === handle,
)
) {
return;
}
console.warn("Error", error);
isValid = false;
if (path && error.message) {
const key = path.slice(1);
console.log("Error", key, error.message);
setNestedProperty(
errors,
key,
error.message[0].toUpperCase() + error.message.slice(1),
);
} else if (error.keyword === "required") {
const key = error.params.missingProperty;
setNestedProperty(errors, key, "This field is required");
}
});
}
// Set errors
setNodes((nodes) => {
return nodes.map((n) => {
if (n.id === node.id) {
return {
...n,
data: {
...n.data,
errors,
},
};
}
return n;
});
});
});
return isValid;
}, [nodes]);
const getFrontendId = useCallback(
(backendId: string, nodes: CustomNode[]) => {
const node = nodes.find((node) => node.data.backend_id === backendId);
return node?.id;
},
[],
);
const updateEdgeBeads = useCallback(
(executionData: NodeExecutionResult) => {
setEdges((edges) => {
return edges.map((e) => {
const edge = { ...e, data: { ...e.data } } as CustomEdge;
if (executionData.status === "COMPLETED") {
// Produce output beads
for (let key in executionData.output_data) {
if (
edge.source !== getFrontendId(executionData.node_id, nodes) ||
edge.sourceHandle !== key
) {
continue;
}
edge.data!.beadUp = (edge.data!.beadUp ?? 0) + 1;
// For static edges beadDown is always one less than beadUp
// Because there's no queueing and one bead is always at the connection point
if (edge.data?.isStatic) {
edge.data!.beadDown = (edge.data!.beadUp ?? 0) - 1;
edge.data!.beadData = edge.data!.beadData!.slice(0, -1);
continue;
}
//todo kcze this assumes output at key is always array with one element
edge.data!.beadData = [
executionData.output_data[key][0],
...edge.data!.beadData!,
];
}
} else if (executionData.status === "RUNNING") {
// Consume input beads
for (let key in executionData.input_data) {
if (
edge.target !== getFrontendId(executionData.node_id, nodes) ||
edge.targetHandle !== key
) {
continue;
}
// Skip decreasing bead count if edge doesn't match or if it's static
if (
edge.data!.beadData![edge.data!.beadData!.length - 1] !==
executionData.input_data[key] ||
edge.data?.isStatic
) {
continue;
}
edge.data!.beadDown = (edge.data!.beadDown ?? 0) + 1;
edge.data!.beadData = edge.data!.beadData!.slice(0, -1);
}
}
return edge;
});
});
},
[edges],
);
const updateNodesWithExecutionData = useCallback(
(executionData: NodeExecutionResult) => {
if (passDataToBeads) {
updateEdgeBeads(executionData);
}
setNodes((nodes) => {
const nodeId = nodes.find(
(node) => node.data.backend_id === executionData.node_id,
)?.id;
if (!nodeId) {
console.error(
"Node not found for execution data:",
executionData,
"This shouldn't happen and means that the frontend and backend are out of sync.",
);
return nodes;
}
return nodes.map((node) =>
node.id === nodeId
? {
...node,
data: {
...node.data,
status: executionData.status,
executionResults:
Object.keys(executionData.output_data).length > 0
? [
...(node.data.executionResults || []),
{
execId: executionData.node_exec_id,
data: executionData.output_data,
},
]
: node.data.executionResults,
isOutputOpen: true,
},
}
: node,
);
});
},
[nodes],
);
//TODO to utils? repeated in Flow
const formatEdgeID = useCallback((conn: Link | Connection): string => {
if ("sink_id" in conn) {
return `${conn.source_id}_${conn.source_name}_${conn.sink_id}_${conn.sink_name}`;
} else {
return `${conn.source}_${conn.sourceHandle}_${conn.target}_${conn.targetHandle}`;
}
}, []);
const getOutputType = useCallback(
(nodeId: string, handleId: string) => {
const node = nodes.find((n) => n.id === nodeId);
if (!node) return "unknown";
const outputSchema = node.data.outputSchema;
if (!outputSchema) return "unknown";
const outputHandle = outputSchema.properties[handleId];
if (!("type" in outputHandle)) return "unknown";
return outputHandle.type;
},
[nodes],
);
const loadGraph = useCallback(
(graph: Graph) => {
setSavedAgent(graph);
setAgentName(graph.name);
setAgentDescription(graph.description);
setNodes(() => {
const newNodes = graph.nodes.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node.metadata.position.x,
y: node.metadata.position.y,
},
data: {
block_id: block.id,
blockType: block.name,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
});
setEdges((_) =>
graph.links.map((link) => ({
id: formatEdgeID(link),
type: "custom",
data: {
edgeColor: getTypeColor(
getOutputType(link.source_id, link.source_name!),
),
sourcePos: nodes.find((node) => node.id === link.source_id)
?.position,
isStatic: link.is_static,
beadUp: 0,
beadDown: 0,
beadData: [],
},
markerEnd: {
type: MarkerType.ArrowClosed,
strokeWidth: 2,
color: getTypeColor(
getOutputType(link.source_id, link.source_name!),
),
},
source: link.source_id,
target: link.sink_id,
sourceHandle: link.source_name || undefined,
targetHandle: link.sink_name || undefined,
})),
);
return newNodes;
});
},
[availableNodes],
);
const prepareNodeInputData = useCallback(
(node: CustomNode) => {
console.debug(
@@ -696,7 +707,15 @@ export default function useAgentGraph(
}));
});
},
[nodes, edges, savedAgent],
[
api,
nodes,
edges,
savedAgent,
agentName,
agentDescription,
prepareNodeInputData,
],
);
const requestSave = useCallback(

View File

@@ -84,12 +84,12 @@ export function useBezierPath(
return length;
},
[path],
[getPointForT],
);
const length = useMemo(() => {
return getArcLength(1);
}, [path]);
}, [getArcLength]);
const getBezierDerivative = useCallback(
(t: number) => {
@@ -131,7 +131,7 @@ export function useBezierPath(
return t;
},
[path],
[getArcLength, getBezierDerivative, length],
);
const getPointAtDistance = useCallback(
@@ -143,7 +143,7 @@ export function useBezierPath(
const t = getTForDistance(distance);
return getPointForT(t);
},
[path],
[getTForDistance, getPointForT, length],
);
return {

View File

@@ -8,6 +8,7 @@ import {
AgentWithRank,
FeaturedAgentResponse,
UniqueCategoriesResponse,
AnalyticsEvent,
} from "./types";
export default class MarketplaceAPI {
@@ -193,6 +194,13 @@ export default class MarketplaceAPI {
return this._get("/admin/categories");
}
async makeAnalyticsEvent(event: AnalyticsEvent) {
if (event.event_name === "agent_installed_from_marketplace") {
return this._post("/analytics/agent-installed", event.event_data);
}
throw new Error("Invalid event name");
}
private async _get(path: string) {
return this._request("GET", path);
}

View File

@@ -76,3 +76,34 @@ export type AgentResponse = Agent;
export type UniqueCategoriesResponse = {
unique_categories: string[];
};
export enum InstallationLocation {
LOCAL = "local",
CLOUD = "cloud",
}
export type AgentInstalledFromMarketplaceEventData = {
marketplace_agent_id: string;
installed_agent_id: string;
installation_location: InstallationLocation;
};
export type AgentInstalledFromTemplateEventData = {
template_id: string;
installed_agent_id: string;
installation_location: InstallationLocation;
};
export interface AgentInstalledFromMarketplaceEvent {
event_name: "agent_installed_from_marketplace";
event_data: AgentInstalledFromMarketplaceEventData;
}
export interface AgentInstalledFromTemplateEvent {
event_name: "agent_installed_from_template";
event_data: AgentInstalledFromTemplateEventData;
}
export type AnalyticsEvent =
| AgentInstalledFromMarketplaceEvent
| AgentInstalledFromTemplateEvent;

View File

@@ -32,7 +32,7 @@ class SendWebRequestBlock(Block):
super().__init__(
id="6595ae1f-b924-42cb-9a41-551a0611c4b4",
description="This block makes an HTTP request to the given URL.",
categories={BlockCategory.INPUT},
categories={BlockCategory.OUTPUT},
input_schema=SendWebRequestBlock.Input,
output_schema=SendWebRequestBlock.Output,
)

View File

@@ -46,7 +46,7 @@ class ReadRSSFeedBlock(Block):
id="c6731acb-4105-4zp1-bc9b-03d0036h370g",
input_schema=ReadRSSFeedBlock.Input,
output_schema=ReadRSSFeedBlock.Output,
categories={BlockCategory.OUTPUT},
categories={BlockCategory.INPUT},
test_input={
"rss_url": "https://example.com/rss",
"time_period": 10_000_000,

View File

@@ -59,7 +59,7 @@ class AsyncRedisEventQueue(AsyncEventQueue):
async def put(self, execution_result: ExecutionResult):
if self.connection:
message = json.dumps(execution_result.model_dump(), cls=DateTimeEncoder)
logger.info(f"Put {message}")
logger.info(f"Putting execution result to Redis {message}")
await self.connection.lpush(self.queue_name, message) # type: ignore
async def get(self) -> ExecutionResult | None:
@@ -67,7 +67,7 @@ class AsyncRedisEventQueue(AsyncEventQueue):
message = await self.connection.rpop(self.queue_name) # type: ignore
if message is not None and isinstance(message, (str, bytes, bytearray)):
data = json.loads(message)
logger.info(f"Get {data}")
logger.info(f"Getting execution result from Redis {data}")
return ExecutionResult(**data)
return None

View File

@@ -0,0 +1,11 @@
-- AlterTable
ALTER TABLE "AgentGraph" ADD COLUMN "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN "updatedAt" TIMESTAMP(3);
-- AlterTable
ALTER TABLE "AgentGraphExecution" ADD COLUMN "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN "updatedAt" TIMESTAMP(3);
-- AlterTable
ALTER TABLE "AgentGraphExecutionSchedule" ADD COLUMN "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN "updatedAt" TIMESTAMP(3);

View File

@@ -0,0 +1,3 @@
# Please do not edit this file manually
# It should be added in your version-control system (i.e. Git)
provider = "postgresql"

View File

@@ -31,6 +31,8 @@ model User {
model AgentGraph {
id String @default(uuid())
version Int @default(1)
createdAt DateTime @default(now())
updatedAt DateTime? @updatedAt
name String?
description String?
@@ -114,6 +116,8 @@ model AgentBlock {
// This model describes the execution of an AgentGraph.
model AgentGraphExecution {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime? @updatedAt
agentGraphId String
agentGraphVersion Int @default(1)
@@ -175,6 +179,8 @@ model AgentNodeExecutionInputOutput {
// This model describes the recurring execution schedule of an Agent.
model AgentGraphExecutionSchedule {
id String @id
createdAt DateTime @default(now())
updatedAt DateTime? @updatedAt
agentGraphId String
agentGraphVersion Int @default(1)

View File

@@ -18,6 +18,7 @@ import market.routes.admin
import market.routes.agents
import market.routes.search
import market.routes.submissions
import market.routes.analytics
dotenv.load_dotenv()
@@ -75,6 +76,9 @@ app.include_router(market.routes.agents.router, tags=["agents"])
app.include_router(market.routes.search.router, tags=["search"])
app.include_router(market.routes.submissions.router, tags=["submissions"])
app.include_router(market.routes.admin.router, prefix="/admin", tags=["admin"])
app.include_router(
market.routes.analytics.router, prefix="/analytics", tags=["analytics"]
)
@app.get("/health")

View File

@@ -634,10 +634,30 @@ FROM (
model=market.model.CategoriesResponse,
)
if not categories:
raise AgentQueryError("No categories found")
return market.model.CategoriesResponse(unique_categories=[])
return categories
except prisma.errors.PrismaError as e:
raise AgentQueryError(f"Database query failed: {str(e)}")
except Exception as e:
# raise AgentQueryError(f"Unexpected error occurred: {str(e)}")
return market.model.CategoriesResponse(unique_categories=[])
async def create_agent_installed_event(
event_data: market.model.AgentInstalledFromMarketplaceEventData,
):
try:
await prisma.models.InstallTracker.prisma().create(
data={
"installedAgentId": event_data.installed_agent_id,
"marketplaceAgentId": event_data.marketplace_agent_id,
"installationLocation": prisma.enums.InstallationLocation(
event_data.installation_location.name
),
}
)
except prisma.errors.PrismaError as e:
raise AgentQueryError(f"Database query failed: {str(e)}")
except Exception as e:
raise AgentQueryError(f"Unexpected error occurred: {str(e)}")

View File

@@ -4,6 +4,35 @@ import typing
import prisma.enums
import pydantic
from enum import Enum
from typing import Literal, Union
class InstallationLocation(str, Enum):
LOCAL = "local"
CLOUD = "cloud"
class AgentInstalledFromMarketplaceEventData(pydantic.BaseModel):
marketplace_agent_id: str
installed_agent_id: str
installation_location: InstallationLocation
class AgentInstalledFromTemplateEventData(pydantic.BaseModel):
template_id: str
installed_agent_id: str
installation_location: InstallationLocation
class AgentInstalledFromMarketplaceEvent(pydantic.BaseModel):
event_name: Literal["agent_installed_from_marketplace"]
event_data: AgentInstalledFromMarketplaceEventData
class AgentInstalledFromTemplateEvent(pydantic.BaseModel):
event_name: Literal["agent_installed_from_template"]
event_data: AgentInstalledFromTemplateEventData
AnalyticsEvent = Union[AgentInstalledFromMarketplaceEvent, AgentInstalledFromTemplateEvent]
class AnalyticsRequest(pydantic.BaseModel):
event: AnalyticsEvent
class AddAgentRequest(pydantic.BaseModel):
graph: dict[str, typing.Any]

View File

@@ -0,0 +1,26 @@
import fastapi
import market.db
import market.model
router = fastapi.APIRouter()
@router.post("/agent-installed")
async def agent_installed_endpoint(
event_data: market.model.AgentInstalledFromMarketplaceEventData,
):
"""
Endpoint to track agent installation events from the marketplace.
Args:
event_data (market.model.AgentInstalledFromMarketplaceEventData): The event data.
"""
try:
await market.db.create_agent_installed_event(event_data)
except market.db.AgentQueryError as e:
raise fastapi.HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise fastapi.HTTPException(
status_code=500, detail=f"An unexpected error occurred: {e}"
)

View File

@@ -0,0 +1,19 @@
-- CreateEnum
CREATE TYPE "InstallationLocation" AS ENUM ('LOCAL', 'CLOUD');
-- CreateTable
CREATE TABLE "InstallTracker" (
"id" UUID NOT NULL DEFAULT gen_random_uuid(),
"marketplaceAgentId" UUID NOT NULL,
"installedAgentId" UUID NOT NULL,
"installationLocation" "InstallationLocation" NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "InstallTracker_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "InstallTracker_marketplaceAgentId_installedAgentId_key" ON "InstallTracker"("marketplaceAgentId", "installedAgentId");
-- AddForeignKey
ALTER TABLE "InstallTracker" ADD CONSTRAINT "InstallTracker_marketplaceAgentId_fkey" FOREIGN KEY ("marketplaceAgentId") REFERENCES "Agents"("id") ON DELETE RESTRICT ON UPDATE CASCADE;

View File

@@ -40,6 +40,7 @@ model Agents {
graph Json
AnalyticsTracker AnalyticsTracker[]
FeaturedAgent FeaturedAgent?
InstallTracker InstallTracker[]
@@id(name: "graphVersionId", [id, version])
}
@@ -52,6 +53,22 @@ model AnalyticsTracker {
downloads Int
}
enum InstallationLocation {
LOCAL
CLOUD
}
model InstallTracker {
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
marketplaceAgentId String @db.Uuid
marketplaceAgent Agents @relation(fields: [marketplaceAgentId], references: [id])
installedAgentId String @db.Uuid
installationLocation InstallationLocation
createdAt DateTime @default(now())
@@unique([marketplaceAgentId, installedAgentId])
}
model FeaturedAgent {
id String @id @unique @default(dbgenerated("gen_random_uuid()")) @db.Uuid
agentId String @unique @db.Uuid