From 16e59b26a6f901df54e430d2a3d216eeb3c0a898 Mon Sep 17 00:00:00 2001 From: ranausmanai Date: Mon, 16 Feb 2026 20:49:44 +0500 Subject: [PATCH] Add mesh auto-planning with chat command UX and hardened auth/session behavior --- README.md | 1 + docs/tools/slash-commands.md | 1 + src/auto-reply/commands-registry.data.ts | 9 + src/auto-reply/reply/commands-core.ts | 2 + src/auto-reply/reply/commands-mesh.ts | 346 ++++++++++++++++++ src/auto-reply/reply/commands.test.ts | 148 ++++++++ src/gateway/protocol/index.ts | 5 + src/gateway/protocol/schema/mesh.ts | 14 + .../protocol/schema/protocol-schemas.ts | 2 + src/gateway/server-methods-list.ts | 1 + src/gateway/server-methods.ts | 1 + src/gateway/server-methods/mesh.test.ts | 88 +++++ src/gateway/server-methods/mesh.ts | 212 ++++++++++- .../server-methods/server-methods.test.ts | 44 +++ 14 files changed, 862 insertions(+), 12 deletions(-) create mode 100644 src/auto-reply/reply/commands-mesh.ts diff --git a/README.md b/README.md index 0b95f93ce7..f6bae15fad 100644 --- a/README.md +++ b/README.md @@ -280,6 +280,7 @@ Original prompt: Send these in WhatsApp/Telegram/Slack/Google Chat/Microsoft Teams/WebChat (group commands are owner-only): - `/status` — compact session status (model + tokens, cost when available) +- `/mesh <goal>` — auto-plan + run a multi-step workflow (`/mesh plan|run|status|retry` available) - `/new` or `/reset` — reset the session - `/compact` — compact session context (summary) - `/think <level>` — off|minimal|low|medium|high|xhigh (GPT-5.2 + Codex models only) diff --git a/docs/tools/slash-commands.md b/docs/tools/slash-commands.md index 081e4933b6..2ce82f897b 100644 --- a/docs/tools/slash-commands.md +++ b/docs/tools/slash-commands.md @@ -73,6 +73,7 @@ Text + native (when enabled): - `/commands` - `/skill <name> [input]` (run a skill by name) - `/status` (show current status; includes provider usage/quota for the current model provider when available) +- `/mesh <goal>` (auto-plan + run a workflow; also `/mesh 
plan|run|status|retry`, with `/mesh run ` for exact plan replay in the same chat) - `/allowlist` (list/add/remove allowlist entries) - `/approve allow-once|allow-always|deny` (resolve exec approval prompts) - `/context [list|detail|json]` (explain “context”; `detail` shows per-file + per-tool + per-skill + system prompt size) diff --git a/src/auto-reply/commands-registry.data.ts b/src/auto-reply/commands-registry.data.ts index b1d2168121..3a40bb09f6 100644 --- a/src/auto-reply/commands-registry.data.ts +++ b/src/auto-reply/commands-registry.data.ts @@ -172,6 +172,15 @@ function buildChatCommands(): ChatCommandDefinition[] { textAlias: "/status", category: "status", }), + defineChatCommand({ + key: "mesh", + nativeName: "mesh", + description: "Plan and run multi-step workflows.", + textAlias: "/mesh", + category: "tools", + argsParsing: "none", + acceptsArgs: true, + }), defineChatCommand({ key: "allowlist", description: "List/add/remove allowlist entries.", diff --git a/src/auto-reply/reply/commands-core.ts b/src/auto-reply/reply/commands-core.ts index e358670848..e78e515d30 100644 --- a/src/auto-reply/reply/commands-core.ts +++ b/src/auto-reply/reply/commands-core.ts @@ -21,6 +21,7 @@ import { handleStatusCommand, handleWhoamiCommand, } from "./commands-info.js"; +import { handleMeshCommand } from "./commands-mesh.js"; import { handleModelsCommand } from "./commands-models.js"; import { handlePluginCommand } from "./commands-plugin.js"; import { @@ -51,6 +52,7 @@ export async function handleCommands(params: HandleCommandsParams): Promise; +}; +type CachedMeshPlan = { plan: MeshPlanShape; createdAt: number }; + +type ParsedMeshCommand = + | { ok: true; action: "help" } + | { ok: true; action: "run" | "plan"; target: string } + | { ok: true; action: "status"; runId: string } + | { ok: true; action: "retry"; runId: string; stepIds?: string[] } + | { ok: false; message: string } + | null; + +const meshPlanCache = new Map(); +const MAX_CACHED_MESH_PLANS = 200; + 
+function trimMeshPlanCache() { + if (meshPlanCache.size <= MAX_CACHED_MESH_PLANS) { + return; + } + const oldest = [...meshPlanCache.entries()] + .sort((a, b) => a[1].createdAt - b[1].createdAt) + .slice(0, meshPlanCache.size - MAX_CACHED_MESH_PLANS); + for (const [key] of oldest) { + meshPlanCache.delete(key); + } +} + +function parseMeshCommand(commandBody: string): ParsedMeshCommand { + const trimmed = commandBody.trim(); + if (!/^\/mesh\b/i.test(trimmed)) { + return null; + } + const rest = trimmed.replace(/^\/mesh\b:?/i, "").trim(); + if (!rest || /^help$/i.test(rest)) { + return { ok: true, action: "help" }; + } + + const tokens = rest.split(/\s+/).filter(Boolean); + if (tokens.length === 0) { + return { ok: true, action: "help" }; + } + + const actionCandidate = tokens[0]?.toLowerCase() ?? ""; + const explicitAction = + actionCandidate === "run" || + actionCandidate === "plan" || + actionCandidate === "status" || + actionCandidate === "retry" + ? actionCandidate + : null; + + if (!explicitAction) { + // Shorthand: `/mesh ` => auto plan + run + return { ok: true, action: "run", target: rest }; + } + + const actionArgs = rest.slice(tokens[0]?.length ?? 0).trim(); + if (explicitAction === "plan" || explicitAction === "run") { + if (!actionArgs) { + return { ok: false, message: `Usage: /mesh ${explicitAction} ` }; + } + return { ok: true, action: explicitAction, target: actionArgs }; + } + + if (explicitAction === "status") { + if (!actionArgs) { + return { ok: false, message: "Usage: /mesh status " }; + } + return { ok: true, action: "status", runId: actionArgs.split(/\s+/)[0] }; + } + + // retry + const argsTokens = actionArgs.split(/\s+/).filter(Boolean); + if (argsTokens.length === 0) { + return { ok: false, message: "Usage: /mesh retry [step1,step2,...]" }; + } + const runId = argsTokens[0]; + const stepArg = argsTokens.slice(1).join(" ").trim(); + const stepIds = + stepArg.length > 0 + ? 
stepArg + .split(",") + .map((entry) => entry.trim()) + .filter(Boolean) + : undefined; + return { ok: true, action: "retry", runId, stepIds }; +} + +function cacheKeyForPlan(params: Parameters[0], planId: string) { + const sender = params.command.senderId ?? "unknown"; + const channel = params.command.channel || "unknown"; + return `${channel}:${sender}:${planId}`; +} + +function putCachedPlan(params: Parameters[0], plan: MeshPlanShape) { + meshPlanCache.set(cacheKeyForPlan(params, plan.planId), { plan, createdAt: Date.now() }); + trimMeshPlanCache(); +} + +function getCachedPlan(params: Parameters[0], planId: string): MeshPlanShape | null { + return meshPlanCache.get(cacheKeyForPlan(params, planId))?.plan ?? null; +} + +function looksLikeMeshPlanId(value: string) { + return /^mesh-plan-[a-z0-9-]+$/i.test(value.trim()); +} + +function resolveMeshCommandBody(params: Parameters[0]) { + return ( + params.ctx.BodyForCommands ?? + params.ctx.CommandBody ?? + params.ctx.RawBody ?? + params.ctx.Body ?? + params.command.commandBodyNormalized + ); +} + +function formatPlanSummary(plan: { + goal: string; + steps: Array<{ id: string; name?: string; prompt: string; dependsOn?: string[] }>; +}) { + const lines = [`🕸️ Mesh Plan`, `Goal: ${plan.goal}`, "", `Steps (${plan.steps.length}):`]; + for (const step of plan.steps) { + const dependsOn = Array.isArray(step.dependsOn) && step.dependsOn.length > 0; + const depLine = dependsOn ? ` (depends on: ${step.dependsOn?.join(", ")})` : ""; + lines.push(`- ${step.id}${step.name ? ` — ${step.name}` : ""}${depLine}`); + lines.push(` ${step.prompt}`); + } + return lines.join("\n"); +} + +function formatRunSummary(payload: { + runId: string; + status: string; + stats?: { + total?: number; + succeeded?: number; + failed?: number; + skipped?: number; + running?: number; + pending?: number; + }; +}) { + const stats = payload.stats ?? 
{}; + return [ + `🕸️ Mesh Run`, + `Run: ${payload.runId}`, + `Status: ${payload.status}`, + `Steps: total=${stats.total ?? 0}, ok=${stats.succeeded ?? 0}, failed=${stats.failed ?? 0}, skipped=${stats.skipped ?? 0}, running=${stats.running ?? 0}, pending=${stats.pending ?? 0}`, + ].join("\n"); +} + +function meshUsageText() { + return [ + "🕸️ Mesh command", + "Usage:", + "- /mesh (auto plan + run)", + "- /mesh plan ", + "- /mesh run ", + "- /mesh status ", + "- /mesh retry [step1,step2,...]", + ].join("\n"); +} + +function resolveMeshClientLabel(params: Parameters[0]) { + const channel = params.command.channel; + const sender = params.command.senderId ?? "unknown"; + return `Chat mesh (${channel}:${sender})`; +} + +export const handleMeshCommand: CommandHandler = async (params, allowTextCommands) => { + if (!allowTextCommands) { + return null; + } + const parsed = parseMeshCommand(resolveMeshCommandBody(params)); + if (!parsed) { + return null; + } + if (!params.command.isAuthorizedSender) { + logVerbose(`Ignoring /mesh from unauthorized sender: ${params.command.senderId || ""}`); + return { shouldContinue: false }; + } + if (!parsed.ok) { + return { shouldContinue: false, reply: { text: parsed.message } }; + } + if (parsed.action === "help") { + return { shouldContinue: false, reply: { text: meshUsageText() } }; + } + + const clientDisplayName = resolveMeshClientLabel(params); + const commonGateway = { + clientName: GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT, + clientDisplayName, + mode: GATEWAY_CLIENT_MODES.BACKEND, + } as const; + + try { + if (parsed.action === "plan") { + const planResp = await callGateway<{ + plan: MeshPlanShape; + order?: string[]; + source?: string; + }>({ + method: "mesh.plan.auto", + params: { + goal: parsed.target, + agentId: params.agentId ?? "main", + }, + ...commonGateway, + }); + putCachedPlan(params, planResp.plan); + const sourceLine = planResp.source ? 
`\nPlanner source: ${planResp.source}` : ""; + return { + shouldContinue: false, + reply: { + text: `${formatPlanSummary(planResp.plan)}${sourceLine}\n\nRun exact plan: /mesh run ${planResp.plan.planId}`, + }, + }; + } + + if (parsed.action === "run") { + let runPlan: MeshPlanShape; + if (looksLikeMeshPlanId(parsed.target)) { + const cached = getCachedPlan(params, parsed.target.trim()); + if (!cached) { + return { + shouldContinue: false, + reply: { + text: `Plan ${parsed.target.trim()} not found in this chat.\nCreate one first: /mesh plan `, + }, + }; + } + runPlan = cached; + } else { + const planResp = await callGateway<{ + plan: MeshPlanShape; + order?: string[]; + source?: string; + }>({ + method: "mesh.plan.auto", + params: { + goal: parsed.target, + agentId: params.agentId ?? "main", + }, + ...commonGateway, + }); + putCachedPlan(params, planResp.plan); + runPlan = planResp.plan; + } + + const runResp = await callGateway<{ + runId: string; + status: string; + stats?: { + total?: number; + succeeded?: number; + failed?: number; + skipped?: number; + running?: number; + pending?: number; + }; + }>({ + method: "mesh.run", + params: { + plan: runPlan, + }, + ...commonGateway, + }); + + return { + shouldContinue: false, + reply: { + text: `${formatPlanSummary(runPlan)}\n\n${formatRunSummary(runResp)}`, + }, + }; + } + + if (parsed.action === "status") { + const statusResp = await callGateway<{ + runId: string; + status: string; + stats?: { + total?: number; + succeeded?: number; + failed?: number; + skipped?: number; + running?: number; + pending?: number; + }; + }>({ + method: "mesh.status", + params: { runId: parsed.runId }, + ...commonGateway, + }); + return { + shouldContinue: false, + reply: { text: formatRunSummary(statusResp) }, + }; + } + + if (parsed.action === "retry") { + const retryResp = await callGateway<{ + runId: string; + status: string; + stats?: { + total?: number; + succeeded?: number; + failed?: number; + skipped?: number; + running?: number; 
+ pending?: number; + }; + }>({ + method: "mesh.retry", + params: { + runId: parsed.runId, + ...(parsed.stepIds && parsed.stepIds.length > 0 ? { stepIds: parsed.stepIds } : {}), + }, + ...commonGateway, + }); + return { + shouldContinue: false, + reply: { text: `🔁 Retry submitted\n${formatRunSummary(retryResp)}` }, + }; + } + + return null; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return { + shouldContinue: false, + reply: { + text: `❌ Mesh command failed: ${message}`, + }, + }; + } +}; diff --git a/src/auto-reply/reply/commands.test.ts b/src/auto-reply/reply/commands.test.ts index 1351296a2a..f6a9229559 100644 --- a/src/auto-reply/reply/commands.test.ts +++ b/src/auto-reply/reply/commands.test.ts @@ -287,6 +287,154 @@ describe("/approve command", () => { }); }); +describe("/mesh command", () => { + beforeEach(() => { + vi.clearAllMocks(); + callGatewayMock.mockReset(); + }); + + it("shows usage for bare /mesh", async () => { + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + const params = buildParams("/mesh", cfg); + const result = await handleCommands(params); + expect(result.shouldContinue).toBe(false); + expect(result.reply?.text).toContain("Mesh command"); + expect(result.reply?.text).toContain("/mesh run "); + expect(callGatewayMock).not.toHaveBeenCalled(); + }); + + it("runs auto plan + run for /mesh ", async () => { + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + const params = buildParams("/mesh build a landing animation", cfg); + + callGatewayMock + .mockResolvedValueOnce({ + plan: { + planId: "mesh-plan-1", + goal: "build a landing animation", + createdAt: Date.now(), + steps: [ + { id: "design", prompt: "Design animation" }, + { id: "mobile-test", prompt: "Test mobile", dependsOn: ["design"] }, + ], + }, + order: ["design", "mobile-test"], + source: "llm", + }) + 
.mockResolvedValueOnce({ + runId: "mesh-run-1", + status: "completed", + stats: { total: 2, succeeded: 2, failed: 0, skipped: 0, running: 0, pending: 0 }, + }); + + const result = await handleCommands(params); + expect(result.shouldContinue).toBe(false); + expect(result.reply?.text).toContain("Mesh Plan"); + expect(result.reply?.text).toContain("Mesh Run"); + expect(callGatewayMock).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + method: "mesh.plan.auto", + params: expect.objectContaining({ + goal: "build a landing animation", + }), + }), + ); + expect(callGatewayMock).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + method: "mesh.run", + }), + ); + }); + + it("returns status via /mesh status ", async () => { + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + const params = buildParams("/mesh status mesh-run-77", cfg); + + callGatewayMock.mockResolvedValueOnce({ + runId: "mesh-run-77", + status: "failed", + stats: { total: 3, succeeded: 1, failed: 1, skipped: 1, running: 0, pending: 0 }, + }); + + const result = await handleCommands(params); + expect(result.shouldContinue).toBe(false); + expect(result.reply?.text).toContain("Run: mesh-run-77"); + expect(result.reply?.text).toContain("Status: failed"); + expect(callGatewayMock).toHaveBeenCalledWith( + expect.objectContaining({ + method: "mesh.status", + params: { runId: "mesh-run-77" }, + }), + ); + }); + + it("runs a previously planned mesh plan id without re-planning", async () => { + const cfg = { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig; + const planParams = buildParams("/mesh plan Build Hero Animation", cfg); + + callGatewayMock.mockResolvedValueOnce({ + plan: { + planId: "mesh-plan-abc", + goal: "Build Hero Animation", + createdAt: Date.now(), + steps: [{ id: "design", prompt: "Design hero animation" }], + }, + order: ["design"], + source: "llm", + }); + + const 
planResult = await handleCommands(planParams); + expect(planResult.shouldContinue).toBe(false); + expect(planResult.reply?.text).toContain("Run exact plan: /mesh run mesh-plan-abc"); + expect(callGatewayMock).toHaveBeenCalledTimes(1); + expect(callGatewayMock).toHaveBeenCalledWith( + expect.objectContaining({ + method: "mesh.plan.auto", + params: expect.objectContaining({ + goal: "Build Hero Animation", + }), + }), + ); + + callGatewayMock.mockReset(); + callGatewayMock.mockResolvedValueOnce({ + runId: "mesh-run-abc", + status: "completed", + stats: { total: 1, succeeded: 1, failed: 0, skipped: 0, running: 0, pending: 0 }, + }); + + const runParams = buildParams("/mesh run mesh-plan-abc", cfg); + const runResult = await handleCommands(runParams); + expect(runResult.shouldContinue).toBe(false); + expect(callGatewayMock).toHaveBeenCalledTimes(1); + expect(callGatewayMock).toHaveBeenCalledWith( + expect.objectContaining({ + method: "mesh.run", + params: expect.objectContaining({ + plan: expect.objectContaining({ + planId: "mesh-plan-abc", + goal: "Build Hero Animation", + }), + }), + }), + ); + }); +}); + describe("/compact command", () => { beforeEach(() => { vi.clearAllMocks(); diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 9282e679c5..17974532e3 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -130,6 +130,8 @@ import { LogsTailResultSchema, type MeshPlanParams, MeshPlanParamsSchema, + type MeshPlanAutoParams, + MeshPlanAutoParamsSchema, type MeshRetryParams, MeshRetryParamsSchema, type MeshRunParams, @@ -369,6 +371,7 @@ export const validateExecApprovalsNodeSetParams = ajv.compile(LogsTailParamsSchema); export const validateMeshPlanParams = ajv.compile(MeshPlanParamsSchema); +export const validateMeshPlanAutoParams = ajv.compile(MeshPlanAutoParamsSchema); export const validateMeshRunParams = ajv.compile(MeshRunParamsSchema); export const validateMeshStatusParams = 
ajv.compile(MeshStatusParamsSchema); export const validateMeshRetryParams = ajv.compile(MeshRetryParamsSchema); @@ -432,6 +435,7 @@ export { AgentEventSchema, ChatEventSchema, MeshPlanParamsSchema, + MeshPlanAutoParamsSchema, MeshWorkflowPlanSchema, MeshRunParamsSchema, MeshStatusParamsSchema, @@ -536,6 +540,7 @@ export type { AgentWaitParams, ChatEvent, MeshPlanParams, + MeshPlanAutoParams, MeshWorkflowPlan, MeshRunParams, MeshStatusParams, diff --git a/src/gateway/protocol/schema/mesh.ts b/src/gateway/protocol/schema/mesh.ts index 1c296eb6ed..7d27421bc4 100644 --- a/src/gateway/protocol/schema/mesh.ts +++ b/src/gateway/protocol/schema/mesh.ts @@ -61,6 +61,19 @@ export const MeshRunParamsSchema = Type.Object( { additionalProperties: false }, ); +export const MeshPlanAutoParamsSchema = Type.Object( + { + goal: NonEmptyString, + maxSteps: Type.Optional(Type.Integer({ minimum: 1, maximum: 16 })), + agentId: Type.Optional(NonEmptyString), + sessionKey: Type.Optional(NonEmptyString), + thinking: Type.Optional(Type.String()), + timeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })), + lane: Type.Optional(Type.String()), + }, + { additionalProperties: false }, +); + export const MeshStatusParamsSchema = Type.Object( { runId: NonEmptyString, @@ -79,5 +92,6 @@ export const MeshRetryParamsSchema = Type.Object( export type MeshPlanParams = Static; export type MeshWorkflowPlan = Static; export type MeshRunParams = Static; +export type MeshPlanAutoParams = Static; export type MeshStatusParams = Static; export type MeshRetryParams = Static; diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index 23a8ecf358..f734c17369 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -104,6 +104,7 @@ import { LogsTailResultSchema, } from "./logs-chat.js"; import { + MeshPlanAutoParamsSchema, MeshPlanParamsSchema, MeshRetryParamsSchema, 
MeshRunParamsSchema, @@ -262,6 +263,7 @@ export const ProtocolSchemas: Record = { ChatInjectParams: ChatInjectParamsSchema, ChatEvent: ChatEventSchema, MeshPlanParams: MeshPlanParamsSchema, + MeshPlanAutoParams: MeshPlanAutoParamsSchema, MeshWorkflowPlan: MeshWorkflowPlanSchema, MeshRunParams: MeshRunParamsSchema, MeshStatusParams: MeshStatusParamsSchema, diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index eb571a06f6..9379f249f0 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -86,6 +86,7 @@ const BASE_METHODS = [ "agent.identity.get", "agent.wait", "mesh.plan", + "mesh.plan.auto", "mesh.run", "mesh.status", "mesh.retry", diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts index 0794a9ff9e..5ae929e79c 100644 --- a/src/gateway/server-methods.ts +++ b/src/gateway/server-methods.ts @@ -97,6 +97,7 @@ const WRITE_METHODS = new Set([ "chat.send", "chat.abort", "browser.request", + "mesh.plan.auto", "mesh.run", "mesh.retry", ]); diff --git a/src/gateway/server-methods/mesh.test.ts b/src/gateway/server-methods/mesh.test.ts index 7441455e7e..9228b998d9 100644 --- a/src/gateway/server-methods/mesh.test.ts +++ b/src/gateway/server-methods/mesh.test.ts @@ -5,6 +5,7 @@ import { __resetMeshRunsForTest, meshHandlers } from "./mesh.js"; const mocks = vi.hoisted(() => ({ agent: vi.fn(), agentWait: vi.fn(), + agentCommand: vi.fn(), })); vi.mock("./agent.js", () => ({ @@ -14,6 +15,10 @@ vi.mock("./agent.js", () => ({ }, })); +vi.mock("../../commands/agent.js", () => ({ + agentCommand: (...args: unknown[]) => mocks.agentCommand(...args), +})); + const makeContext = (): GatewayRequestContext => ({ dedupe: new Map(), @@ -38,6 +43,7 @@ afterEach(() => { __resetMeshRunsForTest(); mocks.agent.mockReset(); mocks.agentWait.mockReset(); + mocks.agentCommand.mockReset(); }); describe("mesh handlers", () => { @@ -135,4 +141,86 @@ describe("mesh handlers", () => { const statusPayload = 
statusRes.payload as { status: string }; expect(statusPayload.status).toBe("completed"); }); + + it("auto planner creates multiple steps from llm json output", async () => { + mocks.agentCommand.mockResolvedValue({ + payloads: [ + { + text: JSON.stringify({ + steps: [ + { id: "analyze", prompt: "Analyze requirements" }, + { id: "build", prompt: "Build implementation", dependsOn: ["analyze"] }, + ], + }), + }, + ], + meta: {}, + }); + + const res = await callMesh("mesh.plan.auto", { + goal: "Create dashboard with auth", + maxSteps: 4, + }); + expect(res.ok).toBe(true); + const payload = res.payload as { + source: string; + plan: { steps: Array<{ id: string }> }; + order: string[]; + }; + expect(payload.source).toBe("llm"); + expect(payload.plan.steps.map((s) => s.id)).toEqual(["analyze", "build"]); + expect(payload.order).toEqual(["analyze", "build"]); + expect(mocks.agentCommand).toHaveBeenCalledWith( + expect.objectContaining({ + agentId: "main", + sessionKey: "agent:main:mesh-planner", + }), + expect.any(Object), + undefined, + ); + }); + + it("auto planner falls back to single-step plan when llm output is invalid", async () => { + mocks.agentCommand.mockResolvedValue({ + payloads: [{ text: "not valid json" }], + meta: {}, + }); + const res = await callMesh("mesh.plan.auto", { + goal: "Do a thing", + }); + expect(res.ok).toBe(true); + const payload = res.payload as { + source: string; + plan: { steps: Array<{ id: string; prompt: string }> }; + }; + expect(payload.source).toBe("fallback"); + expect(payload.plan.steps).toHaveLength(1); + expect(payload.plan.steps[0]?.prompt).toBe("Do a thing"); + }); + + it("auto planner respects caller-provided planner session key", async () => { + mocks.agentCommand.mockResolvedValue({ + payloads: [ + { + text: JSON.stringify({ + steps: [{ id: "one", prompt: "One" }], + }), + }, + ], + meta: {}, + }); + + const res = await callMesh("mesh.plan.auto", { + goal: "Do a thing", + sessionKey: "agent:main:custom-planner", + }); + 
expect(res.ok).toBe(true); + expect(mocks.agentCommand).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "agent:main:custom-planner", + }), + expect.any(Object), + undefined, + ); + }); }); diff --git a/src/gateway/server-methods/mesh.ts b/src/gateway/server-methods/mesh.ts index 37587ef852..7de58cdea4 100644 --- a/src/gateway/server-methods/mesh.ts +++ b/src/gateway/server-methods/mesh.ts @@ -1,13 +1,18 @@ import { randomUUID } from "node:crypto"; +import { agentCommand } from "../../commands/agent.js"; +import { normalizeAgentId } from "../../routing/session-key.js"; +import { defaultRuntime } from "../../runtime.js"; import type { GatewayRequestHandlerOptions, GatewayRequestHandlers, RespondFn } from "./types.js"; import { ErrorCodes, errorShape, formatValidationErrors, + validateMeshPlanAutoParams, validateMeshPlanParams, validateMeshRetryParams, validateMeshRunParams, validateMeshStatusParams, + type MeshPlanAutoParams, type MeshRunParams, type MeshWorkflowPlan, } from "../protocol/index.js"; @@ -48,8 +53,25 @@ type MeshRunRecord = { history: Array<{ ts: number; type: string; stepId?: string; data?: Record }>; }; +type MeshAutoStep = { + id?: string; + name?: string; + prompt: string; + dependsOn?: string[]; + agentId?: string; + sessionKey?: string; + thinking?: string; + timeoutMs?: number; +}; + +type MeshAutoPlanShape = { + steps?: MeshAutoStep[]; +}; + const meshRuns = new Map(); const MAX_KEEP_RUNS = 200; +const AUTO_PLAN_TIMEOUT_MS = 90_000; +const PLANNER_MAIN_KEY = "mesh-planner"; function trimMap() { if (meshRuns.size <= MAX_KEEP_RUNS) { @@ -103,16 +125,7 @@ function normalizePlan(plan: MeshWorkflowPlan): MeshWorkflowPlan { function createPlanFromParams(params: { goal: string; - steps?: Array<{ - id?: string; - name?: string; - prompt: string; - dependsOn?: string[]; - agentId?: string; - sessionKey?: string; - thinking?: string; - timeoutMs?: number; - }>; + steps?: MeshAutoStep[]; }): MeshWorkflowPlan { const now = Date.now(); const 
goal = params.goal.trim(); @@ -431,6 +444,7 @@ async function runWorkflow(run: MeshRunRecord, opts: GatewayRequestHandlerOption const inFlight = new Set>(); let stopScheduling = false; + while (true) { const failed = Object.values(run.steps).some((step) => step.status === "failed"); if (failed && !run.continueOnError) { @@ -459,6 +473,7 @@ async function runWorkflow(run: MeshRunRecord, opts: GatewayRequestHandlerOption if (pending.length === 0) { break; } + for (const step of pending) { step.status = "skipped"; step.endedAt = Date.now(); @@ -547,6 +562,129 @@ function summarizeRun(run: MeshRunRecord) { }; } +function extractTextFromAgentResult(result: unknown): string { + const payloads = (result as { payloads?: Array<{ text?: unknown }> } | undefined)?.payloads; + if (!Array.isArray(payloads)) { + return ""; + } + const texts: string[] = []; + for (const payload of payloads) { + if (typeof payload?.text === "string" && payload.text.trim()) { + texts.push(payload.text.trim()); + } + } + return texts.join("\n\n"); +} + +function parseJsonObjectFromText(text: string): Record | null { + const trimmed = text.trim(); + if (!trimmed) { + return null; + } + try { + const parsed = JSON.parse(trimmed); + return parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? (parsed as Record) + : null; + } catch { + // keep trying + } + + const fenceMatch = trimmed.match(/```(?:json)?\s*([\s\S]*?)\s*```/i); + if (fenceMatch?.[1]) { + try { + const parsed = JSON.parse(fenceMatch[1]); + return parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? (parsed as Record) + : null; + } catch { + // keep trying + } + } + + const start = trimmed.indexOf("{"); + const end = trimmed.lastIndexOf("}"); + if (start >= 0 && end > start) { + const candidate = trimmed.slice(start, end + 1); + try { + const parsed = JSON.parse(candidate); + return parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? 
(parsed as Record) + : null; + } catch { + return null; + } + } + + return null; +} + +function buildAutoPlannerPrompt(params: { goal: string; maxSteps: number }) { + return [ + "You are a workflow planner. Convert the user's goal into executable workflow steps.", + "Return STRICT JSON only, no markdown, no prose.", + 'JSON schema: {"steps": [{"id": string, "name"?: string, "prompt": string, "dependsOn"?: string[]}]}', + "Rules:", + `- Use 2 to ${params.maxSteps} steps.`, + "- Keep ids short, lowercase, kebab-case.", + "- dependsOn must reference earlier step ids when needed.", + "- prompts must be concrete and executable by an AI coding assistant.", + "- Do not include extra fields.", + `Goal: ${params.goal}`, + ].join("\n"); +} + +async function generateAutoPlan(params: { + goal: string; + maxSteps: number; + agentId?: string; + sessionKey?: string; + thinking?: string; + timeoutMs?: number; + lane?: string; + opts: GatewayRequestHandlerOptions; +}): Promise<{ plan: MeshWorkflowPlan; source: "llm" | "fallback"; plannerText?: string }> { + const prompt = buildAutoPlannerPrompt({ goal: params.goal, maxSteps: params.maxSteps }); + const timeoutSeconds = Math.ceil((params.timeoutMs ?? AUTO_PLAN_TIMEOUT_MS) / 1000); + const resolvedAgentId = normalizeAgentId(params.agentId ?? "main"); + const plannerSessionKey = params.sessionKey?.trim() || `agent:${resolvedAgentId}:${PLANNER_MAIN_KEY}`; + + try { + const runResult = await agentCommand( + { + message: prompt, + deliver: false, + timeout: String(timeoutSeconds), + agentId: resolvedAgentId, + sessionKey: plannerSessionKey, + ...(params.thinking ? { thinking: params.thinking } : {}), + ...(params.lane ? { lane: params.lane } : {}), + }, + defaultRuntime, + params.opts.context.deps, + ); + + const text = extractTextFromAgentResult(runResult); + const parsed = parseJsonObjectFromText(text) as MeshAutoPlanShape | null; + const rawSteps = Array.isArray(parsed?.steps) ? 
parsed.steps : []; + if (rawSteps.length > 0) { + const plan = normalizePlan( + createPlanFromParams({ + goal: params.goal, + steps: rawSteps.slice(0, params.maxSteps), + }), + ); + return { plan, source: "llm", plannerText: text }; + } + + const fallbackPlan = normalizePlan(createPlanFromParams({ goal: params.goal })); + return { plan: fallbackPlan, source: "fallback", plannerText: text }; + } catch { + const fallbackPlan = normalizePlan(createPlanFromParams({ goal: params.goal })); + return { plan: fallbackPlan, source: "fallback" }; + } +} + export const meshHandlers: GatewayRequestHandlers = { "mesh.plan": ({ params, respond }) => { if (!validateMeshPlanParams(params)) { @@ -581,6 +719,56 @@ export const meshHandlers: GatewayRequestHandlers = { undefined, ); }, + "mesh.plan.auto": async ({ params, respond, ...rest }) => { + if (!validateMeshPlanAutoParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid mesh.plan.auto params: ${formatValidationErrors(validateMeshPlanAutoParams.errors)}`, + ), + ); + return; + } + + const p = params as MeshPlanAutoParams; + const maxSteps = + typeof p.maxSteps === "number" && Number.isFinite(p.maxSteps) + ? 
Math.max(1, Math.min(16, Math.floor(p.maxSteps))) + : 6; + const auto = await generateAutoPlan({ + goal: p.goal, + maxSteps, + agentId: p.agentId, + sessionKey: p.sessionKey, + thinking: p.thinking, + timeoutMs: p.timeoutMs, + lane: p.lane, + opts: { + ...rest, + params, + respond, + }, + }); + + const graph = validatePlanGraph(auto.plan); + if (!graph.ok) { + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, graph.error)); + return; + } + + respond( + true, + { + plan: auto.plan, + order: graph.order, + source: auto.source, + plannerText: auto.plannerText, + }, + undefined, + ); + }, "mesh.run": async (opts) => { const { params, respond } = opts; if (!validateMeshRunParams(params)) { @@ -640,7 +828,7 @@ export const meshHandlers: GatewayRequestHandlers = { } const run = meshRuns.get(params.runId.trim()); if (!run) { - respond(false, undefined, errorShape(ErrorCodes.NOT_FOUND, "mesh run not found")); + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "mesh run not found")); return; } respond(true, summarizeRun(run), undefined); @@ -661,7 +849,7 @@ export const meshHandlers: GatewayRequestHandlers = { const runId = params.runId.trim(); const run = meshRuns.get(runId); if (!run) { - respond(false, undefined, errorShape(ErrorCodes.NOT_FOUND, "mesh run not found")); + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "mesh run not found")); return; } if (run.status === "running") { diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index e877a5f3b7..5c10a39215 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -500,6 +500,50 @@ describe("gateway healthHandlers.status scope handling", () => { }); }); +describe("gateway mesh.plan.auto scope handling", () => { + it("rejects operator.read clients for mesh.plan.auto", async () => { + const { handleGatewayRequest } = await 
import("../server-methods.js"); + const respond = vi.fn(); + const handler = vi.fn(); + + await handleGatewayRequest({ + req: { id: "req-mesh-read", type: "req", method: "mesh.plan.auto", params: {} }, + respond, + context: {} as Parameters[0]["context"], + client: { connect: { role: "operator", scopes: ["operator.read"] } }, + isWebchatConnect: () => false, + extraHandlers: { "mesh.plan.auto": handler }, + }); + + expect(handler).not.toHaveBeenCalled(); + expect(respond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ message: "missing scope: operator.write" }), + ); + }); + + it("allows operator.write clients for mesh.plan.auto", async () => { + const { handleGatewayRequest } = await import("../server-methods.js"); + const respond = vi.fn(); + const handler = vi.fn(({ respond: send }: { respond: (ok: boolean, payload?: unknown) => void }) => + send(true, { ok: true }), + ); + + await handleGatewayRequest({ + req: { id: "req-mesh-write", type: "req", method: "mesh.plan.auto", params: {} }, + respond, + context: {} as Parameters[0]["context"], + client: { connect: { role: "operator", scopes: ["operator.write"] } }, + isWebchatConnect: () => false, + extraHandlers: { "mesh.plan.auto": handler }, + }); + + expect(handler).toHaveBeenCalledOnce(); + expect(respond).toHaveBeenCalledWith(true, { ok: true }); + }); +}); + describe("logs.tail", () => { const logsNoop = () => false;