From cd44a0d01e9fe195dcc6b031b8597957257d3571 Mon Sep 17 00:00:00 2001 From: Onur Date: Mon, 16 Feb 2026 09:32:05 +0800 Subject: [PATCH] fix: codex and similar processes keep dying on pty, solved by refactoring process spawning (#14257) * exec: clean up PTY resources on timeout and exit * cli: harden resume cleanup and watchdog stalled runs * cli: productionize PTY and resume reliability paths * docs: add PTY process supervision architecture plan * docs: rewrite PTY supervision plan as pre-rewrite baseline * docs: switch PTY supervision plan to one-go execution * docs: add one-line root cause to PTY supervision plan * docs: add OS contracts and test matrix to PTY supervision plan * docs: define process-supervisor package placement and scope * docs: tie supervisor plan to existing CI lanes * docs: place PTY supervisor plan under src/process * refactor(process): route exec and cli runs through supervisor * docs(process): refresh PTY supervision plan * wip * fix(process): harden supervisor timeout and PTY termination * fix(process): harden supervisor adapters env and wait handling * ci: avoid failing formal conformance on comment permissions * test(ui): fix cron request mock argument typing * fix(ui): remove leftover conflict marker * fix: supervise PTY processes (#14257) (openclaw#14257) (thanks @onutc) --- .github/workflows/formal-conformance.yml | 1 + CHANGELOG.md | 1 + .../plans/pty-process-supervision.md | 192 +++++++ src/agents/bash-tools.exec-runtime.ts | 483 +++++++----------- .../bash-tools.exec.pty-cleanup.test.ts | 73 +++ ...sh-tools.exec.pty-fallback-failure.test.ts | 40 ++ .../bash-tools.process.supervisor.test.ts | 152 ++++++ src/agents/bash-tools.process.ts | 77 ++- src/agents/bash-tools.shared.ts | 9 - src/agents/cli-backends.test.ts | 36 ++ src/agents/cli-backends.ts | 36 ++ src/agents/cli-runner.e2e.test.ts | 462 ++++++----------- src/agents/cli-runner.ts | 58 ++- src/agents/cli-runner/helpers.ts | 225 +------- src/agents/cli-runner/reliability.ts | 88 ++++ src/agents/cli-watchdog-defaults.ts | 13 + src/config/types.agent-defaults.ts | 28 + src/config/zod-schema.core.ts | 28 + src/process/exec.test.ts | 51 ++ src/process/exec.ts | 62 ++- src/process/kill-tree.ts | 34 ++ src/process/supervisor/adapters/child.test.ts | 116 +++++ src/process/supervisor/adapters/child.ts | 174 +++++++ src/process/supervisor/adapters/pty.test.ts | 161 ++++++ src/process/supervisor/adapters/pty.ts | 197 +++++++ src/process/supervisor/index.ts | 24 + src/process/supervisor/registry.test.ts | 83 +++ src/process/supervisor/registry.ts | 154 ++++++ .../supervisor/supervisor.pty-command.test.ts | 76 +++ src/process/supervisor/supervisor.test.ts | 102 ++++ src/process/supervisor/supervisor.ts | 282 ++++++++++ src/process/supervisor/types.ts | 96 ++++ 32 files changed, 2759 insertions(+), 855 deletions(-) create mode 100644 docs/experiments/plans/pty-process-supervision.md create mode 100644 src/agents/bash-tools.exec.pty-cleanup.test.ts create mode 100644 src/agents/bash-tools.exec.pty-fallback-failure.test.ts create mode 100644 src/agents/bash-tools.process.supervisor.test.ts create mode 100644 src/agents/cli-backends.test.ts create mode 100644 src/agents/cli-runner/reliability.ts create mode 100644 src/agents/cli-watchdog-defaults.ts create mode 100644 src/process/kill-tree.ts create mode 100644 src/process/supervisor/adapters/child.test.ts create mode 100644 src/process/supervisor/adapters/child.ts create mode 100644 src/process/supervisor/adapters/pty.test.ts create mode 100644 src/process/supervisor/adapters/pty.ts create mode 100644 src/process/supervisor/index.ts create mode 100644 src/process/supervisor/registry.test.ts create mode 100644 src/process/supervisor/registry.ts create mode 100644 src/process/supervisor/supervisor.pty-command.test.ts create mode 100644 src/process/supervisor/supervisor.test.ts create mode 100644 src/process/supervisor/supervisor.ts create mode 100644 src/process/supervisor/types.ts diff --git a/.github/workflows/formal-conformance.yml b/.github/workflows/formal-conformance.yml index a8ec86bfce..8ba6d7e56b 100644 --- a/.github/workflows/formal-conformance.yml +++ b/.github/workflows/formal-conformance.yml @@ -108,6 +108,7 @@ jobs: - name: Comment on PR (informational) if: steps.drift.outputs.drift == 'true' + continue-on-error: true uses: actions/github-script@v7 with: script: | diff --git a/CHANGELOG.md b/CHANGELOG.md index 741fc77a9a..2b1b741e37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -108,6 +108,7 @@ Docs: https://docs.openclaw.ai - Tools/Write/Edit: normalize structured text-block arguments for `content`/`oldText`/`newText` before filesystem edits, preventing JSON-like file corruption and false “exact text not found” misses from block-form params. (#16778) Thanks @danielpipernz. - Ollama/Agents: avoid forcing `` tag enforcement for Ollama models, which could suppress all output as `(no output)`. (#16191) Thanks @Glucksberg. - Plugins: suppress false duplicate plugin id warnings when the same extension is discovered via multiple paths (config/workspace/global vs bundled), while still warning on genuine duplicates. (#16222) Thanks @shadril238. +- Agents/Process: supervise PTY/child process lifecycles with explicit ownership, cancellation, timeouts, and deterministic cleanup, preventing Codex/Pi PTY sessions from dying or stalling on resume. (#14257) Thanks @onutc. - Skills: watch `SKILL.md` only when refreshing skills snapshot to avoid file-descriptor exhaustion in large data trees. (#11325) Thanks @household-bard. - Memory/QMD: make `memory status` read-only by skipping QMD boot update/embed side effects for status-only manager checks. - Memory/QMD: keep original QMD failures when builtin fallback initialization fails (for example missing embedding API keys), instead of replacing them with fallback init errors. diff --git a/docs/experiments/plans/pty-process-supervision.md b/docs/experiments/plans/pty-process-supervision.md new file mode 100644 index 0000000000..352850c82f --- /dev/null +++ b/docs/experiments/plans/pty-process-supervision.md @@ -0,0 +1,192 @@ +--- +summary: "Production plan for reliable interactive process supervision (PTY + non-PTY) with explicit ownership, unified lifecycle, and deterministic cleanup" +owner: "openclaw" +status: "in-progress" +last_updated: "2026-02-15" +title: "PTY and Process Supervision Plan" +--- + +# PTY and Process Supervision Plan + +## 1. Problem and goal + +We need one reliable lifecycle for long-running command execution across: + +- `exec` foreground runs +- `exec` background runs +- `process` follow up actions (`poll`, `log`, `send-keys`, `paste`, `submit`, `kill`, `remove`) +- CLI agent runner subprocesses + +The goal is not just to support PTY. The goal is predictable ownership, cancellation, timeout, and cleanup with no unsafe process matching heuristics. + +## 2. Scope and boundaries + +- Keep implementation internal in `src/process/supervisor`. +- Do not create a new package for this. +- Keep current behavior compatibility where practical. +- Do not broaden scope to terminal replay or tmux style session persistence. + +## 3. Implemented in this branch + +### Supervisor baseline already present + +- Supervisor module is in place under `src/process/supervisor/*`. +- Exec runtime and CLI runner are already routed through supervisor spawn and wait. +- Registry finalization is idempotent. + +### This pass completed + +1. Explicit PTY command contract + +- `SpawnInput` is now a discriminated union in `src/process/supervisor/types.ts`. +- PTY runs require `ptyCommand` instead of reusing generic `argv`. +- Supervisor no longer rebuilds PTY command strings from argv joins in `src/process/supervisor/supervisor.ts`. +- Exec runtime now passes `ptyCommand` directly in `src/agents/bash-tools.exec-runtime.ts`. + +2. Process layer type decoupling + +- Supervisor types no longer import `SessionStdin` from agents. +- Process local stdin contract lives in `src/process/supervisor/types.ts` (`ManagedRunStdin`). +- Adapters now depend only on process level types: + - `src/process/supervisor/adapters/child.ts` + - `src/process/supervisor/adapters/pty.ts` + +3. Process tool lifecycle ownership improvement + +- `src/agents/bash-tools.process.ts` now requests cancellation through supervisor first. +- `process kill/remove` now use process-tree fallback termination when supervisor lookup misses. +- `remove` keeps deterministic remove behavior by dropping running session entries immediately after termination is requested. + +4. Single source watchdog defaults + +- Added shared defaults in `src/agents/cli-watchdog-defaults.ts`. +- `src/agents/cli-backends.ts` consumes the shared defaults. +- `src/agents/cli-runner/reliability.ts` consumes the same shared defaults. + +5. Dead helper cleanup + +- Removed unused `killSession` helper path from `src/agents/bash-tools.shared.ts`. + +6. Direct supervisor path tests added + +- Added `src/agents/bash-tools.process.supervisor.test.ts` to cover kill and remove routing through supervisor cancellation. + +7. Reliability gap fixes completed + +- `src/agents/bash-tools.process.ts` now falls back to real OS-level process termination when supervisor lookup misses. +- `src/process/supervisor/adapters/child.ts` now uses process-tree termination semantics for default cancel/timeout kill paths. +- Added shared process-tree utility in `src/process/kill-tree.ts`. + +8. PTY contract edge-case coverage added + +- Added `src/process/supervisor/supervisor.pty-command.test.ts` for verbatim PTY command forwarding and empty-command rejection. +- Added `src/process/supervisor/adapters/child.test.ts` for process-tree kill behavior in child adapter cancellation. + +## 4. Remaining gaps and decisions + +### Reliability status + +The two required reliability gaps for this pass are now closed: + +- `process kill/remove` now has a real OS termination fallback when supervisor lookup misses. +- child cancel/timeout now uses process-tree kill semantics for default kill path. +- Regression tests were added for both behaviors. + +### Durability and startup reconciliation + +Restart behavior is now explicitly defined as in-memory lifecycle only. + +- `reconcileOrphans()` remains a no-op in `src/process/supervisor/supervisor.ts` by design. +- Active runs are not recovered after process restart. +- This boundary is intentional for this implementation pass to avoid partial persistence risks. + +### Maintainability follow-ups + +1. `runExecProcess` in `src/agents/bash-tools.exec-runtime.ts` still handles multiple responsibilities and can be split into focused helpers in a follow-up. + +## 5. Implementation plan + +The implementation pass for required reliability and contract items is complete. + +Completed: + +- `process kill/remove` fallback real termination +- process-tree cancellation for child adapter default kill path +- regression tests for fallback kill and child adapter kill path +- PTY command edge-case tests under explicit `ptyCommand` +- explicit in-memory restart boundary with `reconcileOrphans()` no-op by design + +Optional follow-up: + +- split `runExecProcess` into focused helpers with no behavior drift + +## 6. File map + +### Process supervisor + +- `src/process/supervisor/types.ts` updated with discriminated spawn input and process local stdin contract. +- `src/process/supervisor/supervisor.ts` updated to use explicit `ptyCommand`. +- `src/process/supervisor/adapters/child.ts` and `src/process/supervisor/adapters/pty.ts` decoupled from agent types. +- `src/process/supervisor/registry.ts` idempotent finalize unchanged and retained. + +### Exec and process integration + +- `src/agents/bash-tools.exec-runtime.ts` updated to pass PTY command explicitly and keep fallback path. +- `src/agents/bash-tools.process.ts` updated to cancel via supervisor with real process-tree fallback termination. +- `src/agents/bash-tools.shared.ts` removed direct kill helper path. + +### CLI reliability + +- `src/agents/cli-watchdog-defaults.ts` added as shared baseline. +- `src/agents/cli-backends.ts` and `src/agents/cli-runner/reliability.ts` now consume same defaults. + +## 7. Validation run in this pass + +Unit tests: + +- `pnpm vitest src/process/supervisor/registry.test.ts` +- `pnpm vitest src/process/supervisor/supervisor.test.ts` +- `pnpm vitest src/process/supervisor/supervisor.pty-command.test.ts` +- `pnpm vitest src/process/supervisor/adapters/child.test.ts` +- `pnpm vitest src/agents/cli-backends.test.ts` +- `pnpm vitest src/agents/bash-tools.exec.pty-cleanup.test.ts` +- `pnpm vitest src/agents/bash-tools.process.poll-timeout.test.ts` +- `pnpm vitest src/agents/bash-tools.process.supervisor.test.ts` +- `pnpm vitest src/process/exec.test.ts` + +E2E targets: + +- `pnpm test:e2e src/agents/cli-runner.e2e.test.ts` +- `pnpm test:e2e src/agents/bash-tools.exec.pty-fallback.e2e.test.ts src/agents/bash-tools.exec.background-abort.e2e.test.ts src/agents/bash-tools.process.send-keys.e2e.test.ts` + +Typecheck note: + +- `pnpm tsgo` currently fails in this repo due to a pre-existing UI typing dependency issue (`@vitest/browser-playwright` resolution), unrelated to this process supervision work. + +## 8. Operational guarantees preserved + +- Exec env hardening behavior is unchanged. +- Approval and allowlist flow is unchanged. +- Output sanitization and output caps are unchanged. +- PTY adapter still guarantees wait settlement on forced kill and listener disposal. + +## 9. Definition of done + +1. Supervisor is lifecycle owner for managed runs. +2. PTY spawn uses explicit command contract with no argv reconstruction. +3. Process layer has no type dependency on agent layer for supervisor stdin contracts. +4. Watchdog defaults are single source. +5. Targeted unit and e2e tests remain green. +6. Restart durability boundary is explicitly documented or fully implemented. + +## 10. Summary + +The branch now has a coherent and safer supervision shape: + +- explicit PTY contract +- cleaner process layering +- supervisor driven cancellation path for process operations +- real fallback termination when supervisor lookup misses +- process-tree cancellation for child-run default kill paths +- unified watchdog defaults +- explicit in-memory restart boundary (no orphan reconciliation across restart in this pass) diff --git a/src/agents/bash-tools.exec-runtime.ts b/src/agents/bash-tools.exec-runtime.ts index 2af4e4a7f6..d458df01d1 100644 --- a/src/agents/bash-tools.exec-runtime.ts +++ b/src/agents/bash-tools.exec-runtime.ts @@ -1,17 +1,17 @@ import type { AgentToolResult } from "@mariozechner/pi-agent-core"; -import type { ChildProcessWithoutNullStreams } from "node:child_process"; import { Type } from "@sinclair/typebox"; import path from "node:path"; import type { ExecAsk, ExecHost, ExecSecurity } from "../infra/exec-approvals.js"; -import type { ProcessSession, SessionStdin } from "./bash-process-registry.js"; +import type { ProcessSession } from "./bash-process-registry.js"; import type { ExecToolDetails } from "./bash-tools.exec.js"; import type { BashSandboxConfig } from "./bash-tools.shared.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { mergePathPrepend } from "../infra/path-prepend.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; export { applyPathPrepend, normalizePathPrepend } from "../infra/path-prepend.js"; +import type { ManagedRun } from "../process/supervisor/index.js"; import { logWarn } from "../logger.js"; -import { formatSpawnError, spawnWithFallback } from "../process/spawn-utils.js"; +import { getProcessSupervisor } from "../process/supervisor/index.js"; import { addSession, appendOutput, @@ -23,7 +23,6 @@ import { buildDockerExecArgs, chunkString, clampWithDefault, - killSession, readEnvInt, } from "./bash-tools.shared.js"; import { buildCursorPositionResponse, stripDsrRequests } from "./pty-dsr.js"; @@ -147,26 +146,6 @@ export const execSchema = Type.Object({ ), }); -type PtyExitEvent = { exitCode: number; signal?: number }; -type PtyListener = (event: T) => void; -type PtyHandle = { - pid: number; - write: (data: string | Buffer) => void; - onData: (listener: PtyListener) => void; - onExit: (listener: PtyListener) => void; -}; -type PtySpawn = ( - file: string, - args: string[] | string, - options: { - name?: string; - cols?: number; - rows?: number; - cwd?: string; - env?: Record; - }, -) => PtyHandle; - export type ExecProcessOutcome = { status: "completed" | "failed"; exitCode: number | null; @@ -319,138 +298,10 @@ export async function runExecProcess(opts: { }): Promise { const startedAt = Date.now(); const sessionId = createSessionSlug(); - let child: ChildProcessWithoutNullStreams | null = null; - let pty: PtyHandle | null = null; - let stdin: SessionStdin | undefined; const execCommand = opts.execCommand ?? opts.command; + const supervisor = getProcessSupervisor(); - const spawnFallbacks = [ - { - label: "no-detach", - options: { detached: false }, - }, - ]; - - const handleSpawnFallback = (err: unknown, fallback: { label: string }) => { - const errText = formatSpawnError(err); - const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`; - logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`); - opts.warnings.push(warning); - }; - - const spawnShellChild = async ( - shell: string, - shellArgs: string[], - ): Promise => { - const { child: spawned } = await spawnWithFallback({ - argv: [shell, ...shellArgs, execCommand], - options: { - cwd: opts.workdir, - env: opts.env, - detached: process.platform !== "win32", - stdio: ["pipe", "pipe", "pipe"], - windowsHide: true, - }, - fallbacks: spawnFallbacks, - onFallback: handleSpawnFallback, - }); - return spawned as ChildProcessWithoutNullStreams; - }; - - // `exec` does not currently accept tool-provided stdin content. For non-PTY runs, - // keeping stdin open can cause commands like `wc -l` (or safeBins-hardened segments) - // to block forever waiting for input, leading to accidental backgrounding. - // For interactive flows, callers should use `pty: true` (stdin kept open). - const maybeCloseNonPtyStdin = () => { - if (opts.usePty) { - return; - } - try { - // Signal EOF immediately so stdin-only commands can terminate. - child?.stdin?.end(); - } catch { - // ignore stdin close errors - } - }; - - if (opts.sandbox) { - const { child: spawned } = await spawnWithFallback({ - argv: [ - "docker", - ...buildDockerExecArgs({ - containerName: opts.sandbox.containerName, - command: execCommand, - workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir, - env: opts.env, - tty: opts.usePty, - }), - ], - options: { - cwd: opts.workdir, - env: process.env, - detached: process.platform !== "win32", - stdio: ["pipe", "pipe", "pipe"], - windowsHide: true, - }, - fallbacks: spawnFallbacks, - onFallback: handleSpawnFallback, - }); - child = spawned as ChildProcessWithoutNullStreams; - stdin = child.stdin; - maybeCloseNonPtyStdin(); - } else if (opts.usePty) { - const { shell, args: shellArgs } = getShellConfig(); - try { - const ptyModule = (await import("@lydell/node-pty")) as unknown as { - spawn?: PtySpawn; - default?: { spawn?: PtySpawn }; - }; - const spawnPty = ptyModule.spawn ?? ptyModule.default?.spawn; - if (!spawnPty) { - throw new Error("PTY support is unavailable (node-pty spawn not found)."); - } - pty = spawnPty(shell, [...shellArgs, execCommand], { - cwd: opts.workdir, - env: opts.env, - name: process.env.TERM ?? "xterm-256color", - cols: 120, - rows: 30, - }); - stdin = { - destroyed: false, - write: (data, cb) => { - try { - pty?.write(data); - cb?.(null); - } catch (err) { - cb?.(err as Error); - } - }, - end: () => { - try { - const eof = process.platform === "win32" ? "\x1a" : "\x04"; - pty?.write(eof); - } catch { - // ignore EOF errors - } - }, - }; - } catch (err) { - const errText = String(err); - const warning = `Warning: PTY spawn failed (${errText}); retrying without PTY for \`${opts.command}\`.`; - logWarn(`exec: PTY spawn failed (${errText}); retrying without PTY for "${opts.command}".`); - opts.warnings.push(warning); - child = await spawnShellChild(shell, shellArgs); - stdin = child.stdin; - } - } else { - const { shell, args: shellArgs } = getShellConfig(); - child = await spawnShellChild(shell, shellArgs); - stdin = child.stdin; - maybeCloseNonPtyStdin(); - } - - const session = { + const session: ProcessSession = { id: sessionId, command: opts.command, scopeKey: opts.scopeKey, @@ -458,9 +309,9 @@ export async function runExecProcess(opts: { notifyOnExit: opts.notifyOnExit, notifyOnExitEmptySuccess: opts.notifyOnExitEmptySuccess === true, exitNotified: false, - child: child ?? undefined, - stdin, - pid: child?.pid ?? pty?.pid, + child: undefined, + stdin: undefined, + pid: undefined, startedAt, cwd: opts.workdir, maxOutputChars: opts.maxOutput, @@ -477,59 +328,9 @@ export async function runExecProcess(opts: { exitSignal: undefined as NodeJS.Signals | number | null | undefined, truncated: false, backgrounded: false, - } satisfies ProcessSession; + }; addSession(session); - let settled = false; - let timeoutTimer: NodeJS.Timeout | null = null; - let timeoutFinalizeTimer: NodeJS.Timeout | null = null; - let timedOut = false; - const timeoutFinalizeMs = 1000; - let resolveFn: ((outcome: ExecProcessOutcome) => void) | null = null; - - const settle = (outcome: ExecProcessOutcome) => { - if (settled) { - return; - } - settled = true; - resolveFn?.(outcome); - }; - - const finalizeTimeout = () => { - if (session.exited) { - return; - } - markExited(session, null, "SIGKILL", "failed"); - maybeNotifyOnExit(session, "failed"); - const aggregated = session.aggregated.trim(); - const reason = `Command timed out after ${opts.timeoutSec} seconds`; - settle({ - status: "failed", - exitCode: null, - exitSignal: "SIGKILL", - durationMs: Date.now() - startedAt, - aggregated, - timedOut: true, - reason: aggregated ? `${aggregated}\n\n${reason}` : reason, - }); - }; - - const onTimeout = () => { - timedOut = true; - killSession(session); - if (!timeoutFinalizeTimer) { - timeoutFinalizeTimer = setTimeout(() => { - finalizeTimeout(); - }, timeoutFinalizeMs); - } - }; - - if (opts.timeoutSec > 0) { - timeoutTimer = setTimeout(() => { - onTimeout(); - }, opts.timeoutSec * 1000); - } - const emitUpdate = () => { if (!opts.onUpdate) { return; @@ -565,116 +366,208 @@ export async function runExecProcess(opts: { } }; - if (pty) { - const cursorResponse = buildCursorPositionResponse(); - pty.onData((data) => { - const raw = data.toString(); - const { cleaned, requests } = stripDsrRequests(raw); - if (requests > 0) { + const timeoutMs = + typeof opts.timeoutSec === "number" && opts.timeoutSec > 0 + ? Math.floor(opts.timeoutSec * 1000) + : undefined; + + const spawnSpec: + | { + mode: "child"; + argv: string[]; + env: NodeJS.ProcessEnv; + stdinMode: "pipe-open" | "pipe-closed"; + } + | { + mode: "pty"; + ptyCommand: string; + childFallbackArgv: string[]; + env: NodeJS.ProcessEnv; + stdinMode: "pipe-open"; + } = (() => { + if (opts.sandbox) { + return { + mode: "child" as const, + argv: [ + "docker", + ...buildDockerExecArgs({ + containerName: opts.sandbox.containerName, + command: execCommand, + workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir, + env: opts.env, + tty: opts.usePty, + }), + ], + env: process.env, + stdinMode: opts.usePty ? ("pipe-open" as const) : ("pipe-closed" as const), + }; + } + const { shell, args: shellArgs } = getShellConfig(); + const childArgv = [shell, ...shellArgs, execCommand]; + if (opts.usePty) { + return { + mode: "pty" as const, + ptyCommand: execCommand, + childFallbackArgv: childArgv, + env: opts.env, + stdinMode: "pipe-open" as const, + }; + } + return { + mode: "child" as const, + argv: childArgv, + env: opts.env, + stdinMode: "pipe-closed" as const, + }; + })(); + + let managedRun: ManagedRun | null = null; + let usingPty = spawnSpec.mode === "pty"; + const cursorResponse = buildCursorPositionResponse(); + + const onSupervisorStdout = (chunk: string) => { + if (usingPty) { + const { cleaned, requests } = stripDsrRequests(chunk); + if (requests > 0 && managedRun?.stdin) { for (let i = 0; i < requests; i += 1) { - pty.write(cursorResponse); + managedRun.stdin.write(cursorResponse); } } handleStdout(cleaned); - }); - } else if (child) { - child.stdout.on("data", handleStdout); - child.stderr.on("data", handleStderr); - } + return; + } + handleStdout(chunk); + }; - const promise = new Promise((resolve) => { - resolveFn = resolve; - const handleExit = (code: number | null, exitSignal: NodeJS.Signals | number | null) => { - if (timeoutTimer) { - clearTimeout(timeoutTimer); - } - if (timeoutFinalizeTimer) { - clearTimeout(timeoutFinalizeTimer); + try { + const spawnBase = { + runId: sessionId, + sessionId: opts.sessionKey?.trim() || sessionId, + backendId: opts.sandbox ? "exec-sandbox" : "exec-host", + scopeKey: opts.scopeKey, + cwd: opts.workdir, + env: spawnSpec.env, + timeoutMs, + captureOutput: false, + onStdout: onSupervisorStdout, + onStderr: handleStderr, + }; + managedRun = + spawnSpec.mode === "pty" + ? await supervisor.spawn({ + ...spawnBase, + mode: "pty", + ptyCommand: spawnSpec.ptyCommand, + }) + : await supervisor.spawn({ + ...spawnBase, + mode: "child", + argv: spawnSpec.argv, + stdinMode: spawnSpec.stdinMode, + }); + } catch (err) { + if (spawnSpec.mode === "pty") { + const warning = `Warning: PTY spawn failed (${String(err)}); retrying without PTY for \`${opts.command}\`.`; + logWarn( + `exec: PTY spawn failed (${String(err)}); retrying without PTY for "${opts.command}".`, + ); + opts.warnings.push(warning); + usingPty = false; + try { + managedRun = await supervisor.spawn({ + runId: sessionId, + sessionId: opts.sessionKey?.trim() || sessionId, + backendId: "exec-host", + scopeKey: opts.scopeKey, + mode: "child", + argv: spawnSpec.childFallbackArgv, + cwd: opts.workdir, + env: spawnSpec.env, + stdinMode: "pipe-open", + timeoutMs, + captureOutput: false, + onStdout: handleStdout, + onStderr: handleStderr, + }); + } catch (retryErr) { + markExited(session, null, null, "failed"); + maybeNotifyOnExit(session, "failed"); + throw retryErr; } + } else { + markExited(session, null, null, "failed"); + maybeNotifyOnExit(session, "failed"); + throw err; + } + } + session.stdin = managedRun.stdin; + session.pid = managedRun.pid; + + const promise = managedRun + .wait() + .then((exit): ExecProcessOutcome => { const durationMs = Date.now() - startedAt; - const wasSignal = exitSignal != null; - const isSuccess = code === 0 && !wasSignal && !timedOut; - const status: "completed" | "failed" = isSuccess ? "completed" : "failed"; - markExited(session, code, exitSignal, status); + const status: "completed" | "failed" = + exit.exitCode === 0 && exit.reason === "exit" ? "completed" : "failed"; + markExited(session, exit.exitCode, exit.exitSignal, status); maybeNotifyOnExit(session, status); if (!session.child && session.stdin) { session.stdin.destroyed = true; } - - if (settled) { - return; - } const aggregated = session.aggregated.trim(); - if (!isSuccess) { - const reason = timedOut - ? `Command timed out after ${opts.timeoutSec} seconds` - : wasSignal && exitSignal - ? `Command aborted by signal ${exitSignal}` - : code === null - ? "Command aborted before exit code was captured" - : `Command exited with code ${code}`; - const message = aggregated ? `${aggregated}\n\n${reason}` : reason; - settle({ - status: "failed", - exitCode: code ?? null, - exitSignal: exitSignal ?? null, + if (status === "completed") { + return { + status: "completed", + exitCode: exit.exitCode ?? 0, + exitSignal: exit.exitSignal, durationMs, aggregated, - timedOut, - reason: message, - }); - return; + timedOut: false, + }; } - settle({ - status: "completed", - exitCode: code ?? 0, - exitSignal: exitSignal ?? null, + const reason = + exit.reason === "overall-timeout" + ? `Command timed out after ${opts.timeoutSec} seconds` + : exit.reason === "no-output-timeout" + ? "Command timed out waiting for output" + : exit.exitSignal != null + ? `Command aborted by signal ${exit.exitSignal}` + : exit.exitCode == null + ? "Command aborted before exit code was captured" + : `Command exited with code ${exit.exitCode}`; + return { + status: "failed", + exitCode: exit.exitCode, + exitSignal: exit.exitSignal, durationMs, aggregated, + timedOut: exit.timedOut, + reason: aggregated ? `${aggregated}\n\n${reason}` : reason, + }; + }) + .catch((err): ExecProcessOutcome => { + markExited(session, null, null, "failed"); + maybeNotifyOnExit(session, "failed"); + const aggregated = session.aggregated.trim(); + const message = aggregated ? `${aggregated}\n\n${String(err)}` : String(err); + return { + status: "failed", + exitCode: null, + exitSignal: null, + durationMs: Date.now() - startedAt, + aggregated, timedOut: false, - }); - }; - - if (pty) { - pty.onExit((event) => { - const rawSignal = event.signal ?? null; - const normalizedSignal = rawSignal === 0 ? null : rawSignal; - handleExit(event.exitCode ?? null, normalizedSignal); - }); - } else if (child) { - child.once("close", (code, exitSignal) => { - handleExit(code, exitSignal); - }); - - child.once("error", (err) => { - if (timeoutTimer) { - clearTimeout(timeoutTimer); - } - if (timeoutFinalizeTimer) { - clearTimeout(timeoutFinalizeTimer); - } - markExited(session, null, null, "failed"); - maybeNotifyOnExit(session, "failed"); - const aggregated = session.aggregated.trim(); - const message = aggregated ? `${aggregated}\n\n${String(err)}` : String(err); - settle({ - status: "failed", - exitCode: null, - exitSignal: null, - durationMs: Date.now() - startedAt, - aggregated, - timedOut, - reason: message, - }); - }); - } - }); + reason: message, + }; + }); return { session, startedAt, pid: session.pid ?? undefined, promise, - kill: () => killSession(session), + kill: () => { + managedRun?.cancel("manual-cancel"); + }, }; } diff --git a/src/agents/bash-tools.exec.pty-cleanup.test.ts b/src/agents/bash-tools.exec.pty-cleanup.test.ts new file mode 100644 index 0000000000..613c8fd035 --- /dev/null +++ b/src/agents/bash-tools.exec.pty-cleanup.test.ts @@ -0,0 +1,73 @@ +import { afterEach, expect, test, vi } from "vitest"; +import { resetProcessRegistryForTests } from "./bash-process-registry"; + +afterEach(() => { + resetProcessRegistryForTests(); + vi.resetModules(); + vi.clearAllMocks(); +}); + +test("exec disposes PTY listeners after normal exit", async () => { + const disposeData = vi.fn(); + const disposeExit = vi.fn(); + + vi.doMock("@lydell/node-pty", () => ({ + spawn: () => { + return { + pid: 0, + write: vi.fn(), + onData: (listener: (value: string) => void) => { + setTimeout(() => listener("ok"), 0); + return { dispose: disposeData }; + }, + onExit: (listener: (event: { exitCode: number; signal?: number }) => void) => { + setTimeout(() => listener({ exitCode: 0 }), 0); + return { dispose: disposeExit }; + }, + kill: vi.fn(), + }; + }, + })); + + const { createExecTool } = await import("./bash-tools.exec"); + const tool = createExecTool({ allowBackground: false }); + const result = await tool.execute("toolcall", { + command: "echo ok", + pty: true, + }); + + expect(result.details.status).toBe("completed"); + expect(disposeData).toHaveBeenCalledTimes(1); + expect(disposeExit).toHaveBeenCalledTimes(1); +}); + +test("exec tears down PTY resources on timeout", async () => { + const disposeData = vi.fn(); + const disposeExit = vi.fn(); + const kill = vi.fn(); + + vi.doMock("@lydell/node-pty", () => ({ + spawn: () => { + return { + pid: 0, + write: vi.fn(), + onData: () => ({ dispose: disposeData }), + onExit: () => ({ dispose: disposeExit }), + kill, + }; + }, + })); + + const { createExecTool } = await import("./bash-tools.exec"); + const tool = createExecTool({ allowBackground: false }); + await expect( + tool.execute("toolcall", { + command: "sleep 5", + pty: true, + timeout: 0.01, + }), + ).rejects.toThrow("Command timed out"); + expect(kill).toHaveBeenCalledTimes(1); + expect(disposeData).toHaveBeenCalledTimes(1); + expect(disposeExit).toHaveBeenCalledTimes(1); +}); diff --git a/src/agents/bash-tools.exec.pty-fallback-failure.test.ts b/src/agents/bash-tools.exec.pty-fallback-failure.test.ts new file mode 100644 index 0000000000..7f3be41738 --- /dev/null +++ b/src/agents/bash-tools.exec.pty-fallback-failure.test.ts @@ -0,0 +1,40 @@ +import { afterEach, expect, test, vi } from "vitest"; +import { listRunningSessions, resetProcessRegistryForTests } from "./bash-process-registry"; + +const { supervisorSpawnMock } = vi.hoisted(() => ({ + supervisorSpawnMock: vi.fn(), +})); + +vi.mock("../process/supervisor/index.js", () => ({ + getProcessSupervisor: () => ({ + spawn: (...args: unknown[]) => supervisorSpawnMock(...args), + cancel: vi.fn(), + cancelScope: vi.fn(), + reconcileOrphans: vi.fn(), + getRecord: vi.fn(), + }), +})); + +afterEach(() => { + resetProcessRegistryForTests(); + vi.resetModules(); + vi.clearAllMocks(); +}); + +test("exec cleans session state when PTY fallback spawn also fails", async () => { + supervisorSpawnMock + .mockRejectedValueOnce(new Error("pty spawn failed")) + .mockRejectedValueOnce(new Error("child fallback failed")); + + const { createExecTool } = await import("./bash-tools.exec"); + const tool = createExecTool({ allowBackground: false }); + + await expect( + tool.execute("toolcall", { + command: "echo ok", + pty: true, + }), + ).rejects.toThrow("child fallback failed"); + + expect(listRunningSessions()).toHaveLength(0); +}); diff --git a/src/agents/bash-tools.process.supervisor.test.ts b/src/agents/bash-tools.process.supervisor.test.ts new file mode 100644 index 0000000000..e6d026595f --- /dev/null +++ b/src/agents/bash-tools.process.supervisor.test.ts @@ -0,0 +1,152 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ProcessSession } from "./bash-process-registry.js"; +import { + addSession, + getFinishedSession, + getSession, + resetProcessRegistryForTests, +} from "./bash-process-registry.js"; +import { createProcessTool } from "./bash-tools.process.js"; + +const { supervisorMock } = vi.hoisted(() => ({ + supervisorMock: { + spawn: vi.fn(), + cancel: vi.fn(), + cancelScope: vi.fn(), + reconcileOrphans: vi.fn(), + getRecord: vi.fn(), + }, +})); + +const { killProcessTreeMock } = vi.hoisted(() => ({ + killProcessTreeMock: vi.fn(), +})); + +vi.mock("../process/supervisor/index.js", () => ({ + getProcessSupervisor: () => supervisorMock, +})); + +vi.mock("../process/kill-tree.js", () => ({ + killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args), +})); + +function createBackgroundSession(id: string, pid?: number): ProcessSession { + return { + id, + command: "sleep 999", + startedAt: Date.now(), + cwd: "/tmp", + maxOutputChars: 10_000, + pendingMaxOutputChars: 30_000, + totalOutputChars: 0, + pendingStdout: [], + pendingStderr: [], + pendingStdoutChars: 0, + pendingStderrChars: 0, + aggregated: "", + tail: "", + pid, + exited: false, + exitCode: undefined, + exitSignal: undefined, + truncated: false, + backgrounded: true, + }; +} + +describe("process tool supervisor cancellation", () => { + beforeEach(() => { + supervisorMock.spawn.mockReset(); + supervisorMock.cancel.mockReset(); + supervisorMock.cancelScope.mockReset(); + supervisorMock.reconcileOrphans.mockReset(); + supervisorMock.getRecord.mockReset(); + killProcessTreeMock.mockReset(); + }); + + afterEach(() => { + resetProcessRegistryForTests(); + }); + + it("routes kill through supervisor when run is managed", async () => { + supervisorMock.getRecord.mockReturnValue({ + runId: "sess", + state: "running", + }); + addSession(createBackgroundSession("sess")); + const processTool = createProcessTool(); + + const result = await processTool.execute("toolcall", { + action: "kill", + sessionId: "sess", + }); + + expect(supervisorMock.cancel).toHaveBeenCalledWith("sess", "manual-cancel"); + expect(getSession("sess")).toBeDefined(); + expect(getSession("sess")?.exited).toBe(false); + expect(result.content[0]).toMatchObject({ + type: "text", + text: "Termination requested for session sess.", + }); + }); + + it("remove drops running session immediately when cancellation is requested", async () => { + supervisorMock.getRecord.mockReturnValue({ + runId: "sess", + state: "running", + }); + addSession(createBackgroundSession("sess")); + const processTool = createProcessTool(); + + const result = await processTool.execute("toolcall", { + action: "remove", + sessionId: "sess", + }); + + expect(supervisorMock.cancel).toHaveBeenCalledWith("sess", "manual-cancel"); + expect(getSession("sess")).toBeUndefined(); + expect(getFinishedSession("sess")).toBeUndefined(); + expect(result.content[0]).toMatchObject({ + type: "text", + text: "Removed session sess (termination requested).", + }); + }); + + it("falls back to process-tree kill when supervisor record is missing", async () => { + supervisorMock.getRecord.mockReturnValue(undefined); + addSession(createBackgroundSession("sess-fallback", 4242)); + const processTool = createProcessTool(); + + const result = await processTool.execute("toolcall", { + action: "kill", + sessionId: "sess-fallback", + }); + + expect(killProcessTreeMock).toHaveBeenCalledWith(4242); + expect(getSession("sess-fallback")).toBeUndefined(); + expect(getFinishedSession("sess-fallback")).toBeDefined(); + expect(result.content[0]).toMatchObject({ + type: "text", + text: "Killed session sess-fallback.", + }); + }); + + it("fails remove when no supervisor record and no pid is available", async () => { + supervisorMock.getRecord.mockReturnValue(undefined); + addSession(createBackgroundSession("sess-no-pid")); + const processTool = createProcessTool(); + + const result = await processTool.execute("toolcall", { + action: "remove", + sessionId: "sess-no-pid", + }); + + expect(killProcessTreeMock).not.toHaveBeenCalled(); + expect(getSession("sess-no-pid")).toBeDefined(); + expect(result.details).toMatchObject({ status: "failed" }); + expect(result.content[0]).toMatchObject({ + type: "text", + text: "Unable to remove session sess-no-pid: no active supervisor run or process id.", + }); + }); +}); diff --git a/src/agents/bash-tools.process.ts b/src/agents/bash-tools.process.ts index 014926b763..38a8ac357a 100644 --- a/src/agents/bash-tools.process.ts +++ b/src/agents/bash-tools.process.ts @@ -1,7 +1,10 @@ import type { AgentTool, AgentToolResult } from "@mariozechner/pi-agent-core"; import { Type } from "@sinclair/typebox"; import { formatDurationCompact } from "../infra/format-time/format-duration.ts"; +import { killProcessTree } from "../process/kill-tree.js"; +import { getProcessSupervisor } from "../process/supervisor/index.js"; import { + type ProcessSession, deleteSession, drainSession, getFinishedSession, @@ -11,13 +14,7 @@ import { markExited, setJobTtlMs, } from "./bash-process-registry.js"; -import { - deriveSessionName, - killSession, - pad, - sliceLogLines, - truncateMiddle, -} from "./bash-tools.shared.js"; +import { deriveSessionName, pad, sliceLogLines, truncateMiddle } from "./bash-tools.shared.js"; import { encodeKeySequence, encodePaste } from "./pty-keys.js"; export type ProcessToolDefaults = { @@ -107,9 +104,28 @@ export function createProcessTool( setJobTtlMs(defaults.cleanupMs); } const scopeKey = defaults?.scopeKey; + const supervisor = getProcessSupervisor(); const isInScope = (session?: { scopeKey?: string } | null) => !scopeKey || session?.scopeKey === scopeKey; + const cancelManagedSession = (sessionId: string) => { + const record = supervisor.getRecord(sessionId); + if (!record || record.state === "exited") { + return false; + } + supervisor.cancel(sessionId, "manual-cancel"); + return true; + }; + + const terminateSessionFallback = (session: ProcessSession) => { + const pid = session.pid ?? session.child?.pid; + if (typeof pid !== "number" || !Number.isFinite(pid) || pid <= 0) { + return false; + } + killProcessTree(pid); + return true; + }; + return { name: "process", label: "process", @@ -523,10 +539,25 @@ export function createProcessTool( if (!scopedSession.backgrounded) { return failText(`Session ${params.sessionId} is not backgrounded.`); } - killSession(scopedSession); - markExited(scopedSession, null, "SIGKILL", "failed"); + const canceled = cancelManagedSession(scopedSession.id); + if (!canceled) { + const terminated = terminateSessionFallback(scopedSession); + if (!terminated) { + return failText( + `Unable to terminate session ${params.sessionId}: no active supervisor run or process id.`, + ); + } + markExited(scopedSession, null, "SIGKILL", "failed"); + } return { - content: [{ type: "text", text: `Killed session ${params.sessionId}.` }], + content: [ + { + type: "text", + text: canceled + ? `Termination requested for session ${params.sessionId}.` + : `Killed session ${params.sessionId}.`, + }, + ], details: { status: "failed", name: scopedSession ? deriveSessionName(scopedSession.command) : undefined, @@ -555,10 +586,30 @@ export function createProcessTool( case "remove": { if (scopedSession) { - killSession(scopedSession); - markExited(scopedSession, null, "SIGKILL", "failed"); + const canceled = cancelManagedSession(scopedSession.id); + if (canceled) { + // Keep remove semantics deterministic: drop from process registry now. + scopedSession.backgrounded = false; + deleteSession(params.sessionId); + } else { + const terminated = terminateSessionFallback(scopedSession); + if (!terminated) { + return failText( + `Unable to remove session ${params.sessionId}: no active supervisor run or process id.`, + ); + } + markExited(scopedSession, null, "SIGKILL", "failed"); + deleteSession(params.sessionId); + } return { - content: [{ type: "text", text: `Removed session ${params.sessionId}.` }], + content: [ + { + type: "text", + text: canceled + ? `Removed session ${params.sessionId} (termination requested).` + : `Removed session ${params.sessionId}.`, + }, + ], details: { status: "failed", name: scopedSession ? deriveSessionName(scopedSession.command) : undefined, diff --git a/src/agents/bash-tools.shared.ts b/src/agents/bash-tools.shared.ts index 99a7a4b792..07b1226600 100644 --- a/src/agents/bash-tools.shared.ts +++ b/src/agents/bash-tools.shared.ts @@ -1,11 +1,9 @@ -import type { ChildProcessWithoutNullStreams } from "node:child_process"; import { existsSync, statSync } from "node:fs"; import fs from "node:fs/promises"; import { homedir } from "node:os"; import path from "node:path"; import { sliceUtf16Safe } from "../utils.js"; import { assertSandboxPath } from "./sandbox-paths.js"; -import { killProcessTree } from "./shell-utils.js"; const CHUNK_LIMIT = 8 * 1024; @@ -115,13 +113,6 @@ export async function resolveSandboxWorkdir(params: { } } -export function killSession(session: { pid?: number; child?: ChildProcessWithoutNullStreams }) { - const pid = session.pid ?? session.child?.pid; - if (pid) { - killProcessTree(pid); - } -} - export function resolveWorkdir(workdir: string, warnings: string[]) { const current = safeCwd(); const fallback = current ?? homedir(); diff --git a/src/agents/cli-backends.test.ts b/src/agents/cli-backends.test.ts new file mode 100644 index 0000000000..c78dfdb87f --- /dev/null +++ b/src/agents/cli-backends.test.ts @@ -0,0 +1,36 @@ +import { describe, expect, it } from "vitest"; +import type { OpenClawConfig } from "../config/config.js"; +import { resolveCliBackendConfig } from "./cli-backends.js"; + +describe("resolveCliBackendConfig reliability merge", () => { + it("deep-merges reliability watchdog overrides for codex", () => { + const cfg = { + agents: { + defaults: { + cliBackends: { + "codex-cli": { + command: "codex", + reliability: { + watchdog: { + resume: { + noOutputTimeoutMs: 42_000, + }, + }, + }, + }, + }, + }, + }, + } satisfies OpenClawConfig; + + const resolved = resolveCliBackendConfig("codex-cli", cfg); + + expect(resolved).not.toBeNull(); + expect(resolved?.config.reliability?.watchdog?.resume?.noOutputTimeoutMs).toBe(42_000); + // Ensure defaults are retained when only one field is overridden. + expect(resolved?.config.reliability?.watchdog?.resume?.noOutputTimeoutRatio).toBe(0.3); + expect(resolved?.config.reliability?.watchdog?.resume?.minMs).toBe(60_000); + expect(resolved?.config.reliability?.watchdog?.resume?.maxMs).toBe(180_000); + expect(resolved?.config.reliability?.watchdog?.fresh?.noOutputTimeoutRatio).toBe(0.8); + }); +}); diff --git a/src/agents/cli-backends.ts b/src/agents/cli-backends.ts index 5f6b2253fb..2f1db0f87a 100644 --- a/src/agents/cli-backends.ts +++ b/src/agents/cli-backends.ts @@ -1,5 +1,9 @@ import type { OpenClawConfig } from "../config/config.js"; import type { CliBackendConfig } from "../config/types.js"; +import { + CLI_FRESH_WATCHDOG_DEFAULTS, + CLI_RESUME_WATCHDOG_DEFAULTS, +} from "./cli-watchdog-defaults.js"; import { normalizeProviderId } from "./model-selection.js"; export type ResolvedCliBackend = { @@ -49,6 +53,12 @@ const DEFAULT_CLAUDE_BACKEND: CliBackendConfig = { systemPromptMode: "append", systemPromptWhen: "first", clearEnv: ["ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY_OLD"], + reliability: { + watchdog: { + fresh: { ...CLI_FRESH_WATCHDOG_DEFAULTS }, + resume: { ...CLI_RESUME_WATCHDOG_DEFAULTS }, + }, + }, serialize: true, }; @@ -73,6 +83,12 @@ const DEFAULT_CODEX_BACKEND: CliBackendConfig = { sessionMode: "existing", imageArg: "--image", imageMode: "repeat", + reliability: { + watchdog: { + fresh: { ...CLI_FRESH_WATCHDOG_DEFAULTS }, + resume: { ...CLI_RESUME_WATCHDOG_DEFAULTS }, + }, + }, serialize: true, }; @@ -96,6 +112,10 @@ function mergeBackendConfig(base: CliBackendConfig, override?: CliBackendConfig) if (!override) { return { ...base }; } + const baseFresh = base.reliability?.watchdog?.fresh ?? {}; + const baseResume = base.reliability?.watchdog?.resume ?? {}; + const overrideFresh = override.reliability?.watchdog?.fresh ?? {}; + const overrideResume = override.reliability?.watchdog?.resume ?? {}; return { ...base, ...override, @@ -106,6 +126,22 @@ function mergeBackendConfig(base: CliBackendConfig, override?: CliBackendConfig) sessionIdFields: override.sessionIdFields ?? base.sessionIdFields, sessionArgs: override.sessionArgs ?? base.sessionArgs, resumeArgs: override.resumeArgs ?? base.resumeArgs, + reliability: { + ...base.reliability, + ...override.reliability, + watchdog: { + ...base.reliability?.watchdog, + ...override.reliability?.watchdog, + fresh: { + ...baseFresh, + ...overrideFresh, + }, + resume: { + ...baseResume, + ...overrideResume, + }, + }, + }, }; } diff --git a/src/agents/cli-runner.e2e.test.ts b/src/agents/cli-runner.e2e.test.ts index 1383be1edb..16f563d9e7 100644 --- a/src/agents/cli-runner.e2e.test.ts +++ b/src/agents/cli-runner.e2e.test.ts @@ -3,50 +3,69 @@ import os from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; -import type { CliBackendConfig } from "../config/types.js"; import { runCliAgent } from "./cli-runner.js"; -import { cleanupResumeProcesses, cleanupSuspendedCliProcesses } from "./cli-runner/helpers.js"; +import { resolveCliNoOutputTimeoutMs } from "./cli-runner/helpers.js"; -const runCommandWithTimeoutMock = vi.fn(); -const runExecMock = vi.fn(); +const supervisorSpawnMock = vi.fn(); -vi.mock("../process/exec.js", () => ({ - runCommandWithTimeout: (...args: unknown[]) => runCommandWithTimeoutMock(...args), - runExec: (...args: unknown[]) => runExecMock(...args), +vi.mock("../process/supervisor/index.js", () => ({ + getProcessSupervisor: () => ({ + spawn: (...args: unknown[]) => supervisorSpawnMock(...args), + cancel: vi.fn(), + cancelScope: vi.fn(), + reconcileOrphans: vi.fn(), + getRecord: vi.fn(), + }), })); -describe("runCliAgent resume cleanup", () => { +type MockRunExit = { + reason: + | "manual-cancel" + | "overall-timeout" + | "no-output-timeout" + | "spawn-error" + | "signal" + | "exit"; + exitCode: number | null; + exitSignal: NodeJS.Signals | number | null; + durationMs: number; + stdout: string; + stderr: string; + timedOut: boolean; + noOutputTimedOut: boolean; +}; + +function createManagedRun(exit: MockRunExit, pid = 1234) { + return { + runId: "run-supervisor", + pid, + startedAtMs: Date.now(), + stdin: undefined, + wait: vi.fn().mockResolvedValue(exit), + cancel: vi.fn(), + }; +} + +describe("runCliAgent with process supervisor", () => { beforeEach(() => { - runCommandWithTimeoutMock.mockReset(); - runExecMock.mockReset(); + supervisorSpawnMock.mockReset(); }); - it("kills stale resume processes for codex sessions", async () => { - const selfPid = process.pid; - - runExecMock - .mockResolvedValueOnce({ - stdout: " 1 999 S /bin/launchd\n", + it("runs CLI through supervisor and returns payload", async () => { + supervisorSpawnMock.mockResolvedValueOnce( + createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: "ok", stderr: "", - }) // cleanupSuspendedCliProcesses (ps) — ppid 999 != selfPid, no match - .mockResolvedValueOnce({ - stdout: [ - ` ${selfPid + 1} ${selfPid} codex exec resume thread-123 --color never --sandbox read-only --skip-git-repo-check`, - ` ${selfPid + 2} 999 codex exec resume thread-123 --color never --sandbox read-only --skip-git-repo-check`, - ].join("\n"), - stderr: "", - }) // cleanupResumeProcesses (ps) - .mockResolvedValueOnce({ stdout: "", stderr: "" }) // cleanupResumeProcesses (kill -TERM) - .mockResolvedValueOnce({ stdout: "", stderr: "" }); // cleanupResumeProcesses (kill -9) - runCommandWithTimeoutMock.mockResolvedValueOnce({ - stdout: "ok", - stderr: "", - code: 0, - signal: null, - killed: false, - }); + timedOut: false, + noOutputTimedOut: false, + }), + ); - await runCliAgent({ + const result = await runCliAgent({ sessionId: "s1", sessionFile: "/tmp/session.jsonl", workspaceDir: "/tmp", @@ -58,28 +77,80 @@ describe("runCliAgent resume cleanup", () => { cliSessionId: "thread-123", }); - if (process.platform === "win32") { - expect(runExecMock).not.toHaveBeenCalled(); - return; - } + expect(result.payloads?.[0]?.text).toBe("ok"); + expect(supervisorSpawnMock).toHaveBeenCalledTimes(1); + const input = supervisorSpawnMock.mock.calls[0]?.[0] as { + argv?: string[]; + mode?: string; + timeoutMs?: number; + noOutputTimeoutMs?: number; + replaceExistingScope?: boolean; + scopeKey?: string; + }; + expect(input.mode).toBe("child"); + expect(input.argv?.[0]).toBe("codex"); + expect(input.timeoutMs).toBe(1_000); + expect(input.noOutputTimeoutMs).toBeGreaterThanOrEqual(1_000); + expect(input.replaceExistingScope).toBe(true); + expect(input.scopeKey).toContain("thread-123"); + }); - expect(runExecMock).toHaveBeenCalledTimes(4); + it("fails with timeout when no-output watchdog trips", async () => { + supervisorSpawnMock.mockResolvedValueOnce( + createManagedRun({ + reason: "no-output-timeout", + exitCode: null, + exitSignal: "SIGKILL", + durationMs: 200, + stdout: "", + stderr: "", + timedOut: true, + noOutputTimedOut: true, + }), + ); - // Second call: cleanupResumeProcesses ps - const psCall = runExecMock.mock.calls[1] ?? []; - expect(psCall[0]).toBe("ps"); + await expect( + runCliAgent({ + sessionId: "s1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + prompt: "hi", + provider: "codex-cli", + model: "gpt-5.2-codex", + timeoutMs: 1_000, + runId: "run-2", + cliSessionId: "thread-123", + }), + ).rejects.toThrow("produced no output"); + }); - // Third call: TERM, only the child PID - const termCall = runExecMock.mock.calls[2] ?? []; - expect(termCall[0]).toBe("kill"); - const termArgs = termCall[1] as string[]; - expect(termArgs).toEqual(["-TERM", String(selfPid + 1)]); + it("fails with timeout when overall timeout trips", async () => { + supervisorSpawnMock.mockResolvedValueOnce( + createManagedRun({ + reason: "overall-timeout", + exitCode: null, + exitSignal: "SIGKILL", + durationMs: 200, + stdout: "", + stderr: "", + timedOut: true, + noOutputTimedOut: false, + }), + ); - // Fourth call: KILL, only the child PID - const killCall = runExecMock.mock.calls[3] ?? []; - expect(killCall[0]).toBe("kill"); - const killArgs = killCall[1] as string[]; - expect(killArgs).toEqual(["-9", String(selfPid + 1)]); + await expect( + runCliAgent({ + sessionId: "s1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + prompt: "hi", + provider: "codex-cli", + model: "gpt-5.2-codex", + timeoutMs: 1_000, + runId: "run-3", + cliSessionId: "thread-123", + }), + ).rejects.toThrow("exceeded timeout"); }); it("falls back to per-agent workspace when workspaceDir is missing", async () => { @@ -94,14 +165,18 @@ describe("runCliAgent resume cleanup", () => { }, } satisfies OpenClawConfig; - runExecMock.mockResolvedValue({ stdout: "", stderr: "" }); - runCommandWithTimeoutMock.mockResolvedValueOnce({ - stdout: "ok", - stderr: "", - code: 0, - signal: null, - killed: false, - }); + supervisorSpawnMock.mockResolvedValueOnce( + createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 25, + stdout: "ok", + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }), + ); try { await runCliAgent({ @@ -114,264 +189,33 @@ describe("runCliAgent resume cleanup", () => { provider: "codex-cli", model: "gpt-5.2-codex", timeoutMs: 1_000, - runId: "run-1", + runId: "run-4", }); } finally { await fs.rm(tempDir, { recursive: true, force: true }); } - const options = runCommandWithTimeoutMock.mock.calls[0]?.[1] as { cwd?: string }; - expect(options.cwd).toBe(path.resolve(fallbackWorkspace)); + const input = supervisorSpawnMock.mock.calls[0]?.[0] as { cwd?: string }; + expect(input.cwd).toBe(path.resolve(fallbackWorkspace)); }); +}); - it("throws when sessionKey is malformed", async () => { - const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cli-runner-")); - const mainWorkspace = path.join(tempDir, "workspace-main"); - const researchWorkspace = path.join(tempDir, "workspace-research"); - await fs.mkdir(mainWorkspace, { recursive: true }); - await fs.mkdir(researchWorkspace, { recursive: true }); - const cfg = { - agents: { - defaults: { - workspace: mainWorkspace, +describe("resolveCliNoOutputTimeoutMs", () => { + it("uses backend-configured resume watchdog override", () => { + const timeoutMs = resolveCliNoOutputTimeoutMs({ + backend: { + command: "codex", + reliability: { + watchdog: { + resume: { + noOutputTimeoutMs: 42_000, + }, + }, }, - list: [{ id: "research", workspace: researchWorkspace }], }, - } satisfies OpenClawConfig; - - try { - await expect( - runCliAgent({ - sessionId: "s1", - sessionKey: "agent::broken", - agentId: "research", - sessionFile: "/tmp/session.jsonl", - workspaceDir: undefined as unknown as string, - config: cfg, - prompt: "hi", - provider: "codex-cli", - model: "gpt-5.2-codex", - timeoutMs: 1_000, - runId: "run-2", - }), - ).rejects.toThrow("Malformed agent session key"); - } finally { - await fs.rm(tempDir, { recursive: true, force: true }); - } - expect(runCommandWithTimeoutMock).not.toHaveBeenCalled(); - }); -}); - -describe("cleanupSuspendedCliProcesses", () => { - beforeEach(() => { - runExecMock.mockReset(); - }); - - it("skips when no session tokens are configured", async () => { - await cleanupSuspendedCliProcesses( - { - command: "tool", - } as CliBackendConfig, - 0, - ); - - if (process.platform === "win32") { - expect(runExecMock).not.toHaveBeenCalled(); - return; - } - - expect(runExecMock).not.toHaveBeenCalled(); - }); - - it("matches sessionArg-based commands", async () => { - const selfPid = process.pid; - runExecMock - .mockResolvedValueOnce({ - stdout: [ - ` 40 ${selfPid} T+ claude --session-id thread-1 -p`, - ` 41 ${selfPid} S claude --session-id thread-2 -p`, - ].join("\n"), - stderr: "", - }) - .mockResolvedValueOnce({ stdout: "", stderr: "" }); - - await cleanupSuspendedCliProcesses( - { - command: "claude", - sessionArg: "--session-id", - } as CliBackendConfig, - 0, - ); - - if (process.platform === "win32") { - expect(runExecMock).not.toHaveBeenCalled(); - return; - } - - expect(runExecMock).toHaveBeenCalledTimes(2); - const killCall = runExecMock.mock.calls[1] ?? []; - expect(killCall[0]).toBe("kill"); - expect(killCall[1]).toEqual(["-9", "40"]); - }); - - it("matches resumeArgs with positional session id", async () => { - const selfPid = process.pid; - runExecMock - .mockResolvedValueOnce({ - stdout: [ - ` 50 ${selfPid} T codex exec resume thread-99 --color never --sandbox read-only`, - ` 51 ${selfPid} T codex exec resume other --color never --sandbox read-only`, - ].join("\n"), - stderr: "", - }) - .mockResolvedValueOnce({ stdout: "", stderr: "" }); - - await cleanupSuspendedCliProcesses( - { - command: "codex", - resumeArgs: ["exec", "resume", "{sessionId}", "--color", "never", "--sandbox", "read-only"], - } as CliBackendConfig, - 1, - ); - - if (process.platform === "win32") { - expect(runExecMock).not.toHaveBeenCalled(); - return; - } - - expect(runExecMock).toHaveBeenCalledTimes(2); - const killCall = runExecMock.mock.calls[1] ?? []; - expect(killCall[0]).toBe("kill"); - expect(killCall[1]).toEqual(["-9", "50", "51"]); - }); - - it("only kills child processes of current process (ppid validation)", async () => { - const selfPid = process.pid; - const childPid = selfPid + 1; - const unrelatedPid = 9999; - - runExecMock - .mockResolvedValueOnce({ - stdout: [ - ` ${childPid} ${selfPid} T claude --session-id thread-1 -p`, - ` ${unrelatedPid} 100 T claude --session-id thread-2 -p`, - ].join("\n"), - stderr: "", - }) - .mockResolvedValueOnce({ stdout: "", stderr: "" }); - - await cleanupSuspendedCliProcesses( - { - command: "claude", - sessionArg: "--session-id", - } as CliBackendConfig, - 0, - ); - - if (process.platform === "win32") { - expect(runExecMock).not.toHaveBeenCalled(); - return; - } - - expect(runExecMock).toHaveBeenCalledTimes(2); - const killCall = runExecMock.mock.calls[1] ?? []; - expect(killCall[0]).toBe("kill"); - // Only childPid killed; unrelatedPid (ppid=100) excluded - expect(killCall[1]).toEqual(["-9", String(childPid)]); - }); - - it("skips all processes when none are children of current process", async () => { - runExecMock.mockResolvedValueOnce({ - stdout: [ - " 200 100 T claude --session-id thread-1 -p", - " 201 100 T claude --session-id thread-2 -p", - ].join("\n"), - stderr: "", + timeoutMs: 120_000, + useResume: true, }); - - await cleanupSuspendedCliProcesses( - { - command: "claude", - sessionArg: "--session-id", - } as CliBackendConfig, - 0, - ); - - if (process.platform === "win32") { - expect(runExecMock).not.toHaveBeenCalled(); - return; - } - - // Only ps called — no kill because no matching ppid - expect(runExecMock).toHaveBeenCalledTimes(1); - }); -}); - -describe("cleanupResumeProcesses", () => { - beforeEach(() => { - runExecMock.mockReset(); - }); - - it("only kills resume processes owned by current process", async () => { - const selfPid = process.pid; - - runExecMock - .mockResolvedValueOnce({ - stdout: [ - ` ${selfPid + 1} ${selfPid} codex exec resume abc-123`, - ` ${selfPid + 2} 999 codex exec resume abc-123`, - ].join("\n"), - stderr: "", - }) - .mockResolvedValueOnce({ stdout: "", stderr: "" }) - .mockResolvedValueOnce({ stdout: "", stderr: "" }); - - await cleanupResumeProcesses( - { - command: "codex", - resumeArgs: ["exec", "resume", "{sessionId}"], - } as CliBackendConfig, - "abc-123", - ); - - if (process.platform === "win32") { - expect(runExecMock).not.toHaveBeenCalled(); - return; - } - - expect(runExecMock).toHaveBeenCalledTimes(3); - - const termCall = runExecMock.mock.calls[1] ?? []; - expect(termCall[0]).toBe("kill"); - expect(termCall[1]).toEqual(["-TERM", String(selfPid + 1)]); - - const killCall = runExecMock.mock.calls[2] ?? []; - expect(killCall[0]).toBe("kill"); - expect(killCall[1]).toEqual(["-9", String(selfPid + 1)]); - }); - - it("skips kill when no resume processes match ppid", async () => { - runExecMock.mockResolvedValueOnce({ - stdout: [" 300 100 codex exec resume abc-123", " 301 200 codex exec resume abc-123"].join( - "\n", - ), - stderr: "", - }); - - await cleanupResumeProcesses( - { - command: "codex", - resumeArgs: ["exec", "resume", "{sessionId}"], - } as CliBackendConfig, - "abc-123", - ); - - if (process.platform === "win32") { - expect(runExecMock).not.toHaveBeenCalled(); - return; - } - - // Only ps called — no kill because no matching ppid - expect(runExecMock).toHaveBeenCalledTimes(1); + expect(timeoutMs).toBe(42_000); }); }); diff --git a/src/agents/cli-runner.ts b/src/agents/cli-runner.ts index 68dbf0d5c2..5160611e8e 100644 --- a/src/agents/cli-runner.ts +++ b/src/agents/cli-runner.ts @@ -6,20 +6,20 @@ import { resolveHeartbeatPrompt } from "../auto-reply/heartbeat.js"; import { shouldLogVerbose } from "../globals.js"; import { isTruthyEnvValue } from "../infra/env.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { runCommandWithTimeout } from "../process/exec.js"; +import { getProcessSupervisor } from "../process/supervisor/index.js"; import { resolveSessionAgentIds } from "./agent-scope.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "./bootstrap-files.js"; import { resolveCliBackendConfig } from "./cli-backends.js"; import { appendImagePathsToPrompt, + buildCliSupervisorScopeKey, buildCliArgs, buildSystemPrompt, - cleanupResumeProcesses, - cleanupSuspendedCliProcesses, enqueueCliRun, normalizeCliModel, parseCliJson, parseCliJsonl, + resolveCliNoOutputTimeoutMs, resolvePromptInput, resolveSessionIdToSend, resolveSystemPromptUsage, @@ -226,19 +226,32 @@ export async function runCliAgent(params: { } return next; })(); - - // Cleanup suspended processes that have accumulated (regardless of sessionId) - await cleanupSuspendedCliProcesses(backend); - if (useResume && cliSessionIdToSend) { - await cleanupResumeProcesses(backend, cliSessionIdToSend); - } - - const result = await runCommandWithTimeout([backend.command, ...args], { + const noOutputTimeoutMs = resolveCliNoOutputTimeoutMs({ + backend, timeoutMs: params.timeoutMs, + useResume, + }); + const supervisor = getProcessSupervisor(); + const scopeKey = buildCliSupervisorScopeKey({ + backend, + backendId: backendResolved.id, + cliSessionId: useResume ? cliSessionIdToSend : undefined, + }); + + const managedRun = await supervisor.spawn({ + sessionId: params.sessionId, + backendId: backendResolved.id, + scopeKey, + replaceExistingScope: Boolean(useResume && scopeKey), + mode: "child", + argv: [backend.command, ...args], + timeoutMs: params.timeoutMs, + noOutputTimeoutMs, cwd: workspaceDir, env, input: stdinPayload, }); + const result = await managedRun.wait(); const stdout = result.stdout.trim(); const stderr = result.stderr.trim(); @@ -259,7 +272,28 @@ export async function runCliAgent(params: { } } - if (result.code !== 0) { + if (result.exitCode !== 0 || result.reason !== "exit") { + if (result.reason === "no-output-timeout" || result.noOutputTimedOut) { + const timeoutReason = `CLI produced no output for ${Math.round(noOutputTimeoutMs / 1000)}s and was terminated.`; + log.warn( + `cli watchdog timeout: provider=${params.provider} model=${modelId} session=${cliSessionIdToSend ?? params.sessionId} noOutputTimeoutMs=${noOutputTimeoutMs} pid=${managedRun.pid ?? "unknown"}`, + ); + throw new FailoverError(timeoutReason, { + reason: "timeout", + provider: params.provider, + model: modelId, + status: resolveFailoverStatus("timeout"), + }); + } + if (result.reason === "overall-timeout") { + const timeoutReason = `CLI exceeded timeout (${Math.round(params.timeoutMs / 1000)}s) and was terminated.`; + throw new FailoverError(timeoutReason, { + reason: "timeout", + provider: params.provider, + model: modelId, + status: resolveFailoverStatus("timeout"), + }); + } const err = stderr || stdout || "CLI failed."; const reason = classifyFailoverReason(err) ?? "unknown"; const status = resolveFailoverStatus(reason); diff --git a/src/agents/cli-runner/helpers.ts b/src/agents/cli-runner/helpers.ts index 572c3c1dea..f11c3d0aa7 100644 --- a/src/agents/cli-runner/helpers.ts +++ b/src/agents/cli-runner/helpers.ts @@ -8,232 +8,27 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { CliBackendConfig } from "../../config/types.js"; import type { EmbeddedContextFile } from "../pi-embedded-helpers.js"; -import { runExec } from "../../process/exec.js"; import { buildTtsSystemPromptHint } from "../../tts/tts.js"; -import { escapeRegExp, isRecord } from "../../utils.js"; +import { isRecord } from "../../utils.js"; import { buildModelAliasLines } from "../model-alias-lines.js"; import { resolveDefaultModelForAgent } from "../model-selection.js"; import { detectRuntimeShell } from "../shell-utils.js"; import { buildSystemPromptParams } from "../system-prompt-params.js"; import { buildAgentSystemPrompt } from "../system-prompt.js"; +export { buildCliSupervisorScopeKey, resolveCliNoOutputTimeoutMs } from "./reliability.js"; const CLI_RUN_QUEUE = new Map>(); - -function buildLooseArgOrderRegex(tokens: string[]): RegExp { - // Scan `ps` output lines. Keep matching flexible, but require whitespace arg boundaries - // to avoid substring matches like `codexx` or `/path/to/codexx`. - const [head, ...rest] = tokens.map((t) => String(t ?? "").trim()).filter(Boolean); - if (!head) { - return /$^/; - } - - const headEscaped = escapeRegExp(head); - const headFragment = `(?:^|\\s)(?:${headEscaped}|\\S+\\/${headEscaped})(?=\\s|$)`; - const restFragments = rest.map((t) => `(?:^|\\s)${escapeRegExp(t)}(?=\\s|$)`); - return new RegExp([headFragment, ...restFragments].join(".*")); -} - -async function psWithFallback(argsA: string[], argsB: string[]): Promise { - try { - const { stdout } = await runExec("ps", argsA); - return stdout; - } catch { - // fallthrough - } - const { stdout } = await runExec("ps", argsB); - return stdout; -} - -export async function cleanupResumeProcesses( - backend: CliBackendConfig, - sessionId: string, -): Promise { - if (process.platform === "win32") { - return; - } - const resumeArgs = backend.resumeArgs ?? []; - if (resumeArgs.length === 0) { - return; - } - if (!resumeArgs.some((arg) => arg.includes("{sessionId}"))) { - return; - } - const commandToken = path.basename(backend.command ?? "").trim(); - if (!commandToken) { - return; - } - - const resumeTokens = resumeArgs.map((arg) => arg.replaceAll("{sessionId}", sessionId)); - const pattern = [commandToken, ...resumeTokens] - .filter(Boolean) - .map((token) => escapeRegExp(token)) - .join(".*"); - if (!pattern) { - return; - } - - try { - const stdout = await psWithFallback( - ["-axww", "-o", "pid=,ppid=,command="], - ["-ax", "-o", "pid=,ppid=,command="], - ); - const patternRegex = buildLooseArgOrderRegex([commandToken, ...resumeTokens]); - const toKill: number[] = []; - - for (const line of stdout.split("\n")) { - const trimmed = line.trim(); - if (!trimmed) { - continue; - } - const match = /^(\d+)\s+(\d+)\s+(.*)$/.exec(trimmed); - if (!match) { - continue; - } - const pid = Number(match[1]); - const ppid = Number(match[2]); - const cmd = match[3] ?? ""; - if (!Number.isFinite(pid)) { - continue; - } - if (ppid !== process.pid) { - continue; - } - if (!patternRegex.test(cmd)) { - continue; - } - toKill.push(pid); - } - - if (toKill.length > 0) { - const pidArgs = toKill.map((pid) => String(pid)); - try { - await runExec("kill", ["-TERM", ...pidArgs]); - } catch { - // ignore - } - await new Promise((resolve) => setTimeout(resolve, 250)); - try { - await runExec("kill", ["-9", ...pidArgs]); - } catch { - // ignore - } - } - } catch { - // ignore errors - best effort cleanup - } -} - -function buildSessionMatchers(backend: CliBackendConfig): RegExp[] { - const commandToken = path.basename(backend.command ?? "").trim(); - if (!commandToken) { - return []; - } - const matchers: RegExp[] = []; - const sessionArg = backend.sessionArg?.trim(); - const sessionArgs = backend.sessionArgs ?? []; - const resumeArgs = backend.resumeArgs ?? []; - - const addMatcher = (args: string[]) => { - if (args.length === 0) { - return; - } - const tokens = [commandToken, ...args]; - const pattern = tokens - .map((token, index) => { - const tokenPattern = tokenToRegex(token); - return index === 0 ? `(?:^|\\s)${tokenPattern}` : `\\s+${tokenPattern}`; - }) - .join(""); - matchers.push(new RegExp(pattern)); - }; - - if (sessionArgs.some((arg) => arg.includes("{sessionId}"))) { - addMatcher(sessionArgs); - } else if (sessionArg) { - addMatcher([sessionArg, "{sessionId}"]); - } - - if (resumeArgs.some((arg) => arg.includes("{sessionId}"))) { - addMatcher(resumeArgs); - } - - return matchers; -} - -function tokenToRegex(token: string): string { - if (!token.includes("{sessionId}")) { - return escapeRegExp(token); - } - const parts = token.split("{sessionId}").map((part) => escapeRegExp(part)); - return parts.join("\\S+"); -} - -/** - * Cleanup suspended OpenClaw CLI processes that have accumulated. - * Only cleans up if there are more than the threshold (default: 10). - */ -export async function cleanupSuspendedCliProcesses( - backend: CliBackendConfig, - threshold = 10, -): Promise { - if (process.platform === "win32") { - return; - } - const matchers = buildSessionMatchers(backend); - if (matchers.length === 0) { - return; - } - - try { - const stdout = await psWithFallback( - ["-axww", "-o", "pid=,ppid=,stat=,command="], - ["-ax", "-o", "pid=,ppid=,stat=,command="], - ); - const suspended: number[] = []; - for (const line of stdout.split("\n")) { - const trimmed = line.trim(); - if (!trimmed) { - continue; - } - const match = /^(\d+)\s+(\d+)\s+(\S+)\s+(.*)$/.exec(trimmed); - if (!match) { - continue; - } - const pid = Number(match[1]); - const ppid = Number(match[2]); - const stat = match[3] ?? ""; - const command = match[4] ?? ""; - if (!Number.isFinite(pid)) { - continue; - } - if (ppid !== process.pid) { - continue; - } - if (!stat.includes("T")) { - continue; - } - if (!matchers.some((matcher) => matcher.test(command))) { - continue; - } - suspended.push(pid); - } - - if (suspended.length > threshold) { - // Verified locally: stopped (T) processes ignore SIGTERM, so use SIGKILL. - await runExec("kill", ["-9", ...suspended.map((pid) => String(pid))]); - } - } catch { - // ignore errors - best effort cleanup - } -} export function enqueueCliRun(key: string, task: () => Promise): Promise { const prior = CLI_RUN_QUEUE.get(key) ?? Promise.resolve(); const chained = prior.catch(() => undefined).then(task); - const tracked = chained.finally(() => { - if (CLI_RUN_QUEUE.get(key) === tracked) { - CLI_RUN_QUEUE.delete(key); - } - }); + // Keep queue continuity even when a run rejects, without emitting unhandled rejections. + const tracked = chained + .catch(() => undefined) + .finally(() => { + if (CLI_RUN_QUEUE.get(key) === tracked) { + CLI_RUN_QUEUE.delete(key); + } + }); CLI_RUN_QUEUE.set(key, tracked); return chained; } diff --git a/src/agents/cli-runner/reliability.ts b/src/agents/cli-runner/reliability.ts new file mode 100644 index 0000000000..cd1fefa937 --- /dev/null +++ b/src/agents/cli-runner/reliability.ts @@ -0,0 +1,88 @@ +import path from "node:path"; +import type { CliBackendConfig } from "../../config/types.js"; +import { + CLI_FRESH_WATCHDOG_DEFAULTS, + CLI_RESUME_WATCHDOG_DEFAULTS, + CLI_WATCHDOG_MIN_TIMEOUT_MS, +} from "../cli-watchdog-defaults.js"; + +function pickWatchdogProfile( + backend: CliBackendConfig, + useResume: boolean, +): { + noOutputTimeoutMs?: number; + noOutputTimeoutRatio: number; + minMs: number; + maxMs: number; +} { + const defaults = useResume ? CLI_RESUME_WATCHDOG_DEFAULTS : CLI_FRESH_WATCHDOG_DEFAULTS; + const configured = useResume + ? backend.reliability?.watchdog?.resume + : backend.reliability?.watchdog?.fresh; + + const ratio = (() => { + const value = configured?.noOutputTimeoutRatio; + if (typeof value !== "number" || !Number.isFinite(value)) { + return defaults.noOutputTimeoutRatio; + } + return Math.max(0.05, Math.min(0.95, value)); + })(); + const minMs = (() => { + const value = configured?.minMs; + if (typeof value !== "number" || !Number.isFinite(value)) { + return defaults.minMs; + } + return Math.max(CLI_WATCHDOG_MIN_TIMEOUT_MS, Math.floor(value)); + })(); + const maxMs = (() => { + const value = configured?.maxMs; + if (typeof value !== "number" || !Number.isFinite(value)) { + return defaults.maxMs; + } + return Math.max(CLI_WATCHDOG_MIN_TIMEOUT_MS, Math.floor(value)); + })(); + + return { + noOutputTimeoutMs: + typeof configured?.noOutputTimeoutMs === "number" && + Number.isFinite(configured.noOutputTimeoutMs) + ? Math.max(CLI_WATCHDOG_MIN_TIMEOUT_MS, Math.floor(configured.noOutputTimeoutMs)) + : undefined, + noOutputTimeoutRatio: ratio, + minMs: Math.min(minMs, maxMs), + maxMs: Math.max(minMs, maxMs), + }; +} + +export function resolveCliNoOutputTimeoutMs(params: { + backend: CliBackendConfig; + timeoutMs: number; + useResume: boolean; +}): number { + const profile = pickWatchdogProfile(params.backend, params.useResume); + // Keep watchdog below global timeout in normal cases. + const cap = Math.max(CLI_WATCHDOG_MIN_TIMEOUT_MS, params.timeoutMs - 1_000); + if (profile.noOutputTimeoutMs !== undefined) { + return Math.min(profile.noOutputTimeoutMs, cap); + } + const computed = Math.floor(params.timeoutMs * profile.noOutputTimeoutRatio); + const bounded = Math.min(profile.maxMs, Math.max(profile.minMs, computed)); + return Math.min(bounded, cap); +} + +export function buildCliSupervisorScopeKey(params: { + backend: CliBackendConfig; + backendId: string; + cliSessionId?: string; +}): string | undefined { + const commandToken = path + .basename(params.backend.command ?? "") + .trim() + .toLowerCase(); + const backendToken = params.backendId.trim().toLowerCase(); + const sessionToken = params.cliSessionId?.trim(); + if (!sessionToken) { + return undefined; + } + return `cli:${backendToken}:${commandToken}:${sessionToken}`; +} diff --git a/src/agents/cli-watchdog-defaults.ts b/src/agents/cli-watchdog-defaults.ts new file mode 100644 index 0000000000..c96f87e30b --- /dev/null +++ b/src/agents/cli-watchdog-defaults.ts @@ -0,0 +1,13 @@ +export const CLI_WATCHDOG_MIN_TIMEOUT_MS = 1_000; + +export const CLI_FRESH_WATCHDOG_DEFAULTS = { + noOutputTimeoutRatio: 0.8, + minMs: 180_000, + maxMs: 600_000, +} as const; + +export const CLI_RESUME_WATCHDOG_DEFAULTS = { + noOutputTimeoutRatio: 0.3, + minMs: 60_000, + maxMs: 180_000, +} as const; diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index b58a803906..851965e825 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -91,6 +91,34 @@ export type CliBackendConfig = { imageMode?: "repeat" | "list"; /** Serialize runs for this CLI. */ serialize?: boolean; + /** Runtime reliability tuning for this backend's process lifecycle. */ + reliability?: { + /** No-output watchdog tuning (fresh vs resumed runs). */ + watchdog?: { + /** Fresh/new sessions (non-resume). */ + fresh?: { + /** Fixed watchdog timeout in ms (overrides ratio when set). */ + noOutputTimeoutMs?: number; + /** Fraction of overall timeout used when fixed timeout is not set. */ + noOutputTimeoutRatio?: number; + /** Lower bound for computed watchdog timeout. */ + minMs?: number; + /** Upper bound for computed watchdog timeout. */ + maxMs?: number; + }; + /** Resume sessions. */ + resume?: { + /** Fixed watchdog timeout in ms (overrides ratio when set). */ + noOutputTimeoutMs?: number; + /** Fraction of overall timeout used when fixed timeout is not set. */ + noOutputTimeoutRatio?: number; + /** Lower bound for computed watchdog timeout. */ + minMs?: number; + /** Upper bound for computed watchdog timeout. */ + maxMs?: number; + }; + }; + }; }; export type AgentDefaultsConfig = { diff --git a/src/config/zod-schema.core.ts b/src/config/zod-schema.core.ts index 213ed9bedb..dbcbf80652 100644 --- a/src/config/zod-schema.core.ts +++ b/src/config/zod-schema.core.ts @@ -275,6 +275,34 @@ export const CliBackendSchema = z imageArg: z.string().optional(), imageMode: z.union([z.literal("repeat"), z.literal("list")]).optional(), serialize: z.boolean().optional(), + reliability: z + .object({ + watchdog: z + .object({ + fresh: z + .object({ + noOutputTimeoutMs: z.number().int().min(1000).optional(), + noOutputTimeoutRatio: z.number().min(0.05).max(0.95).optional(), + minMs: z.number().int().min(1000).optional(), + maxMs: z.number().int().min(1000).optional(), + }) + .strict() + .optional(), + resume: z + .object({ + noOutputTimeoutMs: z.number().int().min(1000).optional(), + noOutputTimeoutRatio: z.number().min(0.05).max(0.95).optional(), + minMs: z.number().int().min(1000).optional(), + maxMs: z.number().int().min(1000).optional(), + }) + .strict() + .optional(), + }) + .strict() + .optional(), + }) + .strict() + .optional(), }) .strict(); diff --git a/src/process/exec.test.ts b/src/process/exec.test.ts index 0af0979558..c2ac03e4b3 100644 --- a/src/process/exec.test.ts +++ b/src/process/exec.test.ts @@ -23,6 +23,7 @@ describe("runCommandWithTimeout", () => { expect(result.code).toBe(0); expect(result.stdout).toBe("ok"); + expect(result.termination).toBe("exit"); }); it("merges custom env with process.env", async () => { @@ -43,8 +44,58 @@ describe("runCommandWithTimeout", () => { expect(result.code).toBe(0); expect(result.stdout).toBe("base|ok"); + expect(result.termination).toBe("exit"); } finally { envSnapshot.restore(); } }); + + it("kills command when no output timeout elapses", async () => { + const startedAt = Date.now(); + const result = await runCommandWithTimeout( + [process.execPath, "-e", "setTimeout(() => {}, 10_000)"], + { + timeoutMs: 5_000, + noOutputTimeoutMs: 300, + }, + ); + + const durationMs = Date.now() - startedAt; + expect(durationMs).toBeLessThan(2_500); + expect(result.termination).toBe("no-output-timeout"); + expect(result.noOutputTimedOut).toBe(true); + expect(result.code).not.toBe(0); + }); + + it("resets no output timer when command keeps emitting output", async () => { + const result = await runCommandWithTimeout( + [ + process.execPath, + "-e", + 'let i=0; const t=setInterval(() => { process.stdout.write("."); i += 1; if (i >= 5) { clearInterval(t); process.exit(0); } }, 50);', + ], + { + timeoutMs: 5_000, + noOutputTimeoutMs: 200, + }, + ); + + expect(result.code).toBe(0); + expect(result.termination).toBe("exit"); + expect(result.noOutputTimedOut).toBe(false); + expect(result.stdout.length).toBeGreaterThanOrEqual(5); + }); + + it("reports global timeout termination when overall timeout elapses", async () => { + const result = await runCommandWithTimeout( + [process.execPath, "-e", "setTimeout(() => {}, 10_000)"], + { + timeoutMs: 200, + }, + ); + + expect(result.termination).toBe("timeout"); + expect(result.noOutputTimedOut).toBe(false); + expect(result.code).not.toBe(0); + }); }); diff --git a/src/process/exec.ts b/src/process/exec.ts index 0cbd77263a..6c4609e178 100644 --- a/src/process/exec.ts +++ b/src/process/exec.ts @@ -76,11 +76,14 @@ export async function runExec( } export type SpawnResult = { + pid?: number; stdout: string; stderr: string; code: number | null; signal: NodeJS.Signals | null; killed: boolean; + termination: "exit" | "timeout" | "no-output-timeout" | "signal"; + noOutputTimedOut?: boolean; }; export type CommandOptions = { @@ -89,6 +92,7 @@ export type CommandOptions = { input?: string; env?: NodeJS.ProcessEnv; windowsVerbatimArguments?: boolean; + noOutputTimeoutMs?: number; }; export async function runCommandWithTimeout( @@ -97,7 +101,7 @@ export async function runCommandWithTimeout( ): Promise { const options: CommandOptions = typeof optionsOrTimeout === "number" ? { timeoutMs: optionsOrTimeout } : optionsOrTimeout; - const { timeoutMs, cwd, input, env } = options; + const { timeoutMs, cwd, input, env, noOutputTimeoutMs } = options; const { windowsVerbatimArguments } = options; const hasInput = input !== undefined; @@ -144,11 +148,45 @@ export async function runCommandWithTimeout( let stdout = ""; let stderr = ""; let settled = false; + let timedOut = false; + let noOutputTimedOut = false; + let noOutputTimer: NodeJS.Timeout | null = null; + const shouldTrackOutputTimeout = + typeof noOutputTimeoutMs === "number" && + Number.isFinite(noOutputTimeoutMs) && + noOutputTimeoutMs > 0; + + const clearNoOutputTimer = () => { + if (!noOutputTimer) { + return; + } + clearTimeout(noOutputTimer); + noOutputTimer = null; + }; + + const armNoOutputTimer = () => { + if (!shouldTrackOutputTimeout || settled) { + return; + } + clearNoOutputTimer(); + noOutputTimer = setTimeout(() => { + if (settled) { + return; + } + noOutputTimedOut = true; + if (typeof child.kill === "function") { + child.kill("SIGKILL"); + } + }, Math.floor(noOutputTimeoutMs)); + }; + const timer = setTimeout(() => { + timedOut = true; if (typeof child.kill === "function") { child.kill("SIGKILL"); } }, timeoutMs); + armNoOutputTimer(); if (hasInput && child.stdin) { child.stdin.write(input ?? ""); @@ -157,9 +195,11 @@ export async function runCommandWithTimeout( child.stdout?.on("data", (d) => { stdout += d.toString(); + armNoOutputTimer(); }); child.stderr?.on("data", (d) => { stderr += d.toString(); + armNoOutputTimer(); }); child.on("error", (err) => { if (settled) { @@ -167,6 +207,7 @@ export async function runCommandWithTimeout( } settled = true; clearTimeout(timer); + clearNoOutputTimer(); reject(err); }); child.on("close", (code, signal) => { @@ -175,7 +216,24 @@ export async function runCommandWithTimeout( } settled = true; clearTimeout(timer); - resolve({ stdout, stderr, code, signal, killed: child.killed }); + clearNoOutputTimer(); + const termination = noOutputTimedOut + ? "no-output-timeout" + : timedOut + ? "timeout" + : signal != null + ? "signal" + : "exit"; + resolve({ + pid: child.pid ?? undefined, + stdout, + stderr, + code, + signal, + killed: child.killed, + termination, + noOutputTimedOut, + }); }); }); } diff --git a/src/process/kill-tree.ts b/src/process/kill-tree.ts new file mode 100644 index 0000000000..e3b5573faa --- /dev/null +++ b/src/process/kill-tree.ts @@ -0,0 +1,34 @@ +import { spawn } from "node:child_process"; + +/** + * Best-effort process-tree termination. + * - Windows: use taskkill /T to include descendants. + * - Unix: try process-group kill first, then direct pid kill. + */ +export function killProcessTree(pid: number): void { + if (!Number.isFinite(pid) || pid <= 0) { + return; + } + + if (process.platform === "win32") { + try { + spawn("taskkill", ["/F", "/T", "/PID", String(pid)], { + stdio: "ignore", + detached: true, + }); + } catch { + // ignore taskkill failures + } + return; + } + + try { + process.kill(-pid, "SIGKILL"); + } catch { + try { + process.kill(pid, "SIGKILL"); + } catch { + // process already gone or inaccessible + } + } +} diff --git a/src/process/supervisor/adapters/child.test.ts b/src/process/supervisor/adapters/child.test.ts new file mode 100644 index 0000000000..f69a67f141 --- /dev/null +++ b/src/process/supervisor/adapters/child.test.ts @@ -0,0 +1,116 @@ +import type { ChildProcess } from "node:child_process"; +import { EventEmitter } from "node:events"; +import { PassThrough } from "node:stream"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { spawnWithFallbackMock, killProcessTreeMock } = vi.hoisted(() => ({ + spawnWithFallbackMock: vi.fn(), + killProcessTreeMock: vi.fn(), +})); + +vi.mock("../../spawn-utils.js", () => ({ + spawnWithFallback: (...args: unknown[]) => spawnWithFallbackMock(...args), +})); + +vi.mock("../../kill-tree.js", () => ({ + killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args), +})); + +function createStubChild(pid = 1234) { + const child = new EventEmitter() as ChildProcess; + child.stdin = new PassThrough() as ChildProcess["stdin"]; + child.stdout = new PassThrough() as ChildProcess["stdout"]; + child.stderr = new PassThrough() as ChildProcess["stderr"]; + child.pid = pid; + child.killed = false; + const killMock = vi.fn(() => true); + child.kill = killMock as ChildProcess["kill"]; + return { child, killMock }; +} + +describe("createChildAdapter", () => { + beforeEach(() => { + spawnWithFallbackMock.mockReset(); + killProcessTreeMock.mockReset(); + }); + + it("uses process-tree kill for default SIGKILL", async () => { + const { child, killMock } = createStubChild(4321); + spawnWithFallbackMock.mockResolvedValue({ + child, + usedFallback: false, + }); + const { createChildAdapter } = await import("./child.js"); + const adapter = await createChildAdapter({ + argv: ["node", "-e", "setTimeout(() => {}, 1000)"], + stdinMode: "pipe-open", + }); + + const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as { + options?: { detached?: boolean }; + fallbacks?: Array<{ options?: { detached?: boolean } }>; + }; + expect(spawnArgs.options?.detached).toBe(true); + expect(spawnArgs.fallbacks?.[0]?.options?.detached).toBe(false); + + adapter.kill(); + + expect(killProcessTreeMock).toHaveBeenCalledWith(4321); + expect(killMock).not.toHaveBeenCalled(); + }); + + it("uses direct child.kill for non-SIGKILL signals", async () => { + const { child, killMock } = createStubChild(7654); + spawnWithFallbackMock.mockResolvedValue({ + child, + usedFallback: false, + }); + const { createChildAdapter } = await import("./child.js"); + const adapter = await createChildAdapter({ + argv: ["node", "-e", "setTimeout(() => {}, 1000)"], + stdinMode: "pipe-open", + }); + + adapter.kill("SIGTERM"); + + expect(killProcessTreeMock).not.toHaveBeenCalled(); + expect(killMock).toHaveBeenCalledWith("SIGTERM"); + }); + + it("keeps inherited env when no override env is provided", async () => { + const { child } = createStubChild(3333); + spawnWithFallbackMock.mockResolvedValue({ + child, + usedFallback: false, + }); + const { createChildAdapter } = await import("./child.js"); + await createChildAdapter({ + argv: ["node", "-e", "process.exit(0)"], + stdinMode: "pipe-open", + }); + + const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as { + options?: { env?: NodeJS.ProcessEnv }; + }; + expect(spawnArgs.options?.env).toBeUndefined(); + }); + + it("passes explicit env overrides as strings", async () => { + const { child } = createStubChild(4444); + spawnWithFallbackMock.mockResolvedValue({ + child, + usedFallback: false, + }); + const { createChildAdapter } = await import("./child.js"); + await createChildAdapter({ + argv: ["node", "-e", "process.exit(0)"], + env: { FOO: "bar", COUNT: "12", DROP_ME: undefined }, + stdinMode: "pipe-open", + }); + + const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as { + options?: { env?: Record }; + }; + expect(spawnArgs.options?.env).toEqual({ FOO: "bar", COUNT: "12" }); + }); +}); diff --git a/src/process/supervisor/adapters/child.ts b/src/process/supervisor/adapters/child.ts new file mode 100644 index 0000000000..6652e206b4 --- /dev/null +++ b/src/process/supervisor/adapters/child.ts @@ -0,0 +1,174 @@ +import type { ChildProcessWithoutNullStreams, SpawnOptions } from "node:child_process"; +import type { ManagedRunStdin } from "../types.js"; +import { killProcessTree } from "../../kill-tree.js"; +import { spawnWithFallback } from "../../spawn-utils.js"; + +function resolveCommand(command: string): string { + if (process.platform !== "win32") { + return command; + } + const lower = command.toLowerCase(); + if (lower.endsWith(".exe") || lower.endsWith(".cmd") || lower.endsWith(".bat")) { + return command; + } + const basename = lower.split(/[\\/]/).pop() ?? lower; + if (basename === "npm" || basename === "pnpm" || basename === "yarn" || basename === "npx") { + return `${command}.cmd`; + } + return command; +} + +function toStringEnv(env?: NodeJS.ProcessEnv): Record { + if (!env) { + return {}; + } + const out: Record = {}; + for (const [key, value] of Object.entries(env)) { + if (value === undefined) { + continue; + } + out[key] = String(value); + } + return out; +} + +export type ChildAdapter = { + pid?: number; + stdin?: ManagedRunStdin; + onStdout: (listener: (chunk: string) => void) => void; + onStderr: (listener: (chunk: string) => void) => void; + wait: () => Promise<{ code: number | null; signal: NodeJS.Signals | null }>; + kill: (signal?: NodeJS.Signals) => void; + dispose: () => void; +}; + +export async function createChildAdapter(params: { + argv: string[]; + cwd?: string; + env?: NodeJS.ProcessEnv; + windowsVerbatimArguments?: boolean; + input?: string; + stdinMode?: "inherit" | "pipe-open" | "pipe-closed"; +}): Promise { + const resolvedArgv = [...params.argv]; + resolvedArgv[0] = resolveCommand(resolvedArgv[0] ?? ""); + + const stdinMode = params.stdinMode ?? (params.input !== undefined ? "pipe-closed" : "inherit"); + + const options: SpawnOptions = { + cwd: params.cwd, + env: params.env ? toStringEnv(params.env) : undefined, + stdio: ["pipe", "pipe", "pipe"], + detached: true, + windowsHide: true, + windowsVerbatimArguments: params.windowsVerbatimArguments, + }; + if (stdinMode === "inherit") { + options.stdio = ["inherit", "pipe", "pipe"]; + } else { + options.stdio = ["pipe", "pipe", "pipe"]; + } + + const spawned = await spawnWithFallback({ + argv: resolvedArgv, + options, + fallbacks: [ + { + label: "no-detach", + options: { detached: false }, + }, + ], + }); + + const child = spawned.child as ChildProcessWithoutNullStreams; + if (child.stdin) { + if (params.input !== undefined) { + child.stdin.write(params.input); + child.stdin.end(); + } else if (stdinMode === "pipe-closed") { + child.stdin.end(); + } + } + + const stdin: ManagedRunStdin | undefined = child.stdin + ? { + destroyed: false, + write: (data: string, cb?: (err?: Error | null) => void) => { + try { + child.stdin.write(data, cb); + } catch (err) { + cb?.(err as Error); + } + }, + end: () => { + try { + child.stdin.end(); + } catch { + // ignore close errors + } + }, + destroy: () => { + try { + child.stdin.destroy(); + } catch { + // ignore destroy errors + } + }, + } + : undefined; + + const onStdout = (listener: (chunk: string) => void) => { + child.stdout.on("data", (chunk) => { + listener(chunk.toString()); + }); + }; + + const onStderr = (listener: (chunk: string) => void) => { + child.stderr.on("data", (chunk) => { + listener(chunk.toString()); + }); + }; + + const wait = async () => + await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => { + child.once("error", reject); + child.once("close", (code, signal) => { + resolve({ code, signal }); + }); + }); + + const kill = (signal?: NodeJS.Signals) => { + const pid = child.pid ?? undefined; + if (signal === undefined || signal === "SIGKILL") { + if (pid) { + killProcessTree(pid); + } else { + try { + child.kill("SIGKILL"); + } catch { + // ignore kill errors + } + } + return; + } + try { + child.kill(signal); + } catch { + // ignore kill errors for non-kill signals + } + }; + + const dispose = () => { + child.removeAllListeners(); + }; + + return { + pid: child.pid ?? undefined, + stdin, + onStdout, + onStderr, + wait, + kill, + dispose, + }; +} diff --git a/src/process/supervisor/adapters/pty.test.ts b/src/process/supervisor/adapters/pty.test.ts new file mode 100644 index 0000000000..abacf7b70e --- /dev/null +++ b/src/process/supervisor/adapters/pty.test.ts @@ -0,0 +1,161 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +const { spawnMock, ptyKillMock, killProcessTreeMock } = vi.hoisted(() => ({ + spawnMock: vi.fn(), + ptyKillMock: vi.fn(), + killProcessTreeMock: vi.fn(), +})); + +vi.mock("@lydell/node-pty", () => ({ + spawn: (...args: unknown[]) => spawnMock(...args), +})); + +vi.mock("../../kill-tree.js", () => ({ + killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args), +})); + +function createStubPty(pid = 1234) { + let exitListener: ((event: { exitCode: number; signal?: number }) => void) | null = null; + return { + pid, + write: vi.fn(), + onData: vi.fn(() => ({ dispose: vi.fn() })), + onExit: vi.fn((listener: (event: { exitCode: number; signal?: number }) => void) => { + exitListener = listener; + return { dispose: vi.fn() }; + }), + kill: (signal?: string) => ptyKillMock(signal), + emitExit: (event: { exitCode: number; signal?: number }) => { + exitListener?.(event); + }, + }; +} + +describe("createPtyAdapter", () => { + beforeEach(() => { + spawnMock.mockReset(); + ptyKillMock.mockReset(); + killProcessTreeMock.mockReset(); + }); + + afterEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + }); + + it("forwards explicit signals to node-pty kill", async () => { + spawnMock.mockReturnValue(createStubPty()); + const { createPtyAdapter } = await import("./pty.js"); + + const adapter = await createPtyAdapter({ + shell: "bash", + args: ["-lc", "sleep 10"], + }); + + adapter.kill("SIGTERM"); + expect(ptyKillMock).toHaveBeenCalledWith("SIGTERM"); + expect(killProcessTreeMock).not.toHaveBeenCalled(); + }); + + it("uses process-tree kill for SIGKILL by default", async () => { + spawnMock.mockReturnValue(createStubPty()); + const { createPtyAdapter } = await import("./pty.js"); + + const adapter = await createPtyAdapter({ + shell: "bash", + args: ["-lc", "sleep 10"], + }); + + adapter.kill(); + expect(killProcessTreeMock).toHaveBeenCalledWith(1234); + expect(ptyKillMock).not.toHaveBeenCalled(); + }); + + it("resolves wait when exit fires before wait is called", async () => { + const stub = createStubPty(); + spawnMock.mockReturnValue(stub); + const { createPtyAdapter } = await import("./pty.js"); + + const adapter = await createPtyAdapter({ + shell: "bash", + args: ["-lc", "exit 3"], + }); + + expect(stub.onExit).toHaveBeenCalledTimes(1); + stub.emitExit({ exitCode: 3, signal: 0 }); + await expect(adapter.wait()).resolves.toEqual({ code: 3, signal: null }); + }); + + it("keeps inherited env when no override env is provided", async () => { + const stub = createStubPty(); + spawnMock.mockReturnValue(stub); + const { createPtyAdapter } = await import("./pty.js"); + + await createPtyAdapter({ + shell: "bash", + args: ["-lc", "env"], + }); + + const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record }; + expect(spawnOptions?.env).toBeUndefined(); + }); + + it("passes explicit env overrides as strings", async () => { + const stub = createStubPty(); + spawnMock.mockReturnValue(stub); + const { createPtyAdapter } = await import("./pty.js"); + + await createPtyAdapter({ + shell: "bash", + args: ["-lc", "env"], + env: { FOO: "bar", COUNT: "12", DROP_ME: undefined }, + }); + + const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record }; + expect(spawnOptions?.env).toEqual({ FOO: "bar", COUNT: "12" }); + }); + + it("does not pass a signal to node-pty on Windows", async () => { + const originalPlatform = Object.getOwnPropertyDescriptor(process, "platform"); + Object.defineProperty(process, "platform", { value: "win32", configurable: true }); + try { + spawnMock.mockReturnValue(createStubPty()); + const { createPtyAdapter } = await import("./pty.js"); + + const adapter = await createPtyAdapter({ + shell: "powershell.exe", + args: ["-NoLogo"], + }); + + adapter.kill("SIGTERM"); + expect(ptyKillMock).toHaveBeenCalledWith(undefined); + expect(killProcessTreeMock).not.toHaveBeenCalled(); + } finally { + if (originalPlatform) { + Object.defineProperty(process, "platform", originalPlatform); + } + } + }); + + it("uses process-tree kill for SIGKILL on Windows", async () => { + const originalPlatform = Object.getOwnPropertyDescriptor(process, "platform"); + Object.defineProperty(process, "platform", { value: "win32", configurable: true }); + try { + spawnMock.mockReturnValue(createStubPty(4567)); + const { createPtyAdapter } = await import("./pty.js"); + + const adapter = await createPtyAdapter({ + shell: "powershell.exe", + args: ["-NoLogo"], + }); + + adapter.kill("SIGKILL"); + expect(killProcessTreeMock).toHaveBeenCalledWith(4567); + expect(ptyKillMock).not.toHaveBeenCalled(); + } finally { + if (originalPlatform) { + Object.defineProperty(process, "platform", originalPlatform); + } + } + }); +}); diff --git a/src/process/supervisor/adapters/pty.ts b/src/process/supervisor/adapters/pty.ts new file mode 100644 index 0000000000..516238eefc --- /dev/null +++ b/src/process/supervisor/adapters/pty.ts @@ -0,0 +1,197 @@ +import type { ManagedRunStdin } from "../types.js"; +import { killProcessTree } from "../../kill-tree.js"; + +type PtyExitEvent = { exitCode: number; signal?: number }; +type PtyDisposable = { dispose: () => void }; +type PtySpawnHandle = { + pid: number; + write: (data: string | Buffer) => void; + onData: (listener: (value: string) => void) => PtyDisposable | void; + onExit: (listener: (event: PtyExitEvent) => void) => PtyDisposable | void; + kill: (signal?: string) => void; +}; +type PtySpawn = ( + file: string, + args: string[] | string, + options: { + name?: string; + cols?: number; + rows?: number; + cwd?: string; + env?: Record; + }, +) => PtySpawnHandle; + +type PtyModule = { + spawn?: PtySpawn; + default?: { + spawn?: PtySpawn; + }; +}; + +function toStringEnv(env?: NodeJS.ProcessEnv): Record { + if (!env) { + return {}; + } + const out: Record = {}; + for (const [key, value] of Object.entries(env)) { + if (value === undefined) { + continue; + } + out[key] = String(value); + } + return out; +} + +export type PtyAdapter = { + pid?: number; + stdin?: ManagedRunStdin; + onStdout: (listener: (chunk: string) => void) => void; + onStderr: (listener: (chunk: string) => void) => void; + wait: () => Promise<{ code: number | null; signal: NodeJS.Signals | number | null }>; + kill: (signal?: NodeJS.Signals) => void; + dispose: () => void; +}; + +export async function createPtyAdapter(params: { + shell: string; + args: string[]; + cwd?: string; + env?: NodeJS.ProcessEnv; + cols?: number; + rows?: number; + name?: string; +}): Promise { + const module = (await import("@lydell/node-pty")) as unknown as PtyModule; + const spawn = module.spawn ?? module.default?.spawn; + if (!spawn) { + throw new Error("PTY support is unavailable (node-pty spawn not found)."); + } + const pty = spawn(params.shell, params.args, { + cwd: params.cwd, + env: params.env ? toStringEnv(params.env) : undefined, + name: params.name ?? process.env.TERM ?? "xterm-256color", + cols: params.cols ?? 120, + rows: params.rows ?? 30, + }); + + let dataListener: PtyDisposable | null = null; + let exitListener: PtyDisposable | null = null; + let waitResult: { code: number | null; signal: NodeJS.Signals | number | null } | null = null; + let resolveWait: + | ((value: { code: number | null; signal: NodeJS.Signals | number | null }) => void) + | null = null; + let waitPromise: Promise<{ code: number | null; signal: NodeJS.Signals | number | null }> | null = + null; + + const settleWait = (value: { code: number | null; signal: NodeJS.Signals | number | null }) => { + if (waitResult) { + return; + } + waitResult = value; + if (resolveWait) { + const resolve = resolveWait; + resolveWait = null; + resolve(value); + } + }; + + exitListener = + pty.onExit((event) => { + const signal = event.signal && event.signal !== 0 ? event.signal : null; + settleWait({ code: event.exitCode ?? null, signal }); + }) ?? null; + + const stdin: ManagedRunStdin = { + destroyed: false, + write: (data, cb) => { + try { + pty.write(data); + cb?.(null); + } catch (err) { + cb?.(err as Error); + } + }, + end: () => { + try { + const eof = process.platform === "win32" ? "\x1a" : "\x04"; + pty.write(eof); + } catch { + // ignore EOF errors + } + }, + }; + + const onStdout = (listener: (chunk: string) => void) => { + dataListener = + pty.onData((chunk) => { + listener(chunk.toString()); + }) ?? null; + }; + + const onStderr = (_listener: (chunk: string) => void) => { + // PTY gives a unified output stream. + }; + + const wait = async () => { + if (waitResult) { + return waitResult; + } + if (!waitPromise) { + waitPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | number | null }>( + (resolve) => { + resolveWait = resolve; + if (waitResult) { + const settled = waitResult; + resolveWait = null; + resolve(settled); + } + }, + ); + } + return waitPromise; + }; + + const kill = (signal: NodeJS.Signals = "SIGKILL") => { + try { + if (signal === "SIGKILL" && typeof pty.pid === "number" && pty.pid > 0) { + killProcessTree(pty.pid); + } else if (process.platform === "win32") { + pty.kill(); + } else { + pty.kill(signal); + } + } catch { + // ignore kill errors + } + // Some PTY hosts do not emit `onExit` reliably after kill. + // Ensure waiters can progress on forced termination. + settleWait({ code: null, signal }); + }; + + const dispose = () => { + try { + dataListener?.dispose(); + } catch { + // ignore disposal errors + } + try { + exitListener?.dispose(); + } catch { + // ignore disposal errors + } + dataListener = null; + exitListener = null; + settleWait({ code: null, signal: null }); + }; + + return { + pid: pty.pid || undefined, + stdin, + onStdout, + onStderr, + wait, + kill, + dispose, + }; +} diff --git a/src/process/supervisor/index.ts b/src/process/supervisor/index.ts new file mode 100644 index 0000000000..42557ba540 --- /dev/null +++ b/src/process/supervisor/index.ts @@ -0,0 +1,24 @@ +import type { ProcessSupervisor } from "./types.js"; +import { createProcessSupervisor } from "./supervisor.js"; + +let singleton: ProcessSupervisor | null = null; + +export function getProcessSupervisor(): ProcessSupervisor { + if (singleton) { + return singleton; + } + singleton = createProcessSupervisor(); + return singleton; +} + +export { createProcessSupervisor } from "./supervisor.js"; +export type { + ManagedRun, + ProcessSupervisor, + RunExit, + RunRecord, + RunState, + SpawnInput, + SpawnMode, + TerminationReason, +} from "./types.js"; diff --git a/src/process/supervisor/registry.test.ts b/src/process/supervisor/registry.test.ts new file mode 100644 index 0000000000..64d56d33d1 --- /dev/null +++ b/src/process/supervisor/registry.test.ts @@ -0,0 +1,83 @@ +import { describe, expect, it } from "vitest"; +import { createRunRegistry } from "./registry.js"; + +describe("process supervisor run registry", () => { + it("finalize is idempotent and preserves first terminal metadata", () => { + const registry = createRunRegistry(); + registry.add({ + runId: "r1", + sessionId: "s1", + backendId: "b1", + state: "running", + startedAtMs: 1, + lastOutputAtMs: 1, + createdAtMs: 1, + updatedAtMs: 1, + }); + + const first = registry.finalize("r1", { + reason: "overall-timeout", + exitCode: null, + exitSignal: "SIGKILL", + }); + const second = registry.finalize("r1", { + reason: "manual-cancel", + exitCode: 0, + exitSignal: null, + }); + + expect(first).not.toBeNull(); + expect(first?.firstFinalize).toBe(true); + expect(first?.record.terminationReason).toBe("overall-timeout"); + expect(first?.record.exitCode).toBeNull(); + expect(first?.record.exitSignal).toBe("SIGKILL"); + + expect(second).not.toBeNull(); + expect(second?.firstFinalize).toBe(false); + expect(second?.record.terminationReason).toBe("overall-timeout"); + expect(second?.record.exitCode).toBeNull(); + expect(second?.record.exitSignal).toBe("SIGKILL"); + }); + + it("prunes oldest exited records once retention cap is exceeded", () => { + const registry = createRunRegistry({ maxExitedRecords: 2 }); + registry.add({ + runId: "r1", + sessionId: "s1", + backendId: "b1", + state: "running", + startedAtMs: 1, + lastOutputAtMs: 1, + createdAtMs: 1, + updatedAtMs: 1, + }); + registry.add({ + runId: "r2", + sessionId: "s2", + backendId: "b1", + state: "running", + startedAtMs: 2, + lastOutputAtMs: 2, + createdAtMs: 2, + updatedAtMs: 2, + }); + registry.add({ + runId: "r3", + sessionId: "s3", + backendId: "b1", + state: "running", + startedAtMs: 3, + lastOutputAtMs: 3, + createdAtMs: 3, + updatedAtMs: 3, + }); + + registry.finalize("r1", { reason: "exit", exitCode: 0, exitSignal: null }); + registry.finalize("r2", { reason: "exit", exitCode: 0, exitSignal: null }); + registry.finalize("r3", { reason: "exit", exitCode: 0, exitSignal: null }); + + expect(registry.get("r1")).toBeUndefined(); + expect(registry.get("r2")?.state).toBe("exited"); + expect(registry.get("r3")?.state).toBe("exited"); + }); +}); diff --git a/src/process/supervisor/registry.ts b/src/process/supervisor/registry.ts new file mode 100644 index 0000000000..02432af7c0 --- /dev/null +++ b/src/process/supervisor/registry.ts @@ -0,0 +1,154 @@ +import type { RunRecord, RunState, TerminationReason } from "./types.js"; + +function nowMs() { + return Date.now(); +} + +const DEFAULT_MAX_EXITED_RECORDS = 2_000; + +function resolveMaxExitedRecords(value?: number): number { + if (typeof value !== "number" || !Number.isFinite(value) || value < 1) { + return DEFAULT_MAX_EXITED_RECORDS; + } + return Math.max(1, Math.floor(value)); +} + +export type RunRegistry = { + add: (record: RunRecord) => void; + get: (runId: string) => RunRecord | undefined; + list: () => RunRecord[]; + listByScope: (scopeKey: string) => RunRecord[]; + updateState: ( + runId: string, + state: RunState, + patch?: Partial>, + ) => RunRecord | undefined; + touchOutput: (runId: string) => void; + finalize: ( + runId: string, + exit: { + reason: TerminationReason; + exitCode: number | null; + exitSignal: NodeJS.Signals | number | null; + }, + ) => { record: RunRecord; firstFinalize: boolean } | null; + delete: (runId: string) => void; +}; + +export function createRunRegistry(options?: { maxExitedRecords?: number }): RunRegistry { + const records = new Map(); + const maxExitedRecords = resolveMaxExitedRecords(options?.maxExitedRecords); + + const pruneExitedRecords = () => { + if (!records.size) { + return; + } + let exited = 0; + for (const record of records.values()) { + if (record.state === "exited") { + exited += 1; + } + } + if (exited <= maxExitedRecords) { + return; + } + let remove = exited - maxExitedRecords; + for (const [runId, record] of records.entries()) { + if (remove <= 0) { + break; + } + if (record.state !== "exited") { + continue; + } + records.delete(runId); + remove -= 1; + } + }; + + const add: RunRegistry["add"] = (record) => { + records.set(record.runId, { ...record }); + }; + + const get: RunRegistry["get"] = (runId) => { + const record = records.get(runId); + return record ? { ...record } : undefined; + }; + + const list: RunRegistry["list"] = () => { + return Array.from(records.values()).map((record) => ({ ...record })); + }; + + const listByScope: RunRegistry["listByScope"] = (scopeKey) => { + if (!scopeKey.trim()) { + return []; + } + return Array.from(records.values()) + .filter((record) => record.scopeKey === scopeKey) + .map((record) => ({ ...record })); + }; + + const updateState: RunRegistry["updateState"] = (runId, state, patch) => { + const current = records.get(runId); + if (!current) { + return undefined; + } + const updatedAtMs = nowMs(); + const next: RunRecord = { + ...current, + ...patch, + state, + updatedAtMs, + lastOutputAtMs: current.lastOutputAtMs, + }; + records.set(runId, next); + return { ...next }; + }; + + const touchOutput: RunRegistry["touchOutput"] = (runId) => { + const current = records.get(runId); + if (!current) { + return; + } + const ts = nowMs(); + records.set(runId, { + ...current, + lastOutputAtMs: ts, + updatedAtMs: ts, + }); + }; + + const finalize: RunRegistry["finalize"] = (runId, exit) => { + const current = records.get(runId); + if (!current) { + return null; + } + const firstFinalize = current.state !== "exited"; + const ts = nowMs(); + const next: RunRecord = { + ...current, + state: "exited", + terminationReason: current.terminationReason ?? exit.reason, + exitCode: current.exitCode !== undefined ? current.exitCode : exit.exitCode, + exitSignal: current.exitSignal !== undefined ? current.exitSignal : exit.exitSignal, + updatedAtMs: ts, + }; + records.set(runId, next); + pruneExitedRecords(); + return { record: { ...next }, firstFinalize }; + }; + + const del: RunRegistry["delete"] = (runId) => { + records.delete(runId); + }; + + return { + add, + get, + list, + listByScope, + updateState, + touchOutput, + finalize, + delete: del, + }; +} diff --git a/src/process/supervisor/supervisor.pty-command.test.ts b/src/process/supervisor/supervisor.pty-command.test.ts new file mode 100644 index 0000000000..3fec62d4df --- /dev/null +++ b/src/process/supervisor/supervisor.pty-command.test.ts @@ -0,0 +1,76 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { createPtyAdapterMock } = vi.hoisted(() => ({ + createPtyAdapterMock: vi.fn(), +})); + +vi.mock("../../agents/shell-utils.js", () => ({ + getShellConfig: () => ({ shell: "sh", args: ["-c"] }), +})); + +vi.mock("./adapters/pty.js", () => ({ + createPtyAdapter: (...args: unknown[]) => createPtyAdapterMock(...args), +})); + +function createStubPtyAdapter() { + return { + pid: 1234, + stdin: undefined, + onStdout: (_listener: (chunk: string) => void) => { + // no-op + }, + onStderr: (_listener: (chunk: string) => void) => { + // no-op + }, + wait: async () => ({ code: 0, signal: null }), + kill: (_signal?: NodeJS.Signals) => { + // no-op + }, + dispose: () => { + // no-op + }, + }; +} + +describe("process supervisor PTY command contract", () => { + beforeEach(() => { + createPtyAdapterMock.mockReset(); + }); + + it("passes PTY command verbatim to shell args", async () => { + createPtyAdapterMock.mockResolvedValue(createStubPtyAdapter()); + const { createProcessSupervisor } = await import("./supervisor.js"); + const supervisor = createProcessSupervisor(); + const command = `printf '%s\\n' "a b" && printf '%s\\n' '$HOME'`; + + const run = await supervisor.spawn({ + sessionId: "s1", + backendId: "test", + mode: "pty", + ptyCommand: command, + timeoutMs: 1_000, + }); + const exit = await run.wait(); + + expect(exit.reason).toBe("exit"); + expect(createPtyAdapterMock).toHaveBeenCalledTimes(1); + const params = createPtyAdapterMock.mock.calls[0]?.[0] as { args?: string[] }; + expect(params.args).toEqual(["-c", command]); + }); + + it("rejects empty PTY command", async () => { + createPtyAdapterMock.mockResolvedValue(createStubPtyAdapter()); + const { createProcessSupervisor } = await import("./supervisor.js"); + const supervisor = createProcessSupervisor(); + + await expect( + supervisor.spawn({ + sessionId: "s1", + backendId: "test", + mode: "pty", + ptyCommand: " ", + }), + ).rejects.toThrow("PTY command cannot be empty"); + expect(createPtyAdapterMock).not.toHaveBeenCalled(); + }); +}); diff --git a/src/process/supervisor/supervisor.test.ts b/src/process/supervisor/supervisor.test.ts new file mode 100644 index 0000000000..08b22ca8f3 --- /dev/null +++ b/src/process/supervisor/supervisor.test.ts @@ -0,0 +1,102 @@ +import { describe, expect, it } from "vitest"; +import { createProcessSupervisor } from "./supervisor.js"; + +describe("process supervisor", () => { + it("spawns child runs and captures output", async () => { + const supervisor = createProcessSupervisor(); + const run = await supervisor.spawn({ + sessionId: "s1", + backendId: "test", + mode: "child", + argv: [process.execPath, "-e", 'process.stdout.write("ok")'], + timeoutMs: 2_000, + stdinMode: "pipe-closed", + }); + const exit = await run.wait(); + expect(exit.reason).toBe("exit"); + expect(exit.exitCode).toBe(0); + expect(exit.stdout).toBe("ok"); + }); + + it("enforces no-output timeout for silent processes", async () => { + const supervisor = createProcessSupervisor(); + const run = await supervisor.spawn({ + sessionId: "s1", + backendId: "test", + mode: "child", + argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"], + timeoutMs: 5_000, + noOutputTimeoutMs: 200, + stdinMode: "pipe-closed", + }); + const exit = await run.wait(); + expect(exit.reason).toBe("no-output-timeout"); + expect(exit.noOutputTimedOut).toBe(true); + expect(exit.timedOut).toBe(true); + }); + + it("cancels prior scoped run when replaceExistingScope is enabled", async () => { + const supervisor = createProcessSupervisor(); + const first = await supervisor.spawn({ + sessionId: "s1", + backendId: "test", + scopeKey: "scope:a", + mode: "child", + argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"], + timeoutMs: 10_000, + stdinMode: "pipe-open", + }); + + const second = await supervisor.spawn({ + sessionId: "s1", + backendId: "test", + scopeKey: "scope:a", + replaceExistingScope: true, + mode: "child", + argv: [process.execPath, "-e", 'process.stdout.write("new")'], + timeoutMs: 2_000, + stdinMode: "pipe-closed", + }); + + const firstExit = await first.wait(); + const secondExit = await second.wait(); + expect(firstExit.reason === "manual-cancel" || firstExit.reason === "signal").toBe(true); + expect(secondExit.reason).toBe("exit"); + expect(secondExit.stdout).toBe("new"); + }); + + it("applies overall timeout even for near-immediate timer firing", async () => { + const supervisor = createProcessSupervisor(); + const run = await supervisor.spawn({ + sessionId: "s-timeout", + backendId: "test", + mode: "child", + argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"], + timeoutMs: 1, + stdinMode: "pipe-closed", + }); + const exit = await run.wait(); + expect(exit.reason).toBe("overall-timeout"); + expect(exit.timedOut).toBe(true); + }); + + it("can stream output without retaining it in RunExit payload", async () => { + const supervisor = createProcessSupervisor(); + let streamed = ""; + const run = await supervisor.spawn({ + sessionId: "s-capture", + backendId: "test", + mode: "child", + argv: [process.execPath, "-e", 'process.stdout.write("streamed")'], + timeoutMs: 2_000, + stdinMode: "pipe-closed", + captureOutput: false, + onStdout: (chunk) => { + streamed += chunk; + }, + }); + const exit = await run.wait(); + expect(streamed).toBe("streamed"); + expect(exit.stdout).toBe(""); + }); +}); diff --git a/src/process/supervisor/supervisor.ts b/src/process/supervisor/supervisor.ts new file mode 100644 index 0000000000..dba0d7d375 --- /dev/null +++ b/src/process/supervisor/supervisor.ts @@ -0,0 +1,282 @@ +import crypto from "node:crypto"; +import type { + ManagedRun, + ProcessSupervisor, + RunExit, + RunRecord, + SpawnInput, + TerminationReason, +} from "./types.js"; +import { getShellConfig } from "../../agents/shell-utils.js"; +import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { createChildAdapter } from "./adapters/child.js"; +import { createPtyAdapter } from "./adapters/pty.js"; +import { createRunRegistry } from "./registry.js"; + +const log = createSubsystemLogger("process/supervisor"); + +type ActiveRun = { + run: ManagedRun; + scopeKey?: string; +}; + +function clampTimeout(value?: number): number | undefined { + if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) { + return undefined; + } + return Math.max(1, Math.floor(value)); +} + +function isTimeoutReason(reason: TerminationReason) { + return reason === "overall-timeout" || reason === "no-output-timeout"; +} + +export function createProcessSupervisor(): ProcessSupervisor { + const registry = createRunRegistry(); + const active = new Map(); + + const cancel = (runId: string, reason: TerminationReason = "manual-cancel") => { + const current = active.get(runId); + if (!current) { + return; + } + registry.updateState(runId, "exiting", { + terminationReason: reason, + }); + current.run.cancel(reason); + }; + + const cancelScope = (scopeKey: string, reason: TerminationReason = "manual-cancel") => { + if (!scopeKey.trim()) { + return; + } + for (const [runId, run] of active.entries()) { + if (run.scopeKey !== scopeKey) { + continue; + } + cancel(runId, reason); + } + }; + + const spawn = async (input: SpawnInput): Promise => { + const runId = input.runId?.trim() || crypto.randomUUID(); + if (input.replaceExistingScope && input.scopeKey?.trim()) { + cancelScope(input.scopeKey, "manual-cancel"); + } + const startedAtMs = Date.now(); + const record: RunRecord = { + runId, + sessionId: input.sessionId, + backendId: input.backendId, + scopeKey: input.scopeKey?.trim() || undefined, + state: "starting", + startedAtMs, + lastOutputAtMs: startedAtMs, + createdAtMs: startedAtMs, + updatedAtMs: startedAtMs, + }; + registry.add(record); + + let forcedReason: TerminationReason | null = null; + let settled = false; + let stdout = ""; + let stderr = ""; + let timeoutTimer: NodeJS.Timeout | null = null; + let noOutputTimer: NodeJS.Timeout | null = null; + const captureOutput = input.captureOutput !== false; + + const overallTimeoutMs = clampTimeout(input.timeoutMs); + const noOutputTimeoutMs = clampTimeout(input.noOutputTimeoutMs); + + const setForcedReason = (reason: TerminationReason) => { + if (forcedReason) { + return; + } + forcedReason = reason; + registry.updateState(runId, "exiting", { terminationReason: reason }); + }; + + let cancelAdapter: ((reason: TerminationReason) => void) | null = null; + + const requestCancel = (reason: TerminationReason) => { + setForcedReason(reason); + cancelAdapter?.(reason); + }; + + const touchOutput = () => { + registry.touchOutput(runId); + if (!noOutputTimeoutMs || settled) { + return; + } + if (noOutputTimer) { + clearTimeout(noOutputTimer); + } + noOutputTimer = setTimeout(() => { + requestCancel("no-output-timeout"); + }, noOutputTimeoutMs); + }; + + try { + if (input.mode === "child" && input.argv.length === 0) { + throw new Error("spawn argv cannot be empty"); + } + const adapter = + input.mode === "pty" + ? await (async () => { + const { shell, args: shellArgs } = getShellConfig(); + const ptyCommand = input.ptyCommand.trim(); + if (!ptyCommand) { + throw new Error("PTY command cannot be empty"); + } + return await createPtyAdapter({ + shell, + args: [...shellArgs, ptyCommand], + cwd: input.cwd, + env: input.env, + }); + })() + : await createChildAdapter({ + argv: input.argv, + cwd: input.cwd, + env: input.env, + windowsVerbatimArguments: input.windowsVerbatimArguments, + input: input.input, + stdinMode: input.stdinMode, + }); + + registry.updateState(runId, "running", { pid: adapter.pid }); + + const clearTimers = () => { + if (timeoutTimer) { + clearTimeout(timeoutTimer); + timeoutTimer = null; + } + if (noOutputTimer) { + clearTimeout(noOutputTimer); + noOutputTimer = null; + } + }; + + cancelAdapter = (_reason: TerminationReason) => { + if (settled) { + return; + } + adapter.kill("SIGKILL"); + }; + + if (overallTimeoutMs) { + timeoutTimer = setTimeout(() => { + requestCancel("overall-timeout"); + }, overallTimeoutMs); + } + if (noOutputTimeoutMs) { + noOutputTimer = setTimeout(() => { + requestCancel("no-output-timeout"); + }, noOutputTimeoutMs); + } + + adapter.onStdout((chunk) => { + if (captureOutput) { + stdout += chunk; + } + input.onStdout?.(chunk); + touchOutput(); + }); + adapter.onStderr((chunk) => { + if (captureOutput) { + stderr += chunk; + } + input.onStderr?.(chunk); + touchOutput(); + }); + + const waitPromise = (async (): Promise => { + const result = await adapter.wait(); + if (settled) { + return { + reason: forcedReason ?? "exit", + exitCode: result.code, + exitSignal: result.signal, + durationMs: Date.now() - startedAtMs, + stdout, + stderr, + timedOut: isTimeoutReason(forcedReason ?? "exit"), + noOutputTimedOut: forcedReason === "no-output-timeout", + }; + } + settled = true; + clearTimers(); + adapter.dispose(); + active.delete(runId); + + const reason: TerminationReason = + forcedReason ?? (result.signal != null ? ("signal" as const) : ("exit" as const)); + const exit: RunExit = { + reason, + exitCode: result.code, + exitSignal: result.signal, + durationMs: Date.now() - startedAtMs, + stdout, + stderr, + timedOut: isTimeoutReason(forcedReason ?? reason), + noOutputTimedOut: forcedReason === "no-output-timeout", + }; + registry.finalize(runId, { + reason: exit.reason, + exitCode: exit.exitCode, + exitSignal: exit.exitSignal, + }); + return exit; + })().catch((err) => { + if (!settled) { + settled = true; + clearTimers(); + active.delete(runId); + adapter.dispose(); + registry.finalize(runId, { + reason: "spawn-error", + exitCode: null, + exitSignal: null, + }); + } + throw err; + }); + + const managedRun: ManagedRun = { + runId, + pid: adapter.pid, + startedAtMs, + stdin: adapter.stdin, + wait: async () => await waitPromise, + cancel: (reason = "manual-cancel") => { + requestCancel(reason); + }, + }; + + active.set(runId, { + run: managedRun, + scopeKey: input.scopeKey?.trim() || undefined, + }); + return managedRun; + } catch (err) { + registry.finalize(runId, { + reason: "spawn-error", + exitCode: null, + exitSignal: null, + }); + log.warn(`spawn failed: runId=${runId} reason=${String(err)}`); + throw err; + } + }; + + return { + spawn, + cancel, + cancelScope, + reconcileOrphans: async () => { + // Deliberate no-op: this supervisor uses in-memory ownership only. + // Active runs are not recovered after process restart in the current model. + }, + getRecord: (runId: string) => registry.get(runId), + }; +} diff --git a/src/process/supervisor/types.ts b/src/process/supervisor/types.ts new file mode 100644 index 0000000000..04c571b08b --- /dev/null +++ b/src/process/supervisor/types.ts @@ -0,0 +1,96 @@ +export type RunState = "starting" | "running" | "exiting" | "exited"; + +export type TerminationReason = + | "manual-cancel" + | "overall-timeout" + | "no-output-timeout" + | "spawn-error" + | "signal" + | "exit"; + +export type RunRecord = { + runId: string; + sessionId: string; + backendId: string; + scopeKey?: string; + pid?: number; + processGroupId?: number; + startedAtMs: number; + lastOutputAtMs: number; + createdAtMs: number; + updatedAtMs: number; + state: RunState; + terminationReason?: TerminationReason; + exitCode?: number | null; + exitSignal?: NodeJS.Signals | number | null; +}; + +export type RunExit = { + reason: TerminationReason; + exitCode: number | null; + exitSignal: NodeJS.Signals | number | null; + durationMs: number; + stdout: string; + stderr: string; + timedOut: boolean; + noOutputTimedOut: boolean; +}; + +export type ManagedRun = { + runId: string; + pid?: number; + startedAtMs: number; + stdin?: ManagedRunStdin; + wait: () => Promise; + cancel: (reason?: TerminationReason) => void; +}; + +export type SpawnMode = "child" | "pty"; + +export type ManagedRunStdin = { + write: (data: string, cb?: (err?: Error | null) => void) => void; + end: () => void; + destroy?: () => void; + destroyed?: boolean; +}; + +type SpawnBaseInput = { + runId?: string; + sessionId: string; + backendId: string; + scopeKey?: string; + replaceExistingScope?: boolean; + cwd?: string; + env?: NodeJS.ProcessEnv; + timeoutMs?: number; + noOutputTimeoutMs?: number; + /** + * When false, stdout/stderr are streamed via callbacks only and not retained in RunExit payload. + */ + captureOutput?: boolean; + onStdout?: (chunk: string) => void; + onStderr?: (chunk: string) => void; +}; + +type SpawnChildInput = SpawnBaseInput & { + mode: "child"; + argv: string[]; + windowsVerbatimArguments?: boolean; + input?: string; + stdinMode?: "inherit" | "pipe-open" | "pipe-closed"; +}; + +type SpawnPtyInput = SpawnBaseInput & { + mode: "pty"; + ptyCommand: string; +}; + +export type SpawnInput = SpawnChildInput | SpawnPtyInput; + +export interface ProcessSupervisor { + spawn(input: SpawnInput): Promise; + cancel(runId: string, reason?: TerminationReason): void; + cancelScope(scopeKey: string, reason?: TerminationReason): void; + reconcileOrphans(): Promise; + getRecord(runId: string): RunRecord | undefined; +}