fix: codex and similar processes keep dying on pty, solved by refactoring process spawning (#14257)

* exec: clean up PTY resources on timeout and exit

* cli: harden resume cleanup and watchdog stalled runs

* cli: productionize PTY and resume reliability paths

* docs: add PTY process supervision architecture plan

* docs: rewrite PTY supervision plan as pre-rewrite baseline

* docs: switch PTY supervision plan to one-go execution

* docs: add one-line root cause to PTY supervision plan

* docs: add OS contracts and test matrix to PTY supervision plan

* docs: define process-supervisor package placement and scope

* docs: tie supervisor plan to existing CI lanes

* docs: place PTY supervisor plan under src/process

* refactor(process): route exec and cli runs through supervisor

* docs(process): refresh PTY supervision plan

* wip

* fix(process): harden supervisor timeout and PTY termination

* fix(process): harden supervisor adapters env and wait handling

* ci: avoid failing formal conformance on comment permissions

* test(ui): fix cron request mock argument typing

* fix(ui): remove leftover conflict marker

* fix: supervise PTY processes (#14257) (openclaw#14257) (thanks @onutc)
This commit is contained in:
Onur
2026-02-16 09:32:05 +08:00
committed by GitHub
parent a73e7786e7
commit cd44a0d01e
32 changed files with 2759 additions and 855 deletions

View File

@@ -108,6 +108,7 @@ jobs:
- name: Comment on PR (informational) - name: Comment on PR (informational)
if: steps.drift.outputs.drift == 'true' if: steps.drift.outputs.drift == 'true'
continue-on-error: true
uses: actions/github-script@v7 uses: actions/github-script@v7
with: with:
script: | script: |

View File

@@ -108,6 +108,7 @@ Docs: https://docs.openclaw.ai
- Tools/Write/Edit: normalize structured text-block arguments for `content`/`oldText`/`newText` before filesystem edits, preventing JSON-like file corruption and false “exact text not found” misses from block-form params. (#16778) Thanks @danielpipernz. - Tools/Write/Edit: normalize structured text-block arguments for `content`/`oldText`/`newText` before filesystem edits, preventing JSON-like file corruption and false “exact text not found” misses from block-form params. (#16778) Thanks @danielpipernz.
- Ollama/Agents: avoid forcing `<final>` tag enforcement for Ollama models, which could suppress all output as `(no output)`. (#16191) Thanks @Glucksberg. - Ollama/Agents: avoid forcing `<final>` tag enforcement for Ollama models, which could suppress all output as `(no output)`. (#16191) Thanks @Glucksberg.
- Plugins: suppress false duplicate plugin id warnings when the same extension is discovered via multiple paths (config/workspace/global vs bundled), while still warning on genuine duplicates. (#16222) Thanks @shadril238. - Plugins: suppress false duplicate plugin id warnings when the same extension is discovered via multiple paths (config/workspace/global vs bundled), while still warning on genuine duplicates. (#16222) Thanks @shadril238.
- Agents/Process: supervise PTY/child process lifecycles with explicit ownership, cancellation, timeouts, and deterministic cleanup, preventing Codex/Pi PTY sessions from dying or stalling on resume. (#14257) Thanks @onutc.
- Skills: watch `SKILL.md` only when refreshing skills snapshot to avoid file-descriptor exhaustion in large data trees. (#11325) Thanks @household-bard. - Skills: watch `SKILL.md` only when refreshing skills snapshot to avoid file-descriptor exhaustion in large data trees. (#11325) Thanks @household-bard.
- Memory/QMD: make `memory status` read-only by skipping QMD boot update/embed side effects for status-only manager checks. - Memory/QMD: make `memory status` read-only by skipping QMD boot update/embed side effects for status-only manager checks.
- Memory/QMD: keep original QMD failures when builtin fallback initialization fails (for example missing embedding API keys), instead of replacing them with fallback init errors. - Memory/QMD: keep original QMD failures when builtin fallback initialization fails (for example missing embedding API keys), instead of replacing them with fallback init errors.

View File

@@ -0,0 +1,192 @@
---
summary: "Production plan for reliable interactive process supervision (PTY + non-PTY) with explicit ownership, unified lifecycle, and deterministic cleanup"
owner: "openclaw"
status: "in-progress"
last_updated: "2026-02-15"
title: "PTY and Process Supervision Plan"
---
# PTY and Process Supervision Plan
## 1. Problem and goal
We need one reliable lifecycle for long-running command execution across:
- `exec` foreground runs
- `exec` background runs
- `process` follow up actions (`poll`, `log`, `send-keys`, `paste`, `submit`, `kill`, `remove`)
- CLI agent runner subprocesses
The goal is not just to support PTY. The goal is predictable ownership, cancellation, timeout, and cleanup with no unsafe process matching heuristics.
## 2. Scope and boundaries
- Keep implementation internal in `src/process/supervisor`.
- Do not create a new package for this.
- Keep current behavior compatibility where practical.
- Do not broaden scope to terminal replay or tmux style session persistence.
## 3. Implemented in this branch
### Supervisor baseline already present
- Supervisor module is in place under `src/process/supervisor/*`.
- Exec runtime and CLI runner are already routed through supervisor spawn and wait.
- Registry finalization is idempotent.
### This pass completed
1. Explicit PTY command contract
- `SpawnInput` is now a discriminated union in `src/process/supervisor/types.ts`.
- PTY runs require `ptyCommand` instead of reusing generic `argv`.
- Supervisor no longer rebuilds PTY command strings from argv joins in `src/process/supervisor/supervisor.ts`.
- Exec runtime now passes `ptyCommand` directly in `src/agents/bash-tools.exec-runtime.ts`.
2. Process layer type decoupling
- Supervisor types no longer import `SessionStdin` from agents.
- Process local stdin contract lives in `src/process/supervisor/types.ts` (`ManagedRunStdin`).
- Adapters now depend only on process level types:
- `src/process/supervisor/adapters/child.ts`
- `src/process/supervisor/adapters/pty.ts`
3. Process tool lifecycle ownership improvement
- `src/agents/bash-tools.process.ts` now requests cancellation through supervisor first.
- `process kill/remove` now use process-tree fallback termination when supervisor lookup misses.
- `remove` keeps deterministic remove behavior by dropping running session entries immediately after termination is requested.
4. Single source watchdog defaults
- Added shared defaults in `src/agents/cli-watchdog-defaults.ts`.
- `src/agents/cli-backends.ts` consumes the shared defaults.
- `src/agents/cli-runner/reliability.ts` consumes the same shared defaults.
5. Dead helper cleanup
- Removed unused `killSession` helper path from `src/agents/bash-tools.shared.ts`.
6. Direct supervisor path tests added
- Added `src/agents/bash-tools.process.supervisor.test.ts` to cover kill and remove routing through supervisor cancellation.
7. Reliability gap fixes completed
- `src/agents/bash-tools.process.ts` now falls back to real OS-level process termination when supervisor lookup misses.
- `src/process/supervisor/adapters/child.ts` now uses process-tree termination semantics for default cancel/timeout kill paths.
- Added shared process-tree utility in `src/process/kill-tree.ts`.
8. PTY contract edge-case coverage added
- Added `src/process/supervisor/supervisor.pty-command.test.ts` for verbatim PTY command forwarding and empty-command rejection.
- Added `src/process/supervisor/adapters/child.test.ts` for process-tree kill behavior in child adapter cancellation.
## 4. Remaining gaps and decisions
### Reliability status
The two required reliability gaps for this pass are now closed:
- `process kill/remove` now has a real OS termination fallback when supervisor lookup misses.
- child cancel/timeout now uses process-tree kill semantics for default kill path.
- Regression tests were added for both behaviors.
### Durability and startup reconciliation
Restart behavior is now explicitly defined as in-memory lifecycle only.
- `reconcileOrphans()` remains a no-op in `src/process/supervisor/supervisor.ts` by design.
- Active runs are not recovered after process restart.
- This boundary is intentional for this implementation pass to avoid partial persistence risks.
### Maintainability follow-ups
1. `runExecProcess` in `src/agents/bash-tools.exec-runtime.ts` still handles multiple responsibilities and can be split into focused helpers in a follow-up.
## 5. Implementation plan
The implementation pass for required reliability and contract items is complete.
Completed:
- `process kill/remove` fallback real termination
- process-tree cancellation for child adapter default kill path
- regression tests for fallback kill and child adapter kill path
- PTY command edge-case tests under explicit `ptyCommand`
- explicit in-memory restart boundary with `reconcileOrphans()` no-op by design
Optional follow-up:
- split `runExecProcess` into focused helpers with no behavior drift
## 6. File map
### Process supervisor
- `src/process/supervisor/types.ts` updated with discriminated spawn input and process local stdin contract.
- `src/process/supervisor/supervisor.ts` updated to use explicit `ptyCommand`.
- `src/process/supervisor/adapters/child.ts` and `src/process/supervisor/adapters/pty.ts` decoupled from agent types.
- `src/process/supervisor/registry.ts` idempotent finalize unchanged and retained.
### Exec and process integration
- `src/agents/bash-tools.exec-runtime.ts` updated to pass PTY command explicitly and keep fallback path.
- `src/agents/bash-tools.process.ts` updated to cancel via supervisor with real process-tree fallback termination.
- `src/agents/bash-tools.shared.ts` removed direct kill helper path.
### CLI reliability
- `src/agents/cli-watchdog-defaults.ts` added as shared baseline.
- `src/agents/cli-backends.ts` and `src/agents/cli-runner/reliability.ts` now consume same defaults.
## 7. Validation run in this pass
Unit tests:
- `pnpm vitest src/process/supervisor/registry.test.ts`
- `pnpm vitest src/process/supervisor/supervisor.test.ts`
- `pnpm vitest src/process/supervisor/supervisor.pty-command.test.ts`
- `pnpm vitest src/process/supervisor/adapters/child.test.ts`
- `pnpm vitest src/agents/cli-backends.test.ts`
- `pnpm vitest src/agents/bash-tools.exec.pty-cleanup.test.ts`
- `pnpm vitest src/agents/bash-tools.process.poll-timeout.test.ts`
- `pnpm vitest src/agents/bash-tools.process.supervisor.test.ts`
- `pnpm vitest src/process/exec.test.ts`
E2E targets:
- `pnpm test:e2e src/agents/cli-runner.e2e.test.ts`
- `pnpm test:e2e src/agents/bash-tools.exec.pty-fallback.e2e.test.ts src/agents/bash-tools.exec.background-abort.e2e.test.ts src/agents/bash-tools.process.send-keys.e2e.test.ts`
Typecheck note:
- `pnpm tsgo` currently fails in this repo due to a pre-existing UI typing dependency issue (`@vitest/browser-playwright` resolution), unrelated to this process supervision work.
## 8. Operational guarantees preserved
- Exec env hardening behavior is unchanged.
- Approval and allowlist flow is unchanged.
- Output sanitization and output caps are unchanged.
- PTY adapter still guarantees wait settlement on forced kill and listener disposal.
## 9. Definition of done
1. Supervisor is lifecycle owner for managed runs.
2. PTY spawn uses explicit command contract with no argv reconstruction.
3. Process layer has no type dependency on agent layer for supervisor stdin contracts.
4. Watchdog defaults are single source.
5. Targeted unit and e2e tests remain green.
6. Restart durability boundary is explicitly documented or fully implemented.
## 10. Summary
The branch now has a coherent and safer supervision shape:
- explicit PTY contract
- cleaner process layering
- supervisor driven cancellation path for process operations
- real fallback termination when supervisor lookup misses
- process-tree cancellation for child-run default kill paths
- unified watchdog defaults
- explicit in-memory restart boundary (no orphan reconciliation across restart in this pass)

View File

@@ -1,17 +1,17 @@
import type { AgentToolResult } from "@mariozechner/pi-agent-core"; import type { AgentToolResult } from "@mariozechner/pi-agent-core";
import type { ChildProcessWithoutNullStreams } from "node:child_process";
import { Type } from "@sinclair/typebox"; import { Type } from "@sinclair/typebox";
import path from "node:path"; import path from "node:path";
import type { ExecAsk, ExecHost, ExecSecurity } from "../infra/exec-approvals.js"; import type { ExecAsk, ExecHost, ExecSecurity } from "../infra/exec-approvals.js";
import type { ProcessSession, SessionStdin } from "./bash-process-registry.js"; import type { ProcessSession } from "./bash-process-registry.js";
import type { ExecToolDetails } from "./bash-tools.exec.js"; import type { ExecToolDetails } from "./bash-tools.exec.js";
import type { BashSandboxConfig } from "./bash-tools.shared.js"; import type { BashSandboxConfig } from "./bash-tools.shared.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { mergePathPrepend } from "../infra/path-prepend.js"; import { mergePathPrepend } from "../infra/path-prepend.js";
import { enqueueSystemEvent } from "../infra/system-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js";
export { applyPathPrepend, normalizePathPrepend } from "../infra/path-prepend.js"; export { applyPathPrepend, normalizePathPrepend } from "../infra/path-prepend.js";
import type { ManagedRun } from "../process/supervisor/index.js";
import { logWarn } from "../logger.js"; import { logWarn } from "../logger.js";
import { formatSpawnError, spawnWithFallback } from "../process/spawn-utils.js"; import { getProcessSupervisor } from "../process/supervisor/index.js";
import { import {
addSession, addSession,
appendOutput, appendOutput,
@@ -23,7 +23,6 @@ import {
buildDockerExecArgs, buildDockerExecArgs,
chunkString, chunkString,
clampWithDefault, clampWithDefault,
killSession,
readEnvInt, readEnvInt,
} from "./bash-tools.shared.js"; } from "./bash-tools.shared.js";
import { buildCursorPositionResponse, stripDsrRequests } from "./pty-dsr.js"; import { buildCursorPositionResponse, stripDsrRequests } from "./pty-dsr.js";
@@ -147,26 +146,6 @@ export const execSchema = Type.Object({
), ),
}); });
type PtyExitEvent = { exitCode: number; signal?: number };
type PtyListener<T> = (event: T) => void;
type PtyHandle = {
pid: number;
write: (data: string | Buffer) => void;
onData: (listener: PtyListener<string>) => void;
onExit: (listener: PtyListener<PtyExitEvent>) => void;
};
type PtySpawn = (
file: string,
args: string[] | string,
options: {
name?: string;
cols?: number;
rows?: number;
cwd?: string;
env?: Record<string, string>;
},
) => PtyHandle;
export type ExecProcessOutcome = { export type ExecProcessOutcome = {
status: "completed" | "failed"; status: "completed" | "failed";
exitCode: number | null; exitCode: number | null;
@@ -319,138 +298,10 @@ export async function runExecProcess(opts: {
}): Promise<ExecProcessHandle> { }): Promise<ExecProcessHandle> {
const startedAt = Date.now(); const startedAt = Date.now();
const sessionId = createSessionSlug(); const sessionId = createSessionSlug();
let child: ChildProcessWithoutNullStreams | null = null;
let pty: PtyHandle | null = null;
let stdin: SessionStdin | undefined;
const execCommand = opts.execCommand ?? opts.command; const execCommand = opts.execCommand ?? opts.command;
const supervisor = getProcessSupervisor();
const spawnFallbacks = [ const session: ProcessSession = {
{
label: "no-detach",
options: { detached: false },
},
];
const handleSpawnFallback = (err: unknown, fallback: { label: string }) => {
const errText = formatSpawnError(err);
const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`;
logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`);
opts.warnings.push(warning);
};
const spawnShellChild = async (
shell: string,
shellArgs: string[],
): Promise<ChildProcessWithoutNullStreams> => {
const { child: spawned } = await spawnWithFallback({
argv: [shell, ...shellArgs, execCommand],
options: {
cwd: opts.workdir,
env: opts.env,
detached: process.platform !== "win32",
stdio: ["pipe", "pipe", "pipe"],
windowsHide: true,
},
fallbacks: spawnFallbacks,
onFallback: handleSpawnFallback,
});
return spawned as ChildProcessWithoutNullStreams;
};
// `exec` does not currently accept tool-provided stdin content. For non-PTY runs,
// keeping stdin open can cause commands like `wc -l` (or safeBins-hardened segments)
// to block forever waiting for input, leading to accidental backgrounding.
// For interactive flows, callers should use `pty: true` (stdin kept open).
const maybeCloseNonPtyStdin = () => {
if (opts.usePty) {
return;
}
try {
// Signal EOF immediately so stdin-only commands can terminate.
child?.stdin?.end();
} catch {
// ignore stdin close errors
}
};
if (opts.sandbox) {
const { child: spawned } = await spawnWithFallback({
argv: [
"docker",
...buildDockerExecArgs({
containerName: opts.sandbox.containerName,
command: execCommand,
workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir,
env: opts.env,
tty: opts.usePty,
}),
],
options: {
cwd: opts.workdir,
env: process.env,
detached: process.platform !== "win32",
stdio: ["pipe", "pipe", "pipe"],
windowsHide: true,
},
fallbacks: spawnFallbacks,
onFallback: handleSpawnFallback,
});
child = spawned as ChildProcessWithoutNullStreams;
stdin = child.stdin;
maybeCloseNonPtyStdin();
} else if (opts.usePty) {
const { shell, args: shellArgs } = getShellConfig();
try {
const ptyModule = (await import("@lydell/node-pty")) as unknown as {
spawn?: PtySpawn;
default?: { spawn?: PtySpawn };
};
const spawnPty = ptyModule.spawn ?? ptyModule.default?.spawn;
if (!spawnPty) {
throw new Error("PTY support is unavailable (node-pty spawn not found).");
}
pty = spawnPty(shell, [...shellArgs, execCommand], {
cwd: opts.workdir,
env: opts.env,
name: process.env.TERM ?? "xterm-256color",
cols: 120,
rows: 30,
});
stdin = {
destroyed: false,
write: (data, cb) => {
try {
pty?.write(data);
cb?.(null);
} catch (err) {
cb?.(err as Error);
}
},
end: () => {
try {
const eof = process.platform === "win32" ? "\x1a" : "\x04";
pty?.write(eof);
} catch {
// ignore EOF errors
}
},
};
} catch (err) {
const errText = String(err);
const warning = `Warning: PTY spawn failed (${errText}); retrying without PTY for \`${opts.command}\`.`;
logWarn(`exec: PTY spawn failed (${errText}); retrying without PTY for "${opts.command}".`);
opts.warnings.push(warning);
child = await spawnShellChild(shell, shellArgs);
stdin = child.stdin;
}
} else {
const { shell, args: shellArgs } = getShellConfig();
child = await spawnShellChild(shell, shellArgs);
stdin = child.stdin;
maybeCloseNonPtyStdin();
}
const session = {
id: sessionId, id: sessionId,
command: opts.command, command: opts.command,
scopeKey: opts.scopeKey, scopeKey: opts.scopeKey,
@@ -458,9 +309,9 @@ export async function runExecProcess(opts: {
notifyOnExit: opts.notifyOnExit, notifyOnExit: opts.notifyOnExit,
notifyOnExitEmptySuccess: opts.notifyOnExitEmptySuccess === true, notifyOnExitEmptySuccess: opts.notifyOnExitEmptySuccess === true,
exitNotified: false, exitNotified: false,
child: child ?? undefined, child: undefined,
stdin, stdin: undefined,
pid: child?.pid ?? pty?.pid, pid: undefined,
startedAt, startedAt,
cwd: opts.workdir, cwd: opts.workdir,
maxOutputChars: opts.maxOutput, maxOutputChars: opts.maxOutput,
@@ -477,59 +328,9 @@ export async function runExecProcess(opts: {
exitSignal: undefined as NodeJS.Signals | number | null | undefined, exitSignal: undefined as NodeJS.Signals | number | null | undefined,
truncated: false, truncated: false,
backgrounded: false, backgrounded: false,
} satisfies ProcessSession; };
addSession(session); addSession(session);
let settled = false;
let timeoutTimer: NodeJS.Timeout | null = null;
let timeoutFinalizeTimer: NodeJS.Timeout | null = null;
let timedOut = false;
const timeoutFinalizeMs = 1000;
let resolveFn: ((outcome: ExecProcessOutcome) => void) | null = null;
const settle = (outcome: ExecProcessOutcome) => {
if (settled) {
return;
}
settled = true;
resolveFn?.(outcome);
};
const finalizeTimeout = () => {
if (session.exited) {
return;
}
markExited(session, null, "SIGKILL", "failed");
maybeNotifyOnExit(session, "failed");
const aggregated = session.aggregated.trim();
const reason = `Command timed out after ${opts.timeoutSec} seconds`;
settle({
status: "failed",
exitCode: null,
exitSignal: "SIGKILL",
durationMs: Date.now() - startedAt,
aggregated,
timedOut: true,
reason: aggregated ? `${aggregated}\n\n${reason}` : reason,
});
};
const onTimeout = () => {
timedOut = true;
killSession(session);
if (!timeoutFinalizeTimer) {
timeoutFinalizeTimer = setTimeout(() => {
finalizeTimeout();
}, timeoutFinalizeMs);
}
};
if (opts.timeoutSec > 0) {
timeoutTimer = setTimeout(() => {
onTimeout();
}, opts.timeoutSec * 1000);
}
const emitUpdate = () => { const emitUpdate = () => {
if (!opts.onUpdate) { if (!opts.onUpdate) {
return; return;
@@ -565,116 +366,208 @@ export async function runExecProcess(opts: {
} }
}; };
if (pty) { const timeoutMs =
const cursorResponse = buildCursorPositionResponse(); typeof opts.timeoutSec === "number" && opts.timeoutSec > 0
pty.onData((data) => { ? Math.floor(opts.timeoutSec * 1000)
const raw = data.toString(); : undefined;
const { cleaned, requests } = stripDsrRequests(raw);
if (requests > 0) { const spawnSpec:
| {
mode: "child";
argv: string[];
env: NodeJS.ProcessEnv;
stdinMode: "pipe-open" | "pipe-closed";
}
| {
mode: "pty";
ptyCommand: string;
childFallbackArgv: string[];
env: NodeJS.ProcessEnv;
stdinMode: "pipe-open";
} = (() => {
if (opts.sandbox) {
return {
mode: "child" as const,
argv: [
"docker",
...buildDockerExecArgs({
containerName: opts.sandbox.containerName,
command: execCommand,
workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir,
env: opts.env,
tty: opts.usePty,
}),
],
env: process.env,
stdinMode: opts.usePty ? ("pipe-open" as const) : ("pipe-closed" as const),
};
}
const { shell, args: shellArgs } = getShellConfig();
const childArgv = [shell, ...shellArgs, execCommand];
if (opts.usePty) {
return {
mode: "pty" as const,
ptyCommand: execCommand,
childFallbackArgv: childArgv,
env: opts.env,
stdinMode: "pipe-open" as const,
};
}
return {
mode: "child" as const,
argv: childArgv,
env: opts.env,
stdinMode: "pipe-closed" as const,
};
})();
let managedRun: ManagedRun | null = null;
let usingPty = spawnSpec.mode === "pty";
const cursorResponse = buildCursorPositionResponse();
const onSupervisorStdout = (chunk: string) => {
if (usingPty) {
const { cleaned, requests } = stripDsrRequests(chunk);
if (requests > 0 && managedRun?.stdin) {
for (let i = 0; i < requests; i += 1) { for (let i = 0; i < requests; i += 1) {
pty.write(cursorResponse); managedRun.stdin.write(cursorResponse);
} }
} }
handleStdout(cleaned); handleStdout(cleaned);
}); return;
} else if (child) { }
child.stdout.on("data", handleStdout); handleStdout(chunk);
child.stderr.on("data", handleStderr); };
}
const promise = new Promise<ExecProcessOutcome>((resolve) => { try {
resolveFn = resolve; const spawnBase = {
const handleExit = (code: number | null, exitSignal: NodeJS.Signals | number | null) => { runId: sessionId,
if (timeoutTimer) { sessionId: opts.sessionKey?.trim() || sessionId,
clearTimeout(timeoutTimer); backendId: opts.sandbox ? "exec-sandbox" : "exec-host",
} scopeKey: opts.scopeKey,
if (timeoutFinalizeTimer) { cwd: opts.workdir,
clearTimeout(timeoutFinalizeTimer); env: spawnSpec.env,
timeoutMs,
captureOutput: false,
onStdout: onSupervisorStdout,
onStderr: handleStderr,
};
managedRun =
spawnSpec.mode === "pty"
? await supervisor.spawn({
...spawnBase,
mode: "pty",
ptyCommand: spawnSpec.ptyCommand,
})
: await supervisor.spawn({
...spawnBase,
mode: "child",
argv: spawnSpec.argv,
stdinMode: spawnSpec.stdinMode,
});
} catch (err) {
if (spawnSpec.mode === "pty") {
const warning = `Warning: PTY spawn failed (${String(err)}); retrying without PTY for \`${opts.command}\`.`;
logWarn(
`exec: PTY spawn failed (${String(err)}); retrying without PTY for "${opts.command}".`,
);
opts.warnings.push(warning);
usingPty = false;
try {
managedRun = await supervisor.spawn({
runId: sessionId,
sessionId: opts.sessionKey?.trim() || sessionId,
backendId: "exec-host",
scopeKey: opts.scopeKey,
mode: "child",
argv: spawnSpec.childFallbackArgv,
cwd: opts.workdir,
env: spawnSpec.env,
stdinMode: "pipe-open",
timeoutMs,
captureOutput: false,
onStdout: handleStdout,
onStderr: handleStderr,
});
} catch (retryErr) {
markExited(session, null, null, "failed");
maybeNotifyOnExit(session, "failed");
throw retryErr;
} }
} else {
markExited(session, null, null, "failed");
maybeNotifyOnExit(session, "failed");
throw err;
}
}
session.stdin = managedRun.stdin;
session.pid = managedRun.pid;
const promise = managedRun
.wait()
.then((exit): ExecProcessOutcome => {
const durationMs = Date.now() - startedAt; const durationMs = Date.now() - startedAt;
const wasSignal = exitSignal != null; const status: "completed" | "failed" =
const isSuccess = code === 0 && !wasSignal && !timedOut; exit.exitCode === 0 && exit.reason === "exit" ? "completed" : "failed";
const status: "completed" | "failed" = isSuccess ? "completed" : "failed"; markExited(session, exit.exitCode, exit.exitSignal, status);
markExited(session, code, exitSignal, status);
maybeNotifyOnExit(session, status); maybeNotifyOnExit(session, status);
if (!session.child && session.stdin) { if (!session.child && session.stdin) {
session.stdin.destroyed = true; session.stdin.destroyed = true;
} }
if (settled) {
return;
}
const aggregated = session.aggregated.trim(); const aggregated = session.aggregated.trim();
if (!isSuccess) { if (status === "completed") {
const reason = timedOut return {
? `Command timed out after ${opts.timeoutSec} seconds` status: "completed",
: wasSignal && exitSignal exitCode: exit.exitCode ?? 0,
? `Command aborted by signal ${exitSignal}` exitSignal: exit.exitSignal,
: code === null
? "Command aborted before exit code was captured"
: `Command exited with code ${code}`;
const message = aggregated ? `${aggregated}\n\n${reason}` : reason;
settle({
status: "failed",
exitCode: code ?? null,
exitSignal: exitSignal ?? null,
durationMs, durationMs,
aggregated, aggregated,
timedOut, timedOut: false,
reason: message, };
});
return;
} }
settle({ const reason =
status: "completed", exit.reason === "overall-timeout"
exitCode: code ?? 0, ? `Command timed out after ${opts.timeoutSec} seconds`
exitSignal: exitSignal ?? null, : exit.reason === "no-output-timeout"
? "Command timed out waiting for output"
: exit.exitSignal != null
? `Command aborted by signal ${exit.exitSignal}`
: exit.exitCode == null
? "Command aborted before exit code was captured"
: `Command exited with code ${exit.exitCode}`;
return {
status: "failed",
exitCode: exit.exitCode,
exitSignal: exit.exitSignal,
durationMs, durationMs,
aggregated, aggregated,
timedOut: exit.timedOut,
reason: aggregated ? `${aggregated}\n\n${reason}` : reason,
};
})
.catch((err): ExecProcessOutcome => {
markExited(session, null, null, "failed");
maybeNotifyOnExit(session, "failed");
const aggregated = session.aggregated.trim();
const message = aggregated ? `${aggregated}\n\n${String(err)}` : String(err);
return {
status: "failed",
exitCode: null,
exitSignal: null,
durationMs: Date.now() - startedAt,
aggregated,
timedOut: false, timedOut: false,
}); reason: message,
}; };
});
if (pty) {
pty.onExit((event) => {
const rawSignal = event.signal ?? null;
const normalizedSignal = rawSignal === 0 ? null : rawSignal;
handleExit(event.exitCode ?? null, normalizedSignal);
});
} else if (child) {
child.once("close", (code, exitSignal) => {
handleExit(code, exitSignal);
});
child.once("error", (err) => {
if (timeoutTimer) {
clearTimeout(timeoutTimer);
}
if (timeoutFinalizeTimer) {
clearTimeout(timeoutFinalizeTimer);
}
markExited(session, null, null, "failed");
maybeNotifyOnExit(session, "failed");
const aggregated = session.aggregated.trim();
const message = aggregated ? `${aggregated}\n\n${String(err)}` : String(err);
settle({
status: "failed",
exitCode: null,
exitSignal: null,
durationMs: Date.now() - startedAt,
aggregated,
timedOut,
reason: message,
});
});
}
});
return { return {
session, session,
startedAt, startedAt,
pid: session.pid ?? undefined, pid: session.pid ?? undefined,
promise, promise,
kill: () => killSession(session), kill: () => {
managedRun?.cancel("manual-cancel");
},
}; };
} }

View File

@@ -0,0 +1,73 @@
import { afterEach, expect, test, vi } from "vitest";
import { resetProcessRegistryForTests } from "./bash-process-registry";
afterEach(() => {
resetProcessRegistryForTests();
vi.resetModules();
vi.clearAllMocks();
});
test("exec disposes PTY listeners after normal exit", async () => {
const disposeData = vi.fn();
const disposeExit = vi.fn();
vi.doMock("@lydell/node-pty", () => ({
spawn: () => {
return {
pid: 0,
write: vi.fn(),
onData: (listener: (value: string) => void) => {
setTimeout(() => listener("ok"), 0);
return { dispose: disposeData };
},
onExit: (listener: (event: { exitCode: number; signal?: number }) => void) => {
setTimeout(() => listener({ exitCode: 0 }), 0);
return { dispose: disposeExit };
},
kill: vi.fn(),
};
},
}));
const { createExecTool } = await import("./bash-tools.exec");
const tool = createExecTool({ allowBackground: false });
const result = await tool.execute("toolcall", {
command: "echo ok",
pty: true,
});
expect(result.details.status).toBe("completed");
expect(disposeData).toHaveBeenCalledTimes(1);
expect(disposeExit).toHaveBeenCalledTimes(1);
});
test("exec tears down PTY resources on timeout", async () => {
const disposeData = vi.fn();
const disposeExit = vi.fn();
const kill = vi.fn();
vi.doMock("@lydell/node-pty", () => ({
spawn: () => {
return {
pid: 0,
write: vi.fn(),
onData: () => ({ dispose: disposeData }),
onExit: () => ({ dispose: disposeExit }),
kill,
};
},
}));
const { createExecTool } = await import("./bash-tools.exec");
const tool = createExecTool({ allowBackground: false });
await expect(
tool.execute("toolcall", {
command: "sleep 5",
pty: true,
timeout: 0.01,
}),
).rejects.toThrow("Command timed out");
expect(kill).toHaveBeenCalledTimes(1);
expect(disposeData).toHaveBeenCalledTimes(1);
expect(disposeExit).toHaveBeenCalledTimes(1);
});

View File

@@ -0,0 +1,40 @@
import { afterEach, expect, test, vi } from "vitest";
import { listRunningSessions, resetProcessRegistryForTests } from "./bash-process-registry";
const { supervisorSpawnMock } = vi.hoisted(() => ({
supervisorSpawnMock: vi.fn(),
}));
vi.mock("../process/supervisor/index.js", () => ({
getProcessSupervisor: () => ({
spawn: (...args: unknown[]) => supervisorSpawnMock(...args),
cancel: vi.fn(),
cancelScope: vi.fn(),
reconcileOrphans: vi.fn(),
getRecord: vi.fn(),
}),
}));
afterEach(() => {
resetProcessRegistryForTests();
vi.resetModules();
vi.clearAllMocks();
});
test("exec cleans session state when PTY fallback spawn also fails", async () => {
supervisorSpawnMock
.mockRejectedValueOnce(new Error("pty spawn failed"))
.mockRejectedValueOnce(new Error("child fallback failed"));
const { createExecTool } = await import("./bash-tools.exec");
const tool = createExecTool({ allowBackground: false });
await expect(
tool.execute("toolcall", {
command: "echo ok",
pty: true,
}),
).rejects.toThrow("child fallback failed");
expect(listRunningSessions()).toHaveLength(0);
});

View File

@@ -0,0 +1,152 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { ProcessSession } from "./bash-process-registry.js";
import {
addSession,
getFinishedSession,
getSession,
resetProcessRegistryForTests,
} from "./bash-process-registry.js";
import { createProcessTool } from "./bash-tools.process.js";
const { supervisorMock } = vi.hoisted(() => ({
supervisorMock: {
spawn: vi.fn(),
cancel: vi.fn(),
cancelScope: vi.fn(),
reconcileOrphans: vi.fn(),
getRecord: vi.fn(),
},
}));
const { killProcessTreeMock } = vi.hoisted(() => ({
killProcessTreeMock: vi.fn(),
}));
vi.mock("../process/supervisor/index.js", () => ({
getProcessSupervisor: () => supervisorMock,
}));
vi.mock("../process/kill-tree.js", () => ({
killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args),
}));
function createBackgroundSession(id: string, pid?: number): ProcessSession {
return {
id,
command: "sleep 999",
startedAt: Date.now(),
cwd: "/tmp",
maxOutputChars: 10_000,
pendingMaxOutputChars: 30_000,
totalOutputChars: 0,
pendingStdout: [],
pendingStderr: [],
pendingStdoutChars: 0,
pendingStderrChars: 0,
aggregated: "",
tail: "",
pid,
exited: false,
exitCode: undefined,
exitSignal: undefined,
truncated: false,
backgrounded: true,
};
}
describe("process tool supervisor cancellation", () => {
beforeEach(() => {
supervisorMock.spawn.mockReset();
supervisorMock.cancel.mockReset();
supervisorMock.cancelScope.mockReset();
supervisorMock.reconcileOrphans.mockReset();
supervisorMock.getRecord.mockReset();
killProcessTreeMock.mockReset();
});
afterEach(() => {
resetProcessRegistryForTests();
});
it("routes kill through supervisor when run is managed", async () => {
supervisorMock.getRecord.mockReturnValue({
runId: "sess",
state: "running",
});
addSession(createBackgroundSession("sess"));
const processTool = createProcessTool();
const result = await processTool.execute("toolcall", {
action: "kill",
sessionId: "sess",
});
expect(supervisorMock.cancel).toHaveBeenCalledWith("sess", "manual-cancel");
expect(getSession("sess")).toBeDefined();
expect(getSession("sess")?.exited).toBe(false);
expect(result.content[0]).toMatchObject({
type: "text",
text: "Termination requested for session sess.",
});
});
it("remove drops running session immediately when cancellation is requested", async () => {
supervisorMock.getRecord.mockReturnValue({
runId: "sess",
state: "running",
});
addSession(createBackgroundSession("sess"));
const processTool = createProcessTool();
const result = await processTool.execute("toolcall", {
action: "remove",
sessionId: "sess",
});
expect(supervisorMock.cancel).toHaveBeenCalledWith("sess", "manual-cancel");
expect(getSession("sess")).toBeUndefined();
expect(getFinishedSession("sess")).toBeUndefined();
expect(result.content[0]).toMatchObject({
type: "text",
text: "Removed session sess (termination requested).",
});
});
it("falls back to process-tree kill when supervisor record is missing", async () => {
supervisorMock.getRecord.mockReturnValue(undefined);
addSession(createBackgroundSession("sess-fallback", 4242));
const processTool = createProcessTool();
const result = await processTool.execute("toolcall", {
action: "kill",
sessionId: "sess-fallback",
});
expect(killProcessTreeMock).toHaveBeenCalledWith(4242);
expect(getSession("sess-fallback")).toBeUndefined();
expect(getFinishedSession("sess-fallback")).toBeDefined();
expect(result.content[0]).toMatchObject({
type: "text",
text: "Killed session sess-fallback.",
});
});
it("fails remove when no supervisor record and no pid is available", async () => {
supervisorMock.getRecord.mockReturnValue(undefined);
addSession(createBackgroundSession("sess-no-pid"));
const processTool = createProcessTool();
const result = await processTool.execute("toolcall", {
action: "remove",
sessionId: "sess-no-pid",
});
expect(killProcessTreeMock).not.toHaveBeenCalled();
expect(getSession("sess-no-pid")).toBeDefined();
expect(result.details).toMatchObject({ status: "failed" });
expect(result.content[0]).toMatchObject({
type: "text",
text: "Unable to remove session sess-no-pid: no active supervisor run or process id.",
});
});
});

View File

@@ -1,7 +1,10 @@
import type { AgentTool, AgentToolResult } from "@mariozechner/pi-agent-core"; import type { AgentTool, AgentToolResult } from "@mariozechner/pi-agent-core";
import { Type } from "@sinclair/typebox"; import { Type } from "@sinclair/typebox";
import { formatDurationCompact } from "../infra/format-time/format-duration.ts"; import { formatDurationCompact } from "../infra/format-time/format-duration.ts";
import { killProcessTree } from "../process/kill-tree.js";
import { getProcessSupervisor } from "../process/supervisor/index.js";
import { import {
type ProcessSession,
deleteSession, deleteSession,
drainSession, drainSession,
getFinishedSession, getFinishedSession,
@@ -11,13 +14,7 @@ import {
markExited, markExited,
setJobTtlMs, setJobTtlMs,
} from "./bash-process-registry.js"; } from "./bash-process-registry.js";
import { import { deriveSessionName, pad, sliceLogLines, truncateMiddle } from "./bash-tools.shared.js";
deriveSessionName,
killSession,
pad,
sliceLogLines,
truncateMiddle,
} from "./bash-tools.shared.js";
import { encodeKeySequence, encodePaste } from "./pty-keys.js"; import { encodeKeySequence, encodePaste } from "./pty-keys.js";
export type ProcessToolDefaults = { export type ProcessToolDefaults = {
@@ -107,9 +104,28 @@ export function createProcessTool(
setJobTtlMs(defaults.cleanupMs); setJobTtlMs(defaults.cleanupMs);
} }
const scopeKey = defaults?.scopeKey; const scopeKey = defaults?.scopeKey;
const supervisor = getProcessSupervisor();
const isInScope = (session?: { scopeKey?: string } | null) => const isInScope = (session?: { scopeKey?: string } | null) =>
!scopeKey || session?.scopeKey === scopeKey; !scopeKey || session?.scopeKey === scopeKey;
const cancelManagedSession = (sessionId: string) => {
const record = supervisor.getRecord(sessionId);
if (!record || record.state === "exited") {
return false;
}
supervisor.cancel(sessionId, "manual-cancel");
return true;
};
const terminateSessionFallback = (session: ProcessSession) => {
const pid = session.pid ?? session.child?.pid;
if (typeof pid !== "number" || !Number.isFinite(pid) || pid <= 0) {
return false;
}
killProcessTree(pid);
return true;
};
return { return {
name: "process", name: "process",
label: "process", label: "process",
@@ -523,10 +539,25 @@ export function createProcessTool(
if (!scopedSession.backgrounded) { if (!scopedSession.backgrounded) {
return failText(`Session ${params.sessionId} is not backgrounded.`); return failText(`Session ${params.sessionId} is not backgrounded.`);
} }
killSession(scopedSession); const canceled = cancelManagedSession(scopedSession.id);
markExited(scopedSession, null, "SIGKILL", "failed"); if (!canceled) {
const terminated = terminateSessionFallback(scopedSession);
if (!terminated) {
return failText(
`Unable to terminate session ${params.sessionId}: no active supervisor run or process id.`,
);
}
markExited(scopedSession, null, "SIGKILL", "failed");
}
return { return {
content: [{ type: "text", text: `Killed session ${params.sessionId}.` }], content: [
{
type: "text",
text: canceled
? `Termination requested for session ${params.sessionId}.`
: `Killed session ${params.sessionId}.`,
},
],
details: { details: {
status: "failed", status: "failed",
name: scopedSession ? deriveSessionName(scopedSession.command) : undefined, name: scopedSession ? deriveSessionName(scopedSession.command) : undefined,
@@ -555,10 +586,30 @@ export function createProcessTool(
case "remove": { case "remove": {
if (scopedSession) { if (scopedSession) {
killSession(scopedSession); const canceled = cancelManagedSession(scopedSession.id);
markExited(scopedSession, null, "SIGKILL", "failed"); if (canceled) {
// Keep remove semantics deterministic: drop from process registry now.
scopedSession.backgrounded = false;
deleteSession(params.sessionId);
} else {
const terminated = terminateSessionFallback(scopedSession);
if (!terminated) {
return failText(
`Unable to remove session ${params.sessionId}: no active supervisor run or process id.`,
);
}
markExited(scopedSession, null, "SIGKILL", "failed");
deleteSession(params.sessionId);
}
return { return {
content: [{ type: "text", text: `Removed session ${params.sessionId}.` }], content: [
{
type: "text",
text: canceled
? `Removed session ${params.sessionId} (termination requested).`
: `Removed session ${params.sessionId}.`,
},
],
details: { details: {
status: "failed", status: "failed",
name: scopedSession ? deriveSessionName(scopedSession.command) : undefined, name: scopedSession ? deriveSessionName(scopedSession.command) : undefined,

View File

@@ -1,11 +1,9 @@
import type { ChildProcessWithoutNullStreams } from "node:child_process";
import { existsSync, statSync } from "node:fs"; import { existsSync, statSync } from "node:fs";
import fs from "node:fs/promises"; import fs from "node:fs/promises";
import { homedir } from "node:os"; import { homedir } from "node:os";
import path from "node:path"; import path from "node:path";
import { sliceUtf16Safe } from "../utils.js"; import { sliceUtf16Safe } from "../utils.js";
import { assertSandboxPath } from "./sandbox-paths.js"; import { assertSandboxPath } from "./sandbox-paths.js";
import { killProcessTree } from "./shell-utils.js";
const CHUNK_LIMIT = 8 * 1024; const CHUNK_LIMIT = 8 * 1024;
@@ -115,13 +113,6 @@ export async function resolveSandboxWorkdir(params: {
} }
} }
export function killSession(session: { pid?: number; child?: ChildProcessWithoutNullStreams }) {
const pid = session.pid ?? session.child?.pid;
if (pid) {
killProcessTree(pid);
}
}
export function resolveWorkdir(workdir: string, warnings: string[]) { export function resolveWorkdir(workdir: string, warnings: string[]) {
const current = safeCwd(); const current = safeCwd();
const fallback = current ?? homedir(); const fallback = current ?? homedir();

View File

@@ -0,0 +1,36 @@
import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { resolveCliBackendConfig } from "./cli-backends.js";
describe("resolveCliBackendConfig reliability merge", () => {
it("deep-merges reliability watchdog overrides for codex", () => {
const cfg = {
agents: {
defaults: {
cliBackends: {
"codex-cli": {
command: "codex",
reliability: {
watchdog: {
resume: {
noOutputTimeoutMs: 42_000,
},
},
},
},
},
},
},
} satisfies OpenClawConfig;
const resolved = resolveCliBackendConfig("codex-cli", cfg);
expect(resolved).not.toBeNull();
expect(resolved?.config.reliability?.watchdog?.resume?.noOutputTimeoutMs).toBe(42_000);
// Ensure defaults are retained when only one field is overridden.
expect(resolved?.config.reliability?.watchdog?.resume?.noOutputTimeoutRatio).toBe(0.3);
expect(resolved?.config.reliability?.watchdog?.resume?.minMs).toBe(60_000);
expect(resolved?.config.reliability?.watchdog?.resume?.maxMs).toBe(180_000);
expect(resolved?.config.reliability?.watchdog?.fresh?.noOutputTimeoutRatio).toBe(0.8);
});
});

View File

@@ -1,5 +1,9 @@
import type { OpenClawConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/config.js";
import type { CliBackendConfig } from "../config/types.js"; import type { CliBackendConfig } from "../config/types.js";
import {
CLI_FRESH_WATCHDOG_DEFAULTS,
CLI_RESUME_WATCHDOG_DEFAULTS,
} from "./cli-watchdog-defaults.js";
import { normalizeProviderId } from "./model-selection.js"; import { normalizeProviderId } from "./model-selection.js";
export type ResolvedCliBackend = { export type ResolvedCliBackend = {
@@ -49,6 +53,12 @@ const DEFAULT_CLAUDE_BACKEND: CliBackendConfig = {
systemPromptMode: "append", systemPromptMode: "append",
systemPromptWhen: "first", systemPromptWhen: "first",
clearEnv: ["ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY_OLD"], clearEnv: ["ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY_OLD"],
reliability: {
watchdog: {
fresh: { ...CLI_FRESH_WATCHDOG_DEFAULTS },
resume: { ...CLI_RESUME_WATCHDOG_DEFAULTS },
},
},
serialize: true, serialize: true,
}; };
@@ -73,6 +83,12 @@ const DEFAULT_CODEX_BACKEND: CliBackendConfig = {
sessionMode: "existing", sessionMode: "existing",
imageArg: "--image", imageArg: "--image",
imageMode: "repeat", imageMode: "repeat",
reliability: {
watchdog: {
fresh: { ...CLI_FRESH_WATCHDOG_DEFAULTS },
resume: { ...CLI_RESUME_WATCHDOG_DEFAULTS },
},
},
serialize: true, serialize: true,
}; };
@@ -96,6 +112,10 @@ function mergeBackendConfig(base: CliBackendConfig, override?: CliBackendConfig)
if (!override) { if (!override) {
return { ...base }; return { ...base };
} }
const baseFresh = base.reliability?.watchdog?.fresh ?? {};
const baseResume = base.reliability?.watchdog?.resume ?? {};
const overrideFresh = override.reliability?.watchdog?.fresh ?? {};
const overrideResume = override.reliability?.watchdog?.resume ?? {};
return { return {
...base, ...base,
...override, ...override,
@@ -106,6 +126,22 @@ function mergeBackendConfig(base: CliBackendConfig, override?: CliBackendConfig)
sessionIdFields: override.sessionIdFields ?? base.sessionIdFields, sessionIdFields: override.sessionIdFields ?? base.sessionIdFields,
sessionArgs: override.sessionArgs ?? base.sessionArgs, sessionArgs: override.sessionArgs ?? base.sessionArgs,
resumeArgs: override.resumeArgs ?? base.resumeArgs, resumeArgs: override.resumeArgs ?? base.resumeArgs,
reliability: {
...base.reliability,
...override.reliability,
watchdog: {
...base.reliability?.watchdog,
...override.reliability?.watchdog,
fresh: {
...baseFresh,
...overrideFresh,
},
resume: {
...baseResume,
...overrideResume,
},
},
},
}; };
} }

View File

@@ -3,50 +3,69 @@ import os from "node:os";
import path from "node:path"; import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest"; import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/config.js";
import type { CliBackendConfig } from "../config/types.js";
import { runCliAgent } from "./cli-runner.js"; import { runCliAgent } from "./cli-runner.js";
import { cleanupResumeProcesses, cleanupSuspendedCliProcesses } from "./cli-runner/helpers.js"; import { resolveCliNoOutputTimeoutMs } from "./cli-runner/helpers.js";
const runCommandWithTimeoutMock = vi.fn(); const supervisorSpawnMock = vi.fn();
const runExecMock = vi.fn();
vi.mock("../process/exec.js", () => ({ vi.mock("../process/supervisor/index.js", () => ({
runCommandWithTimeout: (...args: unknown[]) => runCommandWithTimeoutMock(...args), getProcessSupervisor: () => ({
runExec: (...args: unknown[]) => runExecMock(...args), spawn: (...args: unknown[]) => supervisorSpawnMock(...args),
cancel: vi.fn(),
cancelScope: vi.fn(),
reconcileOrphans: vi.fn(),
getRecord: vi.fn(),
}),
})); }));
describe("runCliAgent resume cleanup", () => { type MockRunExit = {
reason:
| "manual-cancel"
| "overall-timeout"
| "no-output-timeout"
| "spawn-error"
| "signal"
| "exit";
exitCode: number | null;
exitSignal: NodeJS.Signals | number | null;
durationMs: number;
stdout: string;
stderr: string;
timedOut: boolean;
noOutputTimedOut: boolean;
};
function createManagedRun(exit: MockRunExit, pid = 1234) {
return {
runId: "run-supervisor",
pid,
startedAtMs: Date.now(),
stdin: undefined,
wait: vi.fn().mockResolvedValue(exit),
cancel: vi.fn(),
};
}
describe("runCliAgent with process supervisor", () => {
beforeEach(() => { beforeEach(() => {
runCommandWithTimeoutMock.mockReset(); supervisorSpawnMock.mockReset();
runExecMock.mockReset();
}); });
it("kills stale resume processes for codex sessions", async () => { it("runs CLI through supervisor and returns payload", async () => {
const selfPid = process.pid; supervisorSpawnMock.mockResolvedValueOnce(
createManagedRun({
runExecMock reason: "exit",
.mockResolvedValueOnce({ exitCode: 0,
stdout: " 1 999 S /bin/launchd\n", exitSignal: null,
durationMs: 50,
stdout: "ok",
stderr: "", stderr: "",
}) // cleanupSuspendedCliProcesses (ps) — ppid 999 != selfPid, no match timedOut: false,
.mockResolvedValueOnce({ noOutputTimedOut: false,
stdout: [ }),
` ${selfPid + 1} ${selfPid} codex exec resume thread-123 --color never --sandbox read-only --skip-git-repo-check`, );
` ${selfPid + 2} 999 codex exec resume thread-123 --color never --sandbox read-only --skip-git-repo-check`,
].join("\n"),
stderr: "",
}) // cleanupResumeProcesses (ps)
.mockResolvedValueOnce({ stdout: "", stderr: "" }) // cleanupResumeProcesses (kill -TERM)
.mockResolvedValueOnce({ stdout: "", stderr: "" }); // cleanupResumeProcesses (kill -9)
runCommandWithTimeoutMock.mockResolvedValueOnce({
stdout: "ok",
stderr: "",
code: 0,
signal: null,
killed: false,
});
await runCliAgent({ const result = await runCliAgent({
sessionId: "s1", sessionId: "s1",
sessionFile: "/tmp/session.jsonl", sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp", workspaceDir: "/tmp",
@@ -58,28 +77,80 @@ describe("runCliAgent resume cleanup", () => {
cliSessionId: "thread-123", cliSessionId: "thread-123",
}); });
if (process.platform === "win32") { expect(result.payloads?.[0]?.text).toBe("ok");
expect(runExecMock).not.toHaveBeenCalled(); expect(supervisorSpawnMock).toHaveBeenCalledTimes(1);
return; const input = supervisorSpawnMock.mock.calls[0]?.[0] as {
} argv?: string[];
mode?: string;
timeoutMs?: number;
noOutputTimeoutMs?: number;
replaceExistingScope?: boolean;
scopeKey?: string;
};
expect(input.mode).toBe("child");
expect(input.argv?.[0]).toBe("codex");
expect(input.timeoutMs).toBe(1_000);
expect(input.noOutputTimeoutMs).toBeGreaterThanOrEqual(1_000);
expect(input.replaceExistingScope).toBe(true);
expect(input.scopeKey).toContain("thread-123");
});
expect(runExecMock).toHaveBeenCalledTimes(4); it("fails with timeout when no-output watchdog trips", async () => {
supervisorSpawnMock.mockResolvedValueOnce(
createManagedRun({
reason: "no-output-timeout",
exitCode: null,
exitSignal: "SIGKILL",
durationMs: 200,
stdout: "",
stderr: "",
timedOut: true,
noOutputTimedOut: true,
}),
);
// Second call: cleanupResumeProcesses ps await expect(
const psCall = runExecMock.mock.calls[1] ?? []; runCliAgent({
expect(psCall[0]).toBe("ps"); sessionId: "s1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
prompt: "hi",
provider: "codex-cli",
model: "gpt-5.2-codex",
timeoutMs: 1_000,
runId: "run-2",
cliSessionId: "thread-123",
}),
).rejects.toThrow("produced no output");
});
// Third call: TERM, only the child PID it("fails with timeout when overall timeout trips", async () => {
const termCall = runExecMock.mock.calls[2] ?? []; supervisorSpawnMock.mockResolvedValueOnce(
expect(termCall[0]).toBe("kill"); createManagedRun({
const termArgs = termCall[1] as string[]; reason: "overall-timeout",
expect(termArgs).toEqual(["-TERM", String(selfPid + 1)]); exitCode: null,
exitSignal: "SIGKILL",
durationMs: 200,
stdout: "",
stderr: "",
timedOut: true,
noOutputTimedOut: false,
}),
);
// Fourth call: KILL, only the child PID await expect(
const killCall = runExecMock.mock.calls[3] ?? []; runCliAgent({
expect(killCall[0]).toBe("kill"); sessionId: "s1",
const killArgs = killCall[1] as string[]; sessionFile: "/tmp/session.jsonl",
expect(killArgs).toEqual(["-9", String(selfPid + 1)]); workspaceDir: "/tmp",
prompt: "hi",
provider: "codex-cli",
model: "gpt-5.2-codex",
timeoutMs: 1_000,
runId: "run-3",
cliSessionId: "thread-123",
}),
).rejects.toThrow("exceeded timeout");
}); });
it("falls back to per-agent workspace when workspaceDir is missing", async () => { it("falls back to per-agent workspace when workspaceDir is missing", async () => {
@@ -94,14 +165,18 @@ describe("runCliAgent resume cleanup", () => {
}, },
} satisfies OpenClawConfig; } satisfies OpenClawConfig;
runExecMock.mockResolvedValue({ stdout: "", stderr: "" }); supervisorSpawnMock.mockResolvedValueOnce(
runCommandWithTimeoutMock.mockResolvedValueOnce({ createManagedRun({
stdout: "ok", reason: "exit",
stderr: "", exitCode: 0,
code: 0, exitSignal: null,
signal: null, durationMs: 25,
killed: false, stdout: "ok",
}); stderr: "",
timedOut: false,
noOutputTimedOut: false,
}),
);
try { try {
await runCliAgent({ await runCliAgent({
@@ -114,264 +189,33 @@ describe("runCliAgent resume cleanup", () => {
provider: "codex-cli", provider: "codex-cli",
model: "gpt-5.2-codex", model: "gpt-5.2-codex",
timeoutMs: 1_000, timeoutMs: 1_000,
runId: "run-1", runId: "run-4",
}); });
} finally { } finally {
await fs.rm(tempDir, { recursive: true, force: true }); await fs.rm(tempDir, { recursive: true, force: true });
} }
const options = runCommandWithTimeoutMock.mock.calls[0]?.[1] as { cwd?: string }; const input = supervisorSpawnMock.mock.calls[0]?.[0] as { cwd?: string };
expect(options.cwd).toBe(path.resolve(fallbackWorkspace)); expect(input.cwd).toBe(path.resolve(fallbackWorkspace));
}); });
});
it("throws when sessionKey is malformed", async () => { describe("resolveCliNoOutputTimeoutMs", () => {
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cli-runner-")); it("uses backend-configured resume watchdog override", () => {
const mainWorkspace = path.join(tempDir, "workspace-main"); const timeoutMs = resolveCliNoOutputTimeoutMs({
const researchWorkspace = path.join(tempDir, "workspace-research"); backend: {
await fs.mkdir(mainWorkspace, { recursive: true }); command: "codex",
await fs.mkdir(researchWorkspace, { recursive: true }); reliability: {
const cfg = { watchdog: {
agents: { resume: {
defaults: { noOutputTimeoutMs: 42_000,
workspace: mainWorkspace, },
},
}, },
list: [{ id: "research", workspace: researchWorkspace }],
}, },
} satisfies OpenClawConfig; timeoutMs: 120_000,
useResume: true,
try {
await expect(
runCliAgent({
sessionId: "s1",
sessionKey: "agent::broken",
agentId: "research",
sessionFile: "/tmp/session.jsonl",
workspaceDir: undefined as unknown as string,
config: cfg,
prompt: "hi",
provider: "codex-cli",
model: "gpt-5.2-codex",
timeoutMs: 1_000,
runId: "run-2",
}),
).rejects.toThrow("Malformed agent session key");
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}
expect(runCommandWithTimeoutMock).not.toHaveBeenCalled();
});
});
describe("cleanupSuspendedCliProcesses", () => {
beforeEach(() => {
runExecMock.mockReset();
});
it("skips when no session tokens are configured", async () => {
await cleanupSuspendedCliProcesses(
{
command: "tool",
} as CliBackendConfig,
0,
);
if (process.platform === "win32") {
expect(runExecMock).not.toHaveBeenCalled();
return;
}
expect(runExecMock).not.toHaveBeenCalled();
});
it("matches sessionArg-based commands", async () => {
const selfPid = process.pid;
runExecMock
.mockResolvedValueOnce({
stdout: [
` 40 ${selfPid} T+ claude --session-id thread-1 -p`,
` 41 ${selfPid} S claude --session-id thread-2 -p`,
].join("\n"),
stderr: "",
})
.mockResolvedValueOnce({ stdout: "", stderr: "" });
await cleanupSuspendedCliProcesses(
{
command: "claude",
sessionArg: "--session-id",
} as CliBackendConfig,
0,
);
if (process.platform === "win32") {
expect(runExecMock).not.toHaveBeenCalled();
return;
}
expect(runExecMock).toHaveBeenCalledTimes(2);
const killCall = runExecMock.mock.calls[1] ?? [];
expect(killCall[0]).toBe("kill");
expect(killCall[1]).toEqual(["-9", "40"]);
});
it("matches resumeArgs with positional session id", async () => {
const selfPid = process.pid;
runExecMock
.mockResolvedValueOnce({
stdout: [
` 50 ${selfPid} T codex exec resume thread-99 --color never --sandbox read-only`,
` 51 ${selfPid} T codex exec resume other --color never --sandbox read-only`,
].join("\n"),
stderr: "",
})
.mockResolvedValueOnce({ stdout: "", stderr: "" });
await cleanupSuspendedCliProcesses(
{
command: "codex",
resumeArgs: ["exec", "resume", "{sessionId}", "--color", "never", "--sandbox", "read-only"],
} as CliBackendConfig,
1,
);
if (process.platform === "win32") {
expect(runExecMock).not.toHaveBeenCalled();
return;
}
expect(runExecMock).toHaveBeenCalledTimes(2);
const killCall = runExecMock.mock.calls[1] ?? [];
expect(killCall[0]).toBe("kill");
expect(killCall[1]).toEqual(["-9", "50", "51"]);
});
it("only kills child processes of current process (ppid validation)", async () => {
const selfPid = process.pid;
const childPid = selfPid + 1;
const unrelatedPid = 9999;
runExecMock
.mockResolvedValueOnce({
stdout: [
` ${childPid} ${selfPid} T claude --session-id thread-1 -p`,
` ${unrelatedPid} 100 T claude --session-id thread-2 -p`,
].join("\n"),
stderr: "",
})
.mockResolvedValueOnce({ stdout: "", stderr: "" });
await cleanupSuspendedCliProcesses(
{
command: "claude",
sessionArg: "--session-id",
} as CliBackendConfig,
0,
);
if (process.platform === "win32") {
expect(runExecMock).not.toHaveBeenCalled();
return;
}
expect(runExecMock).toHaveBeenCalledTimes(2);
const killCall = runExecMock.mock.calls[1] ?? [];
expect(killCall[0]).toBe("kill");
// Only childPid killed; unrelatedPid (ppid=100) excluded
expect(killCall[1]).toEqual(["-9", String(childPid)]);
});
it("skips all processes when none are children of current process", async () => {
runExecMock.mockResolvedValueOnce({
stdout: [
" 200 100 T claude --session-id thread-1 -p",
" 201 100 T claude --session-id thread-2 -p",
].join("\n"),
stderr: "",
}); });
expect(timeoutMs).toBe(42_000);
await cleanupSuspendedCliProcesses(
{
command: "claude",
sessionArg: "--session-id",
} as CliBackendConfig,
0,
);
if (process.platform === "win32") {
expect(runExecMock).not.toHaveBeenCalled();
return;
}
// Only ps called — no kill because no matching ppid
expect(runExecMock).toHaveBeenCalledTimes(1);
});
});
describe("cleanupResumeProcesses", () => {
beforeEach(() => {
runExecMock.mockReset();
});
it("only kills resume processes owned by current process", async () => {
const selfPid = process.pid;
runExecMock
.mockResolvedValueOnce({
stdout: [
` ${selfPid + 1} ${selfPid} codex exec resume abc-123`,
` ${selfPid + 2} 999 codex exec resume abc-123`,
].join("\n"),
stderr: "",
})
.mockResolvedValueOnce({ stdout: "", stderr: "" })
.mockResolvedValueOnce({ stdout: "", stderr: "" });
await cleanupResumeProcesses(
{
command: "codex",
resumeArgs: ["exec", "resume", "{sessionId}"],
} as CliBackendConfig,
"abc-123",
);
if (process.platform === "win32") {
expect(runExecMock).not.toHaveBeenCalled();
return;
}
expect(runExecMock).toHaveBeenCalledTimes(3);
const termCall = runExecMock.mock.calls[1] ?? [];
expect(termCall[0]).toBe("kill");
expect(termCall[1]).toEqual(["-TERM", String(selfPid + 1)]);
const killCall = runExecMock.mock.calls[2] ?? [];
expect(killCall[0]).toBe("kill");
expect(killCall[1]).toEqual(["-9", String(selfPid + 1)]);
});
it("skips kill when no resume processes match ppid", async () => {
runExecMock.mockResolvedValueOnce({
stdout: [" 300 100 codex exec resume abc-123", " 301 200 codex exec resume abc-123"].join(
"\n",
),
stderr: "",
});
await cleanupResumeProcesses(
{
command: "codex",
resumeArgs: ["exec", "resume", "{sessionId}"],
} as CliBackendConfig,
"abc-123",
);
if (process.platform === "win32") {
expect(runExecMock).not.toHaveBeenCalled();
return;
}
// Only ps called — no kill because no matching ppid
expect(runExecMock).toHaveBeenCalledTimes(1);
}); });
}); });

View File

@@ -6,20 +6,20 @@ import { resolveHeartbeatPrompt } from "../auto-reply/heartbeat.js";
import { shouldLogVerbose } from "../globals.js"; import { shouldLogVerbose } from "../globals.js";
import { isTruthyEnvValue } from "../infra/env.js"; import { isTruthyEnvValue } from "../infra/env.js";
import { createSubsystemLogger } from "../logging/subsystem.js"; import { createSubsystemLogger } from "../logging/subsystem.js";
import { runCommandWithTimeout } from "../process/exec.js"; import { getProcessSupervisor } from "../process/supervisor/index.js";
import { resolveSessionAgentIds } from "./agent-scope.js"; import { resolveSessionAgentIds } from "./agent-scope.js";
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "./bootstrap-files.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "./bootstrap-files.js";
import { resolveCliBackendConfig } from "./cli-backends.js"; import { resolveCliBackendConfig } from "./cli-backends.js";
import { import {
appendImagePathsToPrompt, appendImagePathsToPrompt,
buildCliSupervisorScopeKey,
buildCliArgs, buildCliArgs,
buildSystemPrompt, buildSystemPrompt,
cleanupResumeProcesses,
cleanupSuspendedCliProcesses,
enqueueCliRun, enqueueCliRun,
normalizeCliModel, normalizeCliModel,
parseCliJson, parseCliJson,
parseCliJsonl, parseCliJsonl,
resolveCliNoOutputTimeoutMs,
resolvePromptInput, resolvePromptInput,
resolveSessionIdToSend, resolveSessionIdToSend,
resolveSystemPromptUsage, resolveSystemPromptUsage,
@@ -226,19 +226,32 @@ export async function runCliAgent(params: {
} }
return next; return next;
})(); })();
const noOutputTimeoutMs = resolveCliNoOutputTimeoutMs({
// Cleanup suspended processes that have accumulated (regardless of sessionId) backend,
await cleanupSuspendedCliProcesses(backend);
if (useResume && cliSessionIdToSend) {
await cleanupResumeProcesses(backend, cliSessionIdToSend);
}
const result = await runCommandWithTimeout([backend.command, ...args], {
timeoutMs: params.timeoutMs, timeoutMs: params.timeoutMs,
useResume,
});
const supervisor = getProcessSupervisor();
const scopeKey = buildCliSupervisorScopeKey({
backend,
backendId: backendResolved.id,
cliSessionId: useResume ? cliSessionIdToSend : undefined,
});
const managedRun = await supervisor.spawn({
sessionId: params.sessionId,
backendId: backendResolved.id,
scopeKey,
replaceExistingScope: Boolean(useResume && scopeKey),
mode: "child",
argv: [backend.command, ...args],
timeoutMs: params.timeoutMs,
noOutputTimeoutMs,
cwd: workspaceDir, cwd: workspaceDir,
env, env,
input: stdinPayload, input: stdinPayload,
}); });
const result = await managedRun.wait();
const stdout = result.stdout.trim(); const stdout = result.stdout.trim();
const stderr = result.stderr.trim(); const stderr = result.stderr.trim();
@@ -259,7 +272,28 @@ export async function runCliAgent(params: {
} }
} }
if (result.code !== 0) { if (result.exitCode !== 0 || result.reason !== "exit") {
if (result.reason === "no-output-timeout" || result.noOutputTimedOut) {
const timeoutReason = `CLI produced no output for ${Math.round(noOutputTimeoutMs / 1000)}s and was terminated.`;
log.warn(
`cli watchdog timeout: provider=${params.provider} model=${modelId} session=${cliSessionIdToSend ?? params.sessionId} noOutputTimeoutMs=${noOutputTimeoutMs} pid=${managedRun.pid ?? "unknown"}`,
);
throw new FailoverError(timeoutReason, {
reason: "timeout",
provider: params.provider,
model: modelId,
status: resolveFailoverStatus("timeout"),
});
}
if (result.reason === "overall-timeout") {
const timeoutReason = `CLI exceeded timeout (${Math.round(params.timeoutMs / 1000)}s) and was terminated.`;
throw new FailoverError(timeoutReason, {
reason: "timeout",
provider: params.provider,
model: modelId,
status: resolveFailoverStatus("timeout"),
});
}
const err = stderr || stdout || "CLI failed."; const err = stderr || stdout || "CLI failed.";
const reason = classifyFailoverReason(err) ?? "unknown"; const reason = classifyFailoverReason(err) ?? "unknown";
const status = resolveFailoverStatus(reason); const status = resolveFailoverStatus(reason);

View File

@@ -8,232 +8,27 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js";
import type { OpenClawConfig } from "../../config/config.js"; import type { OpenClawConfig } from "../../config/config.js";
import type { CliBackendConfig } from "../../config/types.js"; import type { CliBackendConfig } from "../../config/types.js";
import type { EmbeddedContextFile } from "../pi-embedded-helpers.js"; import type { EmbeddedContextFile } from "../pi-embedded-helpers.js";
import { runExec } from "../../process/exec.js";
import { buildTtsSystemPromptHint } from "../../tts/tts.js"; import { buildTtsSystemPromptHint } from "../../tts/tts.js";
import { escapeRegExp, isRecord } from "../../utils.js"; import { isRecord } from "../../utils.js";
import { buildModelAliasLines } from "../model-alias-lines.js"; import { buildModelAliasLines } from "../model-alias-lines.js";
import { resolveDefaultModelForAgent } from "../model-selection.js"; import { resolveDefaultModelForAgent } from "../model-selection.js";
import { detectRuntimeShell } from "../shell-utils.js"; import { detectRuntimeShell } from "../shell-utils.js";
import { buildSystemPromptParams } from "../system-prompt-params.js"; import { buildSystemPromptParams } from "../system-prompt-params.js";
import { buildAgentSystemPrompt } from "../system-prompt.js"; import { buildAgentSystemPrompt } from "../system-prompt.js";
export { buildCliSupervisorScopeKey, resolveCliNoOutputTimeoutMs } from "./reliability.js";
const CLI_RUN_QUEUE = new Map<string, Promise<unknown>>(); const CLI_RUN_QUEUE = new Map<string, Promise<unknown>>();
function buildLooseArgOrderRegex(tokens: string[]): RegExp {
// Scan `ps` output lines. Keep matching flexible, but require whitespace arg boundaries
// to avoid substring matches like `codexx` or `/path/to/codexx`.
const [head, ...rest] = tokens.map((t) => String(t ?? "").trim()).filter(Boolean);
if (!head) {
return /$^/;
}
const headEscaped = escapeRegExp(head);
const headFragment = `(?:^|\\s)(?:${headEscaped}|\\S+\\/${headEscaped})(?=\\s|$)`;
const restFragments = rest.map((t) => `(?:^|\\s)${escapeRegExp(t)}(?=\\s|$)`);
return new RegExp([headFragment, ...restFragments].join(".*"));
}
async function psWithFallback(argsA: string[], argsB: string[]): Promise<string> {
try {
const { stdout } = await runExec("ps", argsA);
return stdout;
} catch {
// fallthrough
}
const { stdout } = await runExec("ps", argsB);
return stdout;
}
export async function cleanupResumeProcesses(
backend: CliBackendConfig,
sessionId: string,
): Promise<void> {
if (process.platform === "win32") {
return;
}
const resumeArgs = backend.resumeArgs ?? [];
if (resumeArgs.length === 0) {
return;
}
if (!resumeArgs.some((arg) => arg.includes("{sessionId}"))) {
return;
}
const commandToken = path.basename(backend.command ?? "").trim();
if (!commandToken) {
return;
}
const resumeTokens = resumeArgs.map((arg) => arg.replaceAll("{sessionId}", sessionId));
const pattern = [commandToken, ...resumeTokens]
.filter(Boolean)
.map((token) => escapeRegExp(token))
.join(".*");
if (!pattern) {
return;
}
try {
const stdout = await psWithFallback(
["-axww", "-o", "pid=,ppid=,command="],
["-ax", "-o", "pid=,ppid=,command="],
);
const patternRegex = buildLooseArgOrderRegex([commandToken, ...resumeTokens]);
const toKill: number[] = [];
for (const line of stdout.split("\n")) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
const match = /^(\d+)\s+(\d+)\s+(.*)$/.exec(trimmed);
if (!match) {
continue;
}
const pid = Number(match[1]);
const ppid = Number(match[2]);
const cmd = match[3] ?? "";
if (!Number.isFinite(pid)) {
continue;
}
if (ppid !== process.pid) {
continue;
}
if (!patternRegex.test(cmd)) {
continue;
}
toKill.push(pid);
}
if (toKill.length > 0) {
const pidArgs = toKill.map((pid) => String(pid));
try {
await runExec("kill", ["-TERM", ...pidArgs]);
} catch {
// ignore
}
await new Promise((resolve) => setTimeout(resolve, 250));
try {
await runExec("kill", ["-9", ...pidArgs]);
} catch {
// ignore
}
}
} catch {
// ignore errors - best effort cleanup
}
}
function buildSessionMatchers(backend: CliBackendConfig): RegExp[] {
const commandToken = path.basename(backend.command ?? "").trim();
if (!commandToken) {
return [];
}
const matchers: RegExp[] = [];
const sessionArg = backend.sessionArg?.trim();
const sessionArgs = backend.sessionArgs ?? [];
const resumeArgs = backend.resumeArgs ?? [];
const addMatcher = (args: string[]) => {
if (args.length === 0) {
return;
}
const tokens = [commandToken, ...args];
const pattern = tokens
.map((token, index) => {
const tokenPattern = tokenToRegex(token);
return index === 0 ? `(?:^|\\s)${tokenPattern}` : `\\s+${tokenPattern}`;
})
.join("");
matchers.push(new RegExp(pattern));
};
if (sessionArgs.some((arg) => arg.includes("{sessionId}"))) {
addMatcher(sessionArgs);
} else if (sessionArg) {
addMatcher([sessionArg, "{sessionId}"]);
}
if (resumeArgs.some((arg) => arg.includes("{sessionId}"))) {
addMatcher(resumeArgs);
}
return matchers;
}
function tokenToRegex(token: string): string {
if (!token.includes("{sessionId}")) {
return escapeRegExp(token);
}
const parts = token.split("{sessionId}").map((part) => escapeRegExp(part));
return parts.join("\\S+");
}
/**
* Cleanup suspended OpenClaw CLI processes that have accumulated.
* Only cleans up if there are more than the threshold (default: 10).
*/
export async function cleanupSuspendedCliProcesses(
backend: CliBackendConfig,
threshold = 10,
): Promise<void> {
if (process.platform === "win32") {
return;
}
const matchers = buildSessionMatchers(backend);
if (matchers.length === 0) {
return;
}
try {
const stdout = await psWithFallback(
["-axww", "-o", "pid=,ppid=,stat=,command="],
["-ax", "-o", "pid=,ppid=,stat=,command="],
);
const suspended: number[] = [];
for (const line of stdout.split("\n")) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
const match = /^(\d+)\s+(\d+)\s+(\S+)\s+(.*)$/.exec(trimmed);
if (!match) {
continue;
}
const pid = Number(match[1]);
const ppid = Number(match[2]);
const stat = match[3] ?? "";
const command = match[4] ?? "";
if (!Number.isFinite(pid)) {
continue;
}
if (ppid !== process.pid) {
continue;
}
if (!stat.includes("T")) {
continue;
}
if (!matchers.some((matcher) => matcher.test(command))) {
continue;
}
suspended.push(pid);
}
if (suspended.length > threshold) {
// Verified locally: stopped (T) processes ignore SIGTERM, so use SIGKILL.
await runExec("kill", ["-9", ...suspended.map((pid) => String(pid))]);
}
} catch {
// ignore errors - best effort cleanup
}
}
export function enqueueCliRun<T>(key: string, task: () => Promise<T>): Promise<T> { export function enqueueCliRun<T>(key: string, task: () => Promise<T>): Promise<T> {
const prior = CLI_RUN_QUEUE.get(key) ?? Promise.resolve(); const prior = CLI_RUN_QUEUE.get(key) ?? Promise.resolve();
const chained = prior.catch(() => undefined).then(task); const chained = prior.catch(() => undefined).then(task);
const tracked = chained.finally(() => { // Keep queue continuity even when a run rejects, without emitting unhandled rejections.
if (CLI_RUN_QUEUE.get(key) === tracked) { const tracked = chained
CLI_RUN_QUEUE.delete(key); .catch(() => undefined)
} .finally(() => {
}); if (CLI_RUN_QUEUE.get(key) === tracked) {
CLI_RUN_QUEUE.delete(key);
}
});
CLI_RUN_QUEUE.set(key, tracked); CLI_RUN_QUEUE.set(key, tracked);
return chained; return chained;
} }

View File

@@ -0,0 +1,88 @@
import path from "node:path";
import type { CliBackendConfig } from "../../config/types.js";
import {
CLI_FRESH_WATCHDOG_DEFAULTS,
CLI_RESUME_WATCHDOG_DEFAULTS,
CLI_WATCHDOG_MIN_TIMEOUT_MS,
} from "../cli-watchdog-defaults.js";
function pickWatchdogProfile(
backend: CliBackendConfig,
useResume: boolean,
): {
noOutputTimeoutMs?: number;
noOutputTimeoutRatio: number;
minMs: number;
maxMs: number;
} {
const defaults = useResume ? CLI_RESUME_WATCHDOG_DEFAULTS : CLI_FRESH_WATCHDOG_DEFAULTS;
const configured = useResume
? backend.reliability?.watchdog?.resume
: backend.reliability?.watchdog?.fresh;
const ratio = (() => {
const value = configured?.noOutputTimeoutRatio;
if (typeof value !== "number" || !Number.isFinite(value)) {
return defaults.noOutputTimeoutRatio;
}
return Math.max(0.05, Math.min(0.95, value));
})();
const minMs = (() => {
const value = configured?.minMs;
if (typeof value !== "number" || !Number.isFinite(value)) {
return defaults.minMs;
}
return Math.max(CLI_WATCHDOG_MIN_TIMEOUT_MS, Math.floor(value));
})();
const maxMs = (() => {
const value = configured?.maxMs;
if (typeof value !== "number" || !Number.isFinite(value)) {
return defaults.maxMs;
}
return Math.max(CLI_WATCHDOG_MIN_TIMEOUT_MS, Math.floor(value));
})();
return {
noOutputTimeoutMs:
typeof configured?.noOutputTimeoutMs === "number" &&
Number.isFinite(configured.noOutputTimeoutMs)
? Math.max(CLI_WATCHDOG_MIN_TIMEOUT_MS, Math.floor(configured.noOutputTimeoutMs))
: undefined,
noOutputTimeoutRatio: ratio,
minMs: Math.min(minMs, maxMs),
maxMs: Math.max(minMs, maxMs),
};
}
export function resolveCliNoOutputTimeoutMs(params: {
backend: CliBackendConfig;
timeoutMs: number;
useResume: boolean;
}): number {
const profile = pickWatchdogProfile(params.backend, params.useResume);
// Keep watchdog below global timeout in normal cases.
const cap = Math.max(CLI_WATCHDOG_MIN_TIMEOUT_MS, params.timeoutMs - 1_000);
if (profile.noOutputTimeoutMs !== undefined) {
return Math.min(profile.noOutputTimeoutMs, cap);
}
const computed = Math.floor(params.timeoutMs * profile.noOutputTimeoutRatio);
const bounded = Math.min(profile.maxMs, Math.max(profile.minMs, computed));
return Math.min(bounded, cap);
}
export function buildCliSupervisorScopeKey(params: {
backend: CliBackendConfig;
backendId: string;
cliSessionId?: string;
}): string | undefined {
const commandToken = path
.basename(params.backend.command ?? "")
.trim()
.toLowerCase();
const backendToken = params.backendId.trim().toLowerCase();
const sessionToken = params.cliSessionId?.trim();
if (!sessionToken) {
return undefined;
}
return `cli:${backendToken}:${commandToken}:${sessionToken}`;
}

View File

@@ -0,0 +1,13 @@
export const CLI_WATCHDOG_MIN_TIMEOUT_MS = 1_000;
export const CLI_FRESH_WATCHDOG_DEFAULTS = {
noOutputTimeoutRatio: 0.8,
minMs: 180_000,
maxMs: 600_000,
} as const;
export const CLI_RESUME_WATCHDOG_DEFAULTS = {
noOutputTimeoutRatio: 0.3,
minMs: 60_000,
maxMs: 180_000,
} as const;

View File

@@ -91,6 +91,34 @@ export type CliBackendConfig = {
imageMode?: "repeat" | "list"; imageMode?: "repeat" | "list";
/** Serialize runs for this CLI. */ /** Serialize runs for this CLI. */
serialize?: boolean; serialize?: boolean;
/** Runtime reliability tuning for this backend's process lifecycle. */
reliability?: {
/** No-output watchdog tuning (fresh vs resumed runs). */
watchdog?: {
/** Fresh/new sessions (non-resume). */
fresh?: {
/** Fixed watchdog timeout in ms (overrides ratio when set). */
noOutputTimeoutMs?: number;
/** Fraction of overall timeout used when fixed timeout is not set. */
noOutputTimeoutRatio?: number;
/** Lower bound for computed watchdog timeout. */
minMs?: number;
/** Upper bound for computed watchdog timeout. */
maxMs?: number;
};
/** Resume sessions. */
resume?: {
/** Fixed watchdog timeout in ms (overrides ratio when set). */
noOutputTimeoutMs?: number;
/** Fraction of overall timeout used when fixed timeout is not set. */
noOutputTimeoutRatio?: number;
/** Lower bound for computed watchdog timeout. */
minMs?: number;
/** Upper bound for computed watchdog timeout. */
maxMs?: number;
};
};
};
}; };
export type AgentDefaultsConfig = { export type AgentDefaultsConfig = {

View File

@@ -275,6 +275,34 @@ export const CliBackendSchema = z
imageArg: z.string().optional(), imageArg: z.string().optional(),
imageMode: z.union([z.literal("repeat"), z.literal("list")]).optional(), imageMode: z.union([z.literal("repeat"), z.literal("list")]).optional(),
serialize: z.boolean().optional(), serialize: z.boolean().optional(),
reliability: z
.object({
watchdog: z
.object({
fresh: z
.object({
noOutputTimeoutMs: z.number().int().min(1000).optional(),
noOutputTimeoutRatio: z.number().min(0.05).max(0.95).optional(),
minMs: z.number().int().min(1000).optional(),
maxMs: z.number().int().min(1000).optional(),
})
.strict()
.optional(),
resume: z
.object({
noOutputTimeoutMs: z.number().int().min(1000).optional(),
noOutputTimeoutRatio: z.number().min(0.05).max(0.95).optional(),
minMs: z.number().int().min(1000).optional(),
maxMs: z.number().int().min(1000).optional(),
})
.strict()
.optional(),
})
.strict()
.optional(),
})
.strict()
.optional(),
}) })
.strict(); .strict();

View File

@@ -23,6 +23,7 @@ describe("runCommandWithTimeout", () => {
expect(result.code).toBe(0); expect(result.code).toBe(0);
expect(result.stdout).toBe("ok"); expect(result.stdout).toBe("ok");
expect(result.termination).toBe("exit");
}); });
it("merges custom env with process.env", async () => { it("merges custom env with process.env", async () => {
@@ -43,8 +44,58 @@ describe("runCommandWithTimeout", () => {
expect(result.code).toBe(0); expect(result.code).toBe(0);
expect(result.stdout).toBe("base|ok"); expect(result.stdout).toBe("base|ok");
expect(result.termination).toBe("exit");
} finally { } finally {
envSnapshot.restore(); envSnapshot.restore();
} }
}); });
it("kills command when no output timeout elapses", async () => {
const startedAt = Date.now();
const result = await runCommandWithTimeout(
[process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
{
timeoutMs: 5_000,
noOutputTimeoutMs: 300,
},
);
const durationMs = Date.now() - startedAt;
expect(durationMs).toBeLessThan(2_500);
expect(result.termination).toBe("no-output-timeout");
expect(result.noOutputTimedOut).toBe(true);
expect(result.code).not.toBe(0);
});
it("resets no output timer when command keeps emitting output", async () => {
const result = await runCommandWithTimeout(
[
process.execPath,
"-e",
'let i=0; const t=setInterval(() => { process.stdout.write("."); i += 1; if (i >= 5) { clearInterval(t); process.exit(0); } }, 50);',
],
{
timeoutMs: 5_000,
noOutputTimeoutMs: 200,
},
);
expect(result.code).toBe(0);
expect(result.termination).toBe("exit");
expect(result.noOutputTimedOut).toBe(false);
expect(result.stdout.length).toBeGreaterThanOrEqual(5);
});
it("reports global timeout termination when overall timeout elapses", async () => {
const result = await runCommandWithTimeout(
[process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
{
timeoutMs: 200,
},
);
expect(result.termination).toBe("timeout");
expect(result.noOutputTimedOut).toBe(false);
expect(result.code).not.toBe(0);
});
}); });

View File

@@ -76,11 +76,14 @@ export async function runExec(
} }
export type SpawnResult = { export type SpawnResult = {
pid?: number;
stdout: string; stdout: string;
stderr: string; stderr: string;
code: number | null; code: number | null;
signal: NodeJS.Signals | null; signal: NodeJS.Signals | null;
killed: boolean; killed: boolean;
termination: "exit" | "timeout" | "no-output-timeout" | "signal";
noOutputTimedOut?: boolean;
}; };
export type CommandOptions = { export type CommandOptions = {
@@ -89,6 +92,7 @@ export type CommandOptions = {
input?: string; input?: string;
env?: NodeJS.ProcessEnv; env?: NodeJS.ProcessEnv;
windowsVerbatimArguments?: boolean; windowsVerbatimArguments?: boolean;
noOutputTimeoutMs?: number;
}; };
export async function runCommandWithTimeout( export async function runCommandWithTimeout(
@@ -97,7 +101,7 @@ export async function runCommandWithTimeout(
): Promise<SpawnResult> { ): Promise<SpawnResult> {
const options: CommandOptions = const options: CommandOptions =
typeof optionsOrTimeout === "number" ? { timeoutMs: optionsOrTimeout } : optionsOrTimeout; typeof optionsOrTimeout === "number" ? { timeoutMs: optionsOrTimeout } : optionsOrTimeout;
const { timeoutMs, cwd, input, env } = options; const { timeoutMs, cwd, input, env, noOutputTimeoutMs } = options;
const { windowsVerbatimArguments } = options; const { windowsVerbatimArguments } = options;
const hasInput = input !== undefined; const hasInput = input !== undefined;
@@ -144,11 +148,45 @@ export async function runCommandWithTimeout(
let stdout = ""; let stdout = "";
let stderr = ""; let stderr = "";
let settled = false; let settled = false;
let timedOut = false;
let noOutputTimedOut = false;
let noOutputTimer: NodeJS.Timeout | null = null;
const shouldTrackOutputTimeout =
typeof noOutputTimeoutMs === "number" &&
Number.isFinite(noOutputTimeoutMs) &&
noOutputTimeoutMs > 0;
const clearNoOutputTimer = () => {
if (!noOutputTimer) {
return;
}
clearTimeout(noOutputTimer);
noOutputTimer = null;
};
const armNoOutputTimer = () => {
if (!shouldTrackOutputTimeout || settled) {
return;
}
clearNoOutputTimer();
noOutputTimer = setTimeout(() => {
if (settled) {
return;
}
noOutputTimedOut = true;
if (typeof child.kill === "function") {
child.kill("SIGKILL");
}
}, Math.floor(noOutputTimeoutMs));
};
const timer = setTimeout(() => { const timer = setTimeout(() => {
timedOut = true;
if (typeof child.kill === "function") { if (typeof child.kill === "function") {
child.kill("SIGKILL"); child.kill("SIGKILL");
} }
}, timeoutMs); }, timeoutMs);
armNoOutputTimer();
if (hasInput && child.stdin) { if (hasInput && child.stdin) {
child.stdin.write(input ?? ""); child.stdin.write(input ?? "");
@@ -157,9 +195,11 @@ export async function runCommandWithTimeout(
child.stdout?.on("data", (d) => { child.stdout?.on("data", (d) => {
stdout += d.toString(); stdout += d.toString();
armNoOutputTimer();
}); });
child.stderr?.on("data", (d) => { child.stderr?.on("data", (d) => {
stderr += d.toString(); stderr += d.toString();
armNoOutputTimer();
}); });
child.on("error", (err) => { child.on("error", (err) => {
if (settled) { if (settled) {
@@ -167,6 +207,7 @@ export async function runCommandWithTimeout(
} }
settled = true; settled = true;
clearTimeout(timer); clearTimeout(timer);
clearNoOutputTimer();
reject(err); reject(err);
}); });
child.on("close", (code, signal) => { child.on("close", (code, signal) => {
@@ -175,7 +216,24 @@ export async function runCommandWithTimeout(
} }
settled = true; settled = true;
clearTimeout(timer); clearTimeout(timer);
resolve({ stdout, stderr, code, signal, killed: child.killed }); clearNoOutputTimer();
const termination = noOutputTimedOut
? "no-output-timeout"
: timedOut
? "timeout"
: signal != null
? "signal"
: "exit";
resolve({
pid: child.pid ?? undefined,
stdout,
stderr,
code,
signal,
killed: child.killed,
termination,
noOutputTimedOut,
});
}); });
}); });
} }

34
src/process/kill-tree.ts Normal file
View File

@@ -0,0 +1,34 @@
import { spawn } from "node:child_process";
/**
* Best-effort process-tree termination.
* - Windows: use taskkill /T to include descendants.
* - Unix: try process-group kill first, then direct pid kill.
*/
export function killProcessTree(pid: number): void {
if (!Number.isFinite(pid) || pid <= 0) {
return;
}
if (process.platform === "win32") {
try {
spawn("taskkill", ["/F", "/T", "/PID", String(pid)], {
stdio: "ignore",
detached: true,
});
} catch {
// ignore taskkill failures
}
return;
}
try {
process.kill(-pid, "SIGKILL");
} catch {
try {
process.kill(pid, "SIGKILL");
} catch {
// process already gone or inaccessible
}
}
}

View File

@@ -0,0 +1,116 @@
import type { ChildProcess } from "node:child_process";
import { EventEmitter } from "node:events";
import { PassThrough } from "node:stream";
import { beforeEach, describe, expect, it, vi } from "vitest";
const { spawnWithFallbackMock, killProcessTreeMock } = vi.hoisted(() => ({
spawnWithFallbackMock: vi.fn(),
killProcessTreeMock: vi.fn(),
}));
vi.mock("../../spawn-utils.js", () => ({
spawnWithFallback: (...args: unknown[]) => spawnWithFallbackMock(...args),
}));
vi.mock("../../kill-tree.js", () => ({
killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args),
}));
function createStubChild(pid = 1234) {
const child = new EventEmitter() as ChildProcess;
child.stdin = new PassThrough() as ChildProcess["stdin"];
child.stdout = new PassThrough() as ChildProcess["stdout"];
child.stderr = new PassThrough() as ChildProcess["stderr"];
child.pid = pid;
child.killed = false;
const killMock = vi.fn(() => true);
child.kill = killMock as ChildProcess["kill"];
return { child, killMock };
}
describe("createChildAdapter", () => {
beforeEach(() => {
spawnWithFallbackMock.mockReset();
killProcessTreeMock.mockReset();
});
it("uses process-tree kill for default SIGKILL", async () => {
const { child, killMock } = createStubChild(4321);
spawnWithFallbackMock.mockResolvedValue({
child,
usedFallback: false,
});
const { createChildAdapter } = await import("./child.js");
const adapter = await createChildAdapter({
argv: ["node", "-e", "setTimeout(() => {}, 1000)"],
stdinMode: "pipe-open",
});
const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as {
options?: { detached?: boolean };
fallbacks?: Array<{ options?: { detached?: boolean } }>;
};
expect(spawnArgs.options?.detached).toBe(true);
expect(spawnArgs.fallbacks?.[0]?.options?.detached).toBe(false);
adapter.kill();
expect(killProcessTreeMock).toHaveBeenCalledWith(4321);
expect(killMock).not.toHaveBeenCalled();
});
it("uses direct child.kill for non-SIGKILL signals", async () => {
const { child, killMock } = createStubChild(7654);
spawnWithFallbackMock.mockResolvedValue({
child,
usedFallback: false,
});
const { createChildAdapter } = await import("./child.js");
const adapter = await createChildAdapter({
argv: ["node", "-e", "setTimeout(() => {}, 1000)"],
stdinMode: "pipe-open",
});
adapter.kill("SIGTERM");
expect(killProcessTreeMock).not.toHaveBeenCalled();
expect(killMock).toHaveBeenCalledWith("SIGTERM");
});
it("keeps inherited env when no override env is provided", async () => {
const { child } = createStubChild(3333);
spawnWithFallbackMock.mockResolvedValue({
child,
usedFallback: false,
});
const { createChildAdapter } = await import("./child.js");
await createChildAdapter({
argv: ["node", "-e", "process.exit(0)"],
stdinMode: "pipe-open",
});
const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as {
options?: { env?: NodeJS.ProcessEnv };
};
expect(spawnArgs.options?.env).toBeUndefined();
});
it("passes explicit env overrides as strings", async () => {
const { child } = createStubChild(4444);
spawnWithFallbackMock.mockResolvedValue({
child,
usedFallback: false,
});
const { createChildAdapter } = await import("./child.js");
await createChildAdapter({
argv: ["node", "-e", "process.exit(0)"],
env: { FOO: "bar", COUNT: "12", DROP_ME: undefined },
stdinMode: "pipe-open",
});
const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as {
options?: { env?: Record<string, string> };
};
expect(spawnArgs.options?.env).toEqual({ FOO: "bar", COUNT: "12" });
});
});

View File

@@ -0,0 +1,174 @@
import type { ChildProcessWithoutNullStreams, SpawnOptions } from "node:child_process";
import type { ManagedRunStdin } from "../types.js";
import { killProcessTree } from "../../kill-tree.js";
import { spawnWithFallback } from "../../spawn-utils.js";
function resolveCommand(command: string): string {
if (process.platform !== "win32") {
return command;
}
const lower = command.toLowerCase();
if (lower.endsWith(".exe") || lower.endsWith(".cmd") || lower.endsWith(".bat")) {
return command;
}
const basename = lower.split(/[\\/]/).pop() ?? lower;
if (basename === "npm" || basename === "pnpm" || basename === "yarn" || basename === "npx") {
return `${command}.cmd`;
}
return command;
}
function toStringEnv(env?: NodeJS.ProcessEnv): Record<string, string> {
if (!env) {
return {};
}
const out: Record<string, string> = {};
for (const [key, value] of Object.entries(env)) {
if (value === undefined) {
continue;
}
out[key] = String(value);
}
return out;
}
export type ChildAdapter = {
pid?: number;
stdin?: ManagedRunStdin;
onStdout: (listener: (chunk: string) => void) => void;
onStderr: (listener: (chunk: string) => void) => void;
wait: () => Promise<{ code: number | null; signal: NodeJS.Signals | null }>;
kill: (signal?: NodeJS.Signals) => void;
dispose: () => void;
};
export async function createChildAdapter(params: {
argv: string[];
cwd?: string;
env?: NodeJS.ProcessEnv;
windowsVerbatimArguments?: boolean;
input?: string;
stdinMode?: "inherit" | "pipe-open" | "pipe-closed";
}): Promise<ChildAdapter> {
const resolvedArgv = [...params.argv];
resolvedArgv[0] = resolveCommand(resolvedArgv[0] ?? "");
const stdinMode = params.stdinMode ?? (params.input !== undefined ? "pipe-closed" : "inherit");
const options: SpawnOptions = {
cwd: params.cwd,
env: params.env ? toStringEnv(params.env) : undefined,
stdio: ["pipe", "pipe", "pipe"],
detached: true,
windowsHide: true,
windowsVerbatimArguments: params.windowsVerbatimArguments,
};
if (stdinMode === "inherit") {
options.stdio = ["inherit", "pipe", "pipe"];
} else {
options.stdio = ["pipe", "pipe", "pipe"];
}
const spawned = await spawnWithFallback({
argv: resolvedArgv,
options,
fallbacks: [
{
label: "no-detach",
options: { detached: false },
},
],
});
const child = spawned.child as ChildProcessWithoutNullStreams;
if (child.stdin) {
if (params.input !== undefined) {
child.stdin.write(params.input);
child.stdin.end();
} else if (stdinMode === "pipe-closed") {
child.stdin.end();
}
}
const stdin: ManagedRunStdin | undefined = child.stdin
? {
destroyed: false,
write: (data: string, cb?: (err?: Error | null) => void) => {
try {
child.stdin.write(data, cb);
} catch (err) {
cb?.(err as Error);
}
},
end: () => {
try {
child.stdin.end();
} catch {
// ignore close errors
}
},
destroy: () => {
try {
child.stdin.destroy();
} catch {
// ignore destroy errors
}
},
}
: undefined;
const onStdout = (listener: (chunk: string) => void) => {
child.stdout.on("data", (chunk) => {
listener(chunk.toString());
});
};
const onStderr = (listener: (chunk: string) => void) => {
child.stderr.on("data", (chunk) => {
listener(chunk.toString());
});
};
const wait = async () =>
await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
child.once("error", reject);
child.once("close", (code, signal) => {
resolve({ code, signal });
});
});
const kill = (signal?: NodeJS.Signals) => {
const pid = child.pid ?? undefined;
if (signal === undefined || signal === "SIGKILL") {
if (pid) {
killProcessTree(pid);
} else {
try {
child.kill("SIGKILL");
} catch {
// ignore kill errors
}
}
return;
}
try {
child.kill(signal);
} catch {
// ignore kill errors for non-kill signals
}
};
const dispose = () => {
child.removeAllListeners();
};
return {
pid: child.pid ?? undefined,
stdin,
onStdout,
onStderr,
wait,
kill,
dispose,
};
}

View File

@@ -0,0 +1,161 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const { spawnMock, ptyKillMock, killProcessTreeMock } = vi.hoisted(() => ({
spawnMock: vi.fn(),
ptyKillMock: vi.fn(),
killProcessTreeMock: vi.fn(),
}));
vi.mock("@lydell/node-pty", () => ({
spawn: (...args: unknown[]) => spawnMock(...args),
}));
vi.mock("../../kill-tree.js", () => ({
killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args),
}));
function createStubPty(pid = 1234) {
let exitListener: ((event: { exitCode: number; signal?: number }) => void) | null = null;
return {
pid,
write: vi.fn(),
onData: vi.fn(() => ({ dispose: vi.fn() })),
onExit: vi.fn((listener: (event: { exitCode: number; signal?: number }) => void) => {
exitListener = listener;
return { dispose: vi.fn() };
}),
kill: (signal?: string) => ptyKillMock(signal),
emitExit: (event: { exitCode: number; signal?: number }) => {
exitListener?.(event);
},
};
}
describe("createPtyAdapter", () => {
beforeEach(() => {
spawnMock.mockReset();
ptyKillMock.mockReset();
killProcessTreeMock.mockReset();
});
afterEach(() => {
vi.resetModules();
vi.clearAllMocks();
});
it("forwards explicit signals to node-pty kill", async () => {
spawnMock.mockReturnValue(createStubPty());
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "bash",
args: ["-lc", "sleep 10"],
});
adapter.kill("SIGTERM");
expect(ptyKillMock).toHaveBeenCalledWith("SIGTERM");
expect(killProcessTreeMock).not.toHaveBeenCalled();
});
it("uses process-tree kill for SIGKILL by default", async () => {
spawnMock.mockReturnValue(createStubPty());
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "bash",
args: ["-lc", "sleep 10"],
});
adapter.kill();
expect(killProcessTreeMock).toHaveBeenCalledWith(1234);
expect(ptyKillMock).not.toHaveBeenCalled();
});
it("resolves wait when exit fires before wait is called", async () => {
const stub = createStubPty();
spawnMock.mockReturnValue(stub);
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "bash",
args: ["-lc", "exit 3"],
});
expect(stub.onExit).toHaveBeenCalledTimes(1);
stub.emitExit({ exitCode: 3, signal: 0 });
await expect(adapter.wait()).resolves.toEqual({ code: 3, signal: null });
});
it("keeps inherited env when no override env is provided", async () => {
const stub = createStubPty();
spawnMock.mockReturnValue(stub);
const { createPtyAdapter } = await import("./pty.js");
await createPtyAdapter({
shell: "bash",
args: ["-lc", "env"],
});
const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record<string, string> };
expect(spawnOptions?.env).toBeUndefined();
});
it("passes explicit env overrides as strings", async () => {
const stub = createStubPty();
spawnMock.mockReturnValue(stub);
const { createPtyAdapter } = await import("./pty.js");
await createPtyAdapter({
shell: "bash",
args: ["-lc", "env"],
env: { FOO: "bar", COUNT: "12", DROP_ME: undefined },
});
const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record<string, string> };
expect(spawnOptions?.env).toEqual({ FOO: "bar", COUNT: "12" });
});
it("does not pass a signal to node-pty on Windows", async () => {
const originalPlatform = Object.getOwnPropertyDescriptor(process, "platform");
Object.defineProperty(process, "platform", { value: "win32", configurable: true });
try {
spawnMock.mockReturnValue(createStubPty());
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "powershell.exe",
args: ["-NoLogo"],
});
adapter.kill("SIGTERM");
expect(ptyKillMock).toHaveBeenCalledWith(undefined);
expect(killProcessTreeMock).not.toHaveBeenCalled();
} finally {
if (originalPlatform) {
Object.defineProperty(process, "platform", originalPlatform);
}
}
});
it("uses process-tree kill for SIGKILL on Windows", async () => {
const originalPlatform = Object.getOwnPropertyDescriptor(process, "platform");
Object.defineProperty(process, "platform", { value: "win32", configurable: true });
try {
spawnMock.mockReturnValue(createStubPty(4567));
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "powershell.exe",
args: ["-NoLogo"],
});
adapter.kill("SIGKILL");
expect(killProcessTreeMock).toHaveBeenCalledWith(4567);
expect(ptyKillMock).not.toHaveBeenCalled();
} finally {
if (originalPlatform) {
Object.defineProperty(process, "platform", originalPlatform);
}
}
});
});

View File

@@ -0,0 +1,197 @@
import type { ManagedRunStdin } from "../types.js";
import { killProcessTree } from "../../kill-tree.js";
type PtyExitEvent = { exitCode: number; signal?: number };
type PtyDisposable = { dispose: () => void };
type PtySpawnHandle = {
pid: number;
write: (data: string | Buffer) => void;
onData: (listener: (value: string) => void) => PtyDisposable | void;
onExit: (listener: (event: PtyExitEvent) => void) => PtyDisposable | void;
kill: (signal?: string) => void;
};
type PtySpawn = (
file: string,
args: string[] | string,
options: {
name?: string;
cols?: number;
rows?: number;
cwd?: string;
env?: Record<string, string>;
},
) => PtySpawnHandle;
type PtyModule = {
spawn?: PtySpawn;
default?: {
spawn?: PtySpawn;
};
};
function toStringEnv(env?: NodeJS.ProcessEnv): Record<string, string> {
if (!env) {
return {};
}
const out: Record<string, string> = {};
for (const [key, value] of Object.entries(env)) {
if (value === undefined) {
continue;
}
out[key] = String(value);
}
return out;
}
export type PtyAdapter = {
pid?: number;
stdin?: ManagedRunStdin;
onStdout: (listener: (chunk: string) => void) => void;
onStderr: (listener: (chunk: string) => void) => void;
wait: () => Promise<{ code: number | null; signal: NodeJS.Signals | number | null }>;
kill: (signal?: NodeJS.Signals) => void;
dispose: () => void;
};
export async function createPtyAdapter(params: {
shell: string;
args: string[];
cwd?: string;
env?: NodeJS.ProcessEnv;
cols?: number;
rows?: number;
name?: string;
}): Promise<PtyAdapter> {
const module = (await import("@lydell/node-pty")) as unknown as PtyModule;
const spawn = module.spawn ?? module.default?.spawn;
if (!spawn) {
throw new Error("PTY support is unavailable (node-pty spawn not found).");
}
const pty = spawn(params.shell, params.args, {
cwd: params.cwd,
env: params.env ? toStringEnv(params.env) : undefined,
name: params.name ?? process.env.TERM ?? "xterm-256color",
cols: params.cols ?? 120,
rows: params.rows ?? 30,
});
let dataListener: PtyDisposable | null = null;
let exitListener: PtyDisposable | null = null;
let waitResult: { code: number | null; signal: NodeJS.Signals | number | null } | null = null;
let resolveWait:
| ((value: { code: number | null; signal: NodeJS.Signals | number | null }) => void)
| null = null;
let waitPromise: Promise<{ code: number | null; signal: NodeJS.Signals | number | null }> | null =
null;
const settleWait = (value: { code: number | null; signal: NodeJS.Signals | number | null }) => {
if (waitResult) {
return;
}
waitResult = value;
if (resolveWait) {
const resolve = resolveWait;
resolveWait = null;
resolve(value);
}
};
exitListener =
pty.onExit((event) => {
const signal = event.signal && event.signal !== 0 ? event.signal : null;
settleWait({ code: event.exitCode ?? null, signal });
}) ?? null;
const stdin: ManagedRunStdin = {
destroyed: false,
write: (data, cb) => {
try {
pty.write(data);
cb?.(null);
} catch (err) {
cb?.(err as Error);
}
},
end: () => {
try {
const eof = process.platform === "win32" ? "\x1a" : "\x04";
pty.write(eof);
} catch {
// ignore EOF errors
}
},
};
const onStdout = (listener: (chunk: string) => void) => {
dataListener =
pty.onData((chunk) => {
listener(chunk.toString());
}) ?? null;
};
const onStderr = (_listener: (chunk: string) => void) => {
// PTY gives a unified output stream.
};
const wait = async () => {
if (waitResult) {
return waitResult;
}
if (!waitPromise) {
waitPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | number | null }>(
(resolve) => {
resolveWait = resolve;
if (waitResult) {
const settled = waitResult;
resolveWait = null;
resolve(settled);
}
},
);
}
return waitPromise;
};
const kill = (signal: NodeJS.Signals = "SIGKILL") => {
try {
if (signal === "SIGKILL" && typeof pty.pid === "number" && pty.pid > 0) {
killProcessTree(pty.pid);
} else if (process.platform === "win32") {
pty.kill();
} else {
pty.kill(signal);
}
} catch {
// ignore kill errors
}
// Some PTY hosts do not emit `onExit` reliably after kill.
// Ensure waiters can progress on forced termination.
settleWait({ code: null, signal });
};
const dispose = () => {
try {
dataListener?.dispose();
} catch {
// ignore disposal errors
}
try {
exitListener?.dispose();
} catch {
// ignore disposal errors
}
dataListener = null;
exitListener = null;
settleWait({ code: null, signal: null });
};
return {
pid: pty.pid || undefined,
stdin,
onStdout,
onStderr,
wait,
kill,
dispose,
};
}

View File

@@ -0,0 +1,24 @@
import type { ProcessSupervisor } from "./types.js";
import { createProcessSupervisor } from "./supervisor.js";
let singleton: ProcessSupervisor | null = null;
export function getProcessSupervisor(): ProcessSupervisor {
if (singleton) {
return singleton;
}
singleton = createProcessSupervisor();
return singleton;
}
export { createProcessSupervisor } from "./supervisor.js";
export type {
ManagedRun,
ProcessSupervisor,
RunExit,
RunRecord,
RunState,
SpawnInput,
SpawnMode,
TerminationReason,
} from "./types.js";

View File

@@ -0,0 +1,83 @@
import { describe, expect, it } from "vitest";
import { createRunRegistry } from "./registry.js";
describe("process supervisor run registry", () => {
it("finalize is idempotent and preserves first terminal metadata", () => {
const registry = createRunRegistry();
registry.add({
runId: "r1",
sessionId: "s1",
backendId: "b1",
state: "running",
startedAtMs: 1,
lastOutputAtMs: 1,
createdAtMs: 1,
updatedAtMs: 1,
});
const first = registry.finalize("r1", {
reason: "overall-timeout",
exitCode: null,
exitSignal: "SIGKILL",
});
const second = registry.finalize("r1", {
reason: "manual-cancel",
exitCode: 0,
exitSignal: null,
});
expect(first).not.toBeNull();
expect(first?.firstFinalize).toBe(true);
expect(first?.record.terminationReason).toBe("overall-timeout");
expect(first?.record.exitCode).toBeNull();
expect(first?.record.exitSignal).toBe("SIGKILL");
expect(second).not.toBeNull();
expect(second?.firstFinalize).toBe(false);
expect(second?.record.terminationReason).toBe("overall-timeout");
expect(second?.record.exitCode).toBeNull();
expect(second?.record.exitSignal).toBe("SIGKILL");
});
it("prunes oldest exited records once retention cap is exceeded", () => {
const registry = createRunRegistry({ maxExitedRecords: 2 });
registry.add({
runId: "r1",
sessionId: "s1",
backendId: "b1",
state: "running",
startedAtMs: 1,
lastOutputAtMs: 1,
createdAtMs: 1,
updatedAtMs: 1,
});
registry.add({
runId: "r2",
sessionId: "s2",
backendId: "b1",
state: "running",
startedAtMs: 2,
lastOutputAtMs: 2,
createdAtMs: 2,
updatedAtMs: 2,
});
registry.add({
runId: "r3",
sessionId: "s3",
backendId: "b1",
state: "running",
startedAtMs: 3,
lastOutputAtMs: 3,
createdAtMs: 3,
updatedAtMs: 3,
});
registry.finalize("r1", { reason: "exit", exitCode: 0, exitSignal: null });
registry.finalize("r2", { reason: "exit", exitCode: 0, exitSignal: null });
registry.finalize("r3", { reason: "exit", exitCode: 0, exitSignal: null });
expect(registry.get("r1")).toBeUndefined();
expect(registry.get("r2")?.state).toBe("exited");
expect(registry.get("r3")?.state).toBe("exited");
});
});

View File

@@ -0,0 +1,154 @@
import type { RunRecord, RunState, TerminationReason } from "./types.js";
function nowMs() {
return Date.now();
}
const DEFAULT_MAX_EXITED_RECORDS = 2_000;
function resolveMaxExitedRecords(value?: number): number {
if (typeof value !== "number" || !Number.isFinite(value) || value < 1) {
return DEFAULT_MAX_EXITED_RECORDS;
}
return Math.max(1, Math.floor(value));
}
export type RunRegistry = {
add: (record: RunRecord) => void;
get: (runId: string) => RunRecord | undefined;
list: () => RunRecord[];
listByScope: (scopeKey: string) => RunRecord[];
updateState: (
runId: string,
state: RunState,
patch?: Partial<Pick<RunRecord, "pid" | "terminationReason" | "exitCode" | "exitSignal">>,
) => RunRecord | undefined;
touchOutput: (runId: string) => void;
finalize: (
runId: string,
exit: {
reason: TerminationReason;
exitCode: number | null;
exitSignal: NodeJS.Signals | number | null;
},
) => { record: RunRecord; firstFinalize: boolean } | null;
delete: (runId: string) => void;
};
export function createRunRegistry(options?: { maxExitedRecords?: number }): RunRegistry {
const records = new Map<string, RunRecord>();
const maxExitedRecords = resolveMaxExitedRecords(options?.maxExitedRecords);
const pruneExitedRecords = () => {
if (!records.size) {
return;
}
let exited = 0;
for (const record of records.values()) {
if (record.state === "exited") {
exited += 1;
}
}
if (exited <= maxExitedRecords) {
return;
}
let remove = exited - maxExitedRecords;
for (const [runId, record] of records.entries()) {
if (remove <= 0) {
break;
}
if (record.state !== "exited") {
continue;
}
records.delete(runId);
remove -= 1;
}
};
const add: RunRegistry["add"] = (record) => {
records.set(record.runId, { ...record });
};
const get: RunRegistry["get"] = (runId) => {
const record = records.get(runId);
return record ? { ...record } : undefined;
};
const list: RunRegistry["list"] = () => {
return Array.from(records.values()).map((record) => ({ ...record }));
};
const listByScope: RunRegistry["listByScope"] = (scopeKey) => {
if (!scopeKey.trim()) {
return [];
}
return Array.from(records.values())
.filter((record) => record.scopeKey === scopeKey)
.map((record) => ({ ...record }));
};
const updateState: RunRegistry["updateState"] = (runId, state, patch) => {
const current = records.get(runId);
if (!current) {
return undefined;
}
const updatedAtMs = nowMs();
const next: RunRecord = {
...current,
...patch,
state,
updatedAtMs,
lastOutputAtMs: current.lastOutputAtMs,
};
records.set(runId, next);
return { ...next };
};
const touchOutput: RunRegistry["touchOutput"] = (runId) => {
const current = records.get(runId);
if (!current) {
return;
}
const ts = nowMs();
records.set(runId, {
...current,
lastOutputAtMs: ts,
updatedAtMs: ts,
});
};
const finalize: RunRegistry["finalize"] = (runId, exit) => {
const current = records.get(runId);
if (!current) {
return null;
}
const firstFinalize = current.state !== "exited";
const ts = nowMs();
const next: RunRecord = {
...current,
state: "exited",
terminationReason: current.terminationReason ?? exit.reason,
exitCode: current.exitCode !== undefined ? current.exitCode : exit.exitCode,
exitSignal: current.exitSignal !== undefined ? current.exitSignal : exit.exitSignal,
updatedAtMs: ts,
};
records.set(runId, next);
pruneExitedRecords();
return { record: { ...next }, firstFinalize };
};
const del: RunRegistry["delete"] = (runId) => {
records.delete(runId);
};
return {
add,
get,
list,
listByScope,
updateState,
touchOutput,
finalize,
delete: del,
};
}

View File

@@ -0,0 +1,76 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const { createPtyAdapterMock } = vi.hoisted(() => ({
createPtyAdapterMock: vi.fn(),
}));
vi.mock("../../agents/shell-utils.js", () => ({
getShellConfig: () => ({ shell: "sh", args: ["-c"] }),
}));
vi.mock("./adapters/pty.js", () => ({
createPtyAdapter: (...args: unknown[]) => createPtyAdapterMock(...args),
}));
function createStubPtyAdapter() {
return {
pid: 1234,
stdin: undefined,
onStdout: (_listener: (chunk: string) => void) => {
// no-op
},
onStderr: (_listener: (chunk: string) => void) => {
// no-op
},
wait: async () => ({ code: 0, signal: null }),
kill: (_signal?: NodeJS.Signals) => {
// no-op
},
dispose: () => {
// no-op
},
};
}
describe("process supervisor PTY command contract", () => {
beforeEach(() => {
createPtyAdapterMock.mockReset();
});
it("passes PTY command verbatim to shell args", async () => {
createPtyAdapterMock.mockResolvedValue(createStubPtyAdapter());
const { createProcessSupervisor } = await import("./supervisor.js");
const supervisor = createProcessSupervisor();
const command = `printf '%s\\n' "a b" && printf '%s\\n' '$HOME'`;
const run = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
mode: "pty",
ptyCommand: command,
timeoutMs: 1_000,
});
const exit = await run.wait();
expect(exit.reason).toBe("exit");
expect(createPtyAdapterMock).toHaveBeenCalledTimes(1);
const params = createPtyAdapterMock.mock.calls[0]?.[0] as { args?: string[] };
expect(params.args).toEqual(["-c", command]);
});
it("rejects empty PTY command", async () => {
createPtyAdapterMock.mockResolvedValue(createStubPtyAdapter());
const { createProcessSupervisor } = await import("./supervisor.js");
const supervisor = createProcessSupervisor();
await expect(
supervisor.spawn({
sessionId: "s1",
backendId: "test",
mode: "pty",
ptyCommand: " ",
}),
).rejects.toThrow("PTY command cannot be empty");
expect(createPtyAdapterMock).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,102 @@
import { describe, expect, it } from "vitest";
import { createProcessSupervisor } from "./supervisor.js";
describe("process supervisor", () => {
it("spawns child runs and captures output", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("ok")'],
timeoutMs: 2_000,
stdinMode: "pipe-closed",
});
const exit = await run.wait();
expect(exit.reason).toBe("exit");
expect(exit.exitCode).toBe(0);
expect(exit.stdout).toBe("ok");
});
it("enforces no-output timeout for silent processes", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
timeoutMs: 5_000,
noOutputTimeoutMs: 200,
stdinMode: "pipe-closed",
});
const exit = await run.wait();
expect(exit.reason).toBe("no-output-timeout");
expect(exit.noOutputTimedOut).toBe(true);
expect(exit.timedOut).toBe(true);
});
it("cancels prior scoped run when replaceExistingScope is enabled", async () => {
const supervisor = createProcessSupervisor();
const first = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
scopeKey: "scope:a",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
timeoutMs: 10_000,
stdinMode: "pipe-open",
});
const second = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
scopeKey: "scope:a",
replaceExistingScope: true,
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("new")'],
timeoutMs: 2_000,
stdinMode: "pipe-closed",
});
const firstExit = await first.wait();
const secondExit = await second.wait();
expect(firstExit.reason === "manual-cancel" || firstExit.reason === "signal").toBe(true);
expect(secondExit.reason).toBe("exit");
expect(secondExit.stdout).toBe("new");
});
it("applies overall timeout even for near-immediate timer firing", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
sessionId: "s-timeout",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
timeoutMs: 1,
stdinMode: "pipe-closed",
});
const exit = await run.wait();
expect(exit.reason).toBe("overall-timeout");
expect(exit.timedOut).toBe(true);
});
it("can stream output without retaining it in RunExit payload", async () => {
const supervisor = createProcessSupervisor();
let streamed = "";
const run = await supervisor.spawn({
sessionId: "s-capture",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("streamed")'],
timeoutMs: 2_000,
stdinMode: "pipe-closed",
captureOutput: false,
onStdout: (chunk) => {
streamed += chunk;
},
});
const exit = await run.wait();
expect(streamed).toBe("streamed");
expect(exit.stdout).toBe("");
});
});

View File

@@ -0,0 +1,282 @@
import crypto from "node:crypto";
import type {
ManagedRun,
ProcessSupervisor,
RunExit,
RunRecord,
SpawnInput,
TerminationReason,
} from "./types.js";
import { getShellConfig } from "../../agents/shell-utils.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { createChildAdapter } from "./adapters/child.js";
import { createPtyAdapter } from "./adapters/pty.js";
import { createRunRegistry } from "./registry.js";
const log = createSubsystemLogger("process/supervisor");
type ActiveRun = {
run: ManagedRun;
scopeKey?: string;
};
function clampTimeout(value?: number): number | undefined {
if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
return undefined;
}
return Math.max(1, Math.floor(value));
}
function isTimeoutReason(reason: TerminationReason) {
return reason === "overall-timeout" || reason === "no-output-timeout";
}
export function createProcessSupervisor(): ProcessSupervisor {
const registry = createRunRegistry();
const active = new Map<string, ActiveRun>();
const cancel = (runId: string, reason: TerminationReason = "manual-cancel") => {
const current = active.get(runId);
if (!current) {
return;
}
registry.updateState(runId, "exiting", {
terminationReason: reason,
});
current.run.cancel(reason);
};
const cancelScope = (scopeKey: string, reason: TerminationReason = "manual-cancel") => {
if (!scopeKey.trim()) {
return;
}
for (const [runId, run] of active.entries()) {
if (run.scopeKey !== scopeKey) {
continue;
}
cancel(runId, reason);
}
};
const spawn = async (input: SpawnInput): Promise<ManagedRun> => {
const runId = input.runId?.trim() || crypto.randomUUID();
if (input.replaceExistingScope && input.scopeKey?.trim()) {
cancelScope(input.scopeKey, "manual-cancel");
}
const startedAtMs = Date.now();
const record: RunRecord = {
runId,
sessionId: input.sessionId,
backendId: input.backendId,
scopeKey: input.scopeKey?.trim() || undefined,
state: "starting",
startedAtMs,
lastOutputAtMs: startedAtMs,
createdAtMs: startedAtMs,
updatedAtMs: startedAtMs,
};
registry.add(record);
let forcedReason: TerminationReason | null = null;
let settled = false;
let stdout = "";
let stderr = "";
let timeoutTimer: NodeJS.Timeout | null = null;
let noOutputTimer: NodeJS.Timeout | null = null;
const captureOutput = input.captureOutput !== false;
const overallTimeoutMs = clampTimeout(input.timeoutMs);
const noOutputTimeoutMs = clampTimeout(input.noOutputTimeoutMs);
const setForcedReason = (reason: TerminationReason) => {
if (forcedReason) {
return;
}
forcedReason = reason;
registry.updateState(runId, "exiting", { terminationReason: reason });
};
let cancelAdapter: ((reason: TerminationReason) => void) | null = null;
const requestCancel = (reason: TerminationReason) => {
setForcedReason(reason);
cancelAdapter?.(reason);
};
const touchOutput = () => {
registry.touchOutput(runId);
if (!noOutputTimeoutMs || settled) {
return;
}
if (noOutputTimer) {
clearTimeout(noOutputTimer);
}
noOutputTimer = setTimeout(() => {
requestCancel("no-output-timeout");
}, noOutputTimeoutMs);
};
try {
if (input.mode === "child" && input.argv.length === 0) {
throw new Error("spawn argv cannot be empty");
}
const adapter =
input.mode === "pty"
? await (async () => {
const { shell, args: shellArgs } = getShellConfig();
const ptyCommand = input.ptyCommand.trim();
if (!ptyCommand) {
throw new Error("PTY command cannot be empty");
}
return await createPtyAdapter({
shell,
args: [...shellArgs, ptyCommand],
cwd: input.cwd,
env: input.env,
});
})()
: await createChildAdapter({
argv: input.argv,
cwd: input.cwd,
env: input.env,
windowsVerbatimArguments: input.windowsVerbatimArguments,
input: input.input,
stdinMode: input.stdinMode,
});
registry.updateState(runId, "running", { pid: adapter.pid });
const clearTimers = () => {
if (timeoutTimer) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
if (noOutputTimer) {
clearTimeout(noOutputTimer);
noOutputTimer = null;
}
};
cancelAdapter = (_reason: TerminationReason) => {
if (settled) {
return;
}
adapter.kill("SIGKILL");
};
if (overallTimeoutMs) {
timeoutTimer = setTimeout(() => {
requestCancel("overall-timeout");
}, overallTimeoutMs);
}
if (noOutputTimeoutMs) {
noOutputTimer = setTimeout(() => {
requestCancel("no-output-timeout");
}, noOutputTimeoutMs);
}
adapter.onStdout((chunk) => {
if (captureOutput) {
stdout += chunk;
}
input.onStdout?.(chunk);
touchOutput();
});
adapter.onStderr((chunk) => {
if (captureOutput) {
stderr += chunk;
}
input.onStderr?.(chunk);
touchOutput();
});
const waitPromise = (async (): Promise<RunExit> => {
const result = await adapter.wait();
if (settled) {
return {
reason: forcedReason ?? "exit",
exitCode: result.code,
exitSignal: result.signal,
durationMs: Date.now() - startedAtMs,
stdout,
stderr,
timedOut: isTimeoutReason(forcedReason ?? "exit"),
noOutputTimedOut: forcedReason === "no-output-timeout",
};
}
settled = true;
clearTimers();
adapter.dispose();
active.delete(runId);
const reason: TerminationReason =
forcedReason ?? (result.signal != null ? ("signal" as const) : ("exit" as const));
const exit: RunExit = {
reason,
exitCode: result.code,
exitSignal: result.signal,
durationMs: Date.now() - startedAtMs,
stdout,
stderr,
timedOut: isTimeoutReason(forcedReason ?? reason),
noOutputTimedOut: forcedReason === "no-output-timeout",
};
registry.finalize(runId, {
reason: exit.reason,
exitCode: exit.exitCode,
exitSignal: exit.exitSignal,
});
return exit;
})().catch((err) => {
if (!settled) {
settled = true;
clearTimers();
active.delete(runId);
adapter.dispose();
registry.finalize(runId, {
reason: "spawn-error",
exitCode: null,
exitSignal: null,
});
}
throw err;
});
const managedRun: ManagedRun = {
runId,
pid: adapter.pid,
startedAtMs,
stdin: adapter.stdin,
wait: async () => await waitPromise,
cancel: (reason = "manual-cancel") => {
requestCancel(reason);
},
};
active.set(runId, {
run: managedRun,
scopeKey: input.scopeKey?.trim() || undefined,
});
return managedRun;
} catch (err) {
registry.finalize(runId, {
reason: "spawn-error",
exitCode: null,
exitSignal: null,
});
log.warn(`spawn failed: runId=${runId} reason=${String(err)}`);
throw err;
}
};
return {
spawn,
cancel,
cancelScope,
reconcileOrphans: async () => {
// Deliberate no-op: this supervisor uses in-memory ownership only.
// Active runs are not recovered after process restart in the current model.
},
getRecord: (runId: string) => registry.get(runId),
};
}

View File

@@ -0,0 +1,96 @@
export type RunState = "starting" | "running" | "exiting" | "exited";
export type TerminationReason =
| "manual-cancel"
| "overall-timeout"
| "no-output-timeout"
| "spawn-error"
| "signal"
| "exit";
export type RunRecord = {
runId: string;
sessionId: string;
backendId: string;
scopeKey?: string;
pid?: number;
processGroupId?: number;
startedAtMs: number;
lastOutputAtMs: number;
createdAtMs: number;
updatedAtMs: number;
state: RunState;
terminationReason?: TerminationReason;
exitCode?: number | null;
exitSignal?: NodeJS.Signals | number | null;
};
export type RunExit = {
reason: TerminationReason;
exitCode: number | null;
exitSignal: NodeJS.Signals | number | null;
durationMs: number;
stdout: string;
stderr: string;
timedOut: boolean;
noOutputTimedOut: boolean;
};
export type ManagedRun = {
runId: string;
pid?: number;
startedAtMs: number;
stdin?: ManagedRunStdin;
wait: () => Promise<RunExit>;
cancel: (reason?: TerminationReason) => void;
};
export type SpawnMode = "child" | "pty";
export type ManagedRunStdin = {
write: (data: string, cb?: (err?: Error | null) => void) => void;
end: () => void;
destroy?: () => void;
destroyed?: boolean;
};
type SpawnBaseInput = {
runId?: string;
sessionId: string;
backendId: string;
scopeKey?: string;
replaceExistingScope?: boolean;
cwd?: string;
env?: NodeJS.ProcessEnv;
timeoutMs?: number;
noOutputTimeoutMs?: number;
/**
* When false, stdout/stderr are streamed via callbacks only and not retained in RunExit payload.
*/
captureOutput?: boolean;
onStdout?: (chunk: string) => void;
onStderr?: (chunk: string) => void;
};
type SpawnChildInput = SpawnBaseInput & {
mode: "child";
argv: string[];
windowsVerbatimArguments?: boolean;
input?: string;
stdinMode?: "inherit" | "pipe-open" | "pipe-closed";
};
type SpawnPtyInput = SpawnBaseInput & {
mode: "pty";
ptyCommand: string;
};
export type SpawnInput = SpawnChildInput | SpawnPtyInput;
export interface ProcessSupervisor {
spawn(input: SpawnInput): Promise<ManagedRun>;
cancel(runId: string, reason?: TerminationReason): void;
cancelScope(scopeKey: string, reason?: TerminationReason): void;
reconcileOrphans(): Promise<void>;
getRecord(runId: string): RunRecord | undefined;
}