fix: codex and similar processes keep dying on pty, solved by refactoring process spawning (#14257)

* exec: clean up PTY resources on timeout and exit

* cli: harden resume cleanup and watchdog stalled runs

* cli: productionize PTY and resume reliability paths

* docs: add PTY process supervision architecture plan

* docs: rewrite PTY supervision plan as pre-rewrite baseline

* docs: switch PTY supervision plan to one-go execution

* docs: add one-line root cause to PTY supervision plan

* docs: add OS contracts and test matrix to PTY supervision plan

* docs: define process-supervisor package placement and scope

* docs: tie supervisor plan to existing CI lanes

* docs: place PTY supervisor plan under src/process

* refactor(process): route exec and cli runs through supervisor

* docs(process): refresh PTY supervision plan

* wip

* fix(process): harden supervisor timeout and PTY termination

* fix(process): harden supervisor adapters env and wait handling

* ci: avoid failing formal conformance on comment permissions

* test(ui): fix cron request mock argument typing

* fix(ui): remove leftover conflict marker

* fix: supervise PTY processes (#14257) (openclaw#14257) (thanks @onutc)
This commit is contained in:
Onur
2026-02-16 09:32:05 +08:00
committed by GitHub
parent a73e7786e7
commit cd44a0d01e
32 changed files with 2759 additions and 855 deletions

View File

@@ -23,6 +23,7 @@ describe("runCommandWithTimeout", () => {
expect(result.code).toBe(0);
expect(result.stdout).toBe("ok");
expect(result.termination).toBe("exit");
});
it("merges custom env with process.env", async () => {
@@ -43,8 +44,58 @@ describe("runCommandWithTimeout", () => {
expect(result.code).toBe(0);
expect(result.stdout).toBe("base|ok");
expect(result.termination).toBe("exit");
} finally {
envSnapshot.restore();
}
});
it("kills command when no output timeout elapses", async () => {
const startedAt = Date.now();
const result = await runCommandWithTimeout(
[process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
{
timeoutMs: 5_000,
noOutputTimeoutMs: 300,
},
);
const durationMs = Date.now() - startedAt;
expect(durationMs).toBeLessThan(2_500);
expect(result.termination).toBe("no-output-timeout");
expect(result.noOutputTimedOut).toBe(true);
expect(result.code).not.toBe(0);
});
it("resets no output timer when command keeps emitting output", async () => {
const result = await runCommandWithTimeout(
[
process.execPath,
"-e",
'let i=0; const t=setInterval(() => { process.stdout.write("."); i += 1; if (i >= 5) { clearInterval(t); process.exit(0); } }, 50);',
],
{
timeoutMs: 5_000,
noOutputTimeoutMs: 200,
},
);
expect(result.code).toBe(0);
expect(result.termination).toBe("exit");
expect(result.noOutputTimedOut).toBe(false);
expect(result.stdout.length).toBeGreaterThanOrEqual(5);
});
it("reports global timeout termination when overall timeout elapses", async () => {
const result = await runCommandWithTimeout(
[process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
{
timeoutMs: 200,
},
);
expect(result.termination).toBe("timeout");
expect(result.noOutputTimedOut).toBe(false);
expect(result.code).not.toBe(0);
});
});

View File

@@ -76,11 +76,14 @@ export async function runExec(
}
export type SpawnResult = {
pid?: number;
stdout: string;
stderr: string;
code: number | null;
signal: NodeJS.Signals | null;
killed: boolean;
termination: "exit" | "timeout" | "no-output-timeout" | "signal";
noOutputTimedOut?: boolean;
};
export type CommandOptions = {
@@ -89,6 +92,7 @@ export type CommandOptions = {
input?: string;
env?: NodeJS.ProcessEnv;
windowsVerbatimArguments?: boolean;
noOutputTimeoutMs?: number;
};
export async function runCommandWithTimeout(
@@ -97,7 +101,7 @@ export async function runCommandWithTimeout(
): Promise<SpawnResult> {
const options: CommandOptions =
typeof optionsOrTimeout === "number" ? { timeoutMs: optionsOrTimeout } : optionsOrTimeout;
const { timeoutMs, cwd, input, env } = options;
const { timeoutMs, cwd, input, env, noOutputTimeoutMs } = options;
const { windowsVerbatimArguments } = options;
const hasInput = input !== undefined;
@@ -144,11 +148,45 @@ export async function runCommandWithTimeout(
let stdout = "";
let stderr = "";
let settled = false;
let timedOut = false;
let noOutputTimedOut = false;
let noOutputTimer: NodeJS.Timeout | null = null;
const shouldTrackOutputTimeout =
typeof noOutputTimeoutMs === "number" &&
Number.isFinite(noOutputTimeoutMs) &&
noOutputTimeoutMs > 0;
const clearNoOutputTimer = () => {
if (!noOutputTimer) {
return;
}
clearTimeout(noOutputTimer);
noOutputTimer = null;
};
const armNoOutputTimer = () => {
if (!shouldTrackOutputTimeout || settled) {
return;
}
clearNoOutputTimer();
noOutputTimer = setTimeout(() => {
if (settled) {
return;
}
noOutputTimedOut = true;
if (typeof child.kill === "function") {
child.kill("SIGKILL");
}
}, Math.floor(noOutputTimeoutMs));
};
const timer = setTimeout(() => {
timedOut = true;
if (typeof child.kill === "function") {
child.kill("SIGKILL");
}
}, timeoutMs);
armNoOutputTimer();
if (hasInput && child.stdin) {
child.stdin.write(input ?? "");
@@ -157,9 +195,11 @@ export async function runCommandWithTimeout(
child.stdout?.on("data", (d) => {
stdout += d.toString();
armNoOutputTimer();
});
child.stderr?.on("data", (d) => {
stderr += d.toString();
armNoOutputTimer();
});
child.on("error", (err) => {
if (settled) {
@@ -167,6 +207,7 @@ export async function runCommandWithTimeout(
}
settled = true;
clearTimeout(timer);
clearNoOutputTimer();
reject(err);
});
child.on("close", (code, signal) => {
@@ -175,7 +216,24 @@ export async function runCommandWithTimeout(
}
settled = true;
clearTimeout(timer);
resolve({ stdout, stderr, code, signal, killed: child.killed });
clearNoOutputTimer();
const termination = noOutputTimedOut
? "no-output-timeout"
: timedOut
? "timeout"
: signal != null
? "signal"
: "exit";
resolve({
pid: child.pid ?? undefined,
stdout,
stderr,
code,
signal,
killed: child.killed,
termination,
noOutputTimedOut,
});
});
});
}

34
src/process/kill-tree.ts Normal file
View File

@@ -0,0 +1,34 @@
import { spawn } from "node:child_process";
/**
* Best-effort process-tree termination.
* - Windows: use taskkill /T to include descendants.
* - Unix: try process-group kill first, then direct pid kill.
*/
export function killProcessTree(pid: number): void {
if (!Number.isFinite(pid) || pid <= 0) {
return;
}
if (process.platform === "win32") {
try {
spawn("taskkill", ["/F", "/T", "/PID", String(pid)], {
stdio: "ignore",
detached: true,
});
} catch {
// ignore taskkill failures
}
return;
}
try {
process.kill(-pid, "SIGKILL");
} catch {
try {
process.kill(pid, "SIGKILL");
} catch {
// process already gone or inaccessible
}
}
}

View File

@@ -0,0 +1,116 @@
import type { ChildProcess } from "node:child_process";
import { EventEmitter } from "node:events";
import { PassThrough } from "node:stream";
import { beforeEach, describe, expect, it, vi } from "vitest";
const { spawnWithFallbackMock, killProcessTreeMock } = vi.hoisted(() => ({
spawnWithFallbackMock: vi.fn(),
killProcessTreeMock: vi.fn(),
}));
vi.mock("../../spawn-utils.js", () => ({
spawnWithFallback: (...args: unknown[]) => spawnWithFallbackMock(...args),
}));
vi.mock("../../kill-tree.js", () => ({
killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args),
}));
function createStubChild(pid = 1234) {
const child = new EventEmitter() as ChildProcess;
child.stdin = new PassThrough() as ChildProcess["stdin"];
child.stdout = new PassThrough() as ChildProcess["stdout"];
child.stderr = new PassThrough() as ChildProcess["stderr"];
child.pid = pid;
child.killed = false;
const killMock = vi.fn(() => true);
child.kill = killMock as ChildProcess["kill"];
return { child, killMock };
}
describe("createChildAdapter", () => {
beforeEach(() => {
spawnWithFallbackMock.mockReset();
killProcessTreeMock.mockReset();
});
it("uses process-tree kill for default SIGKILL", async () => {
const { child, killMock } = createStubChild(4321);
spawnWithFallbackMock.mockResolvedValue({
child,
usedFallback: false,
});
const { createChildAdapter } = await import("./child.js");
const adapter = await createChildAdapter({
argv: ["node", "-e", "setTimeout(() => {}, 1000)"],
stdinMode: "pipe-open",
});
const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as {
options?: { detached?: boolean };
fallbacks?: Array<{ options?: { detached?: boolean } }>;
};
expect(spawnArgs.options?.detached).toBe(true);
expect(spawnArgs.fallbacks?.[0]?.options?.detached).toBe(false);
adapter.kill();
expect(killProcessTreeMock).toHaveBeenCalledWith(4321);
expect(killMock).not.toHaveBeenCalled();
});
it("uses direct child.kill for non-SIGKILL signals", async () => {
const { child, killMock } = createStubChild(7654);
spawnWithFallbackMock.mockResolvedValue({
child,
usedFallback: false,
});
const { createChildAdapter } = await import("./child.js");
const adapter = await createChildAdapter({
argv: ["node", "-e", "setTimeout(() => {}, 1000)"],
stdinMode: "pipe-open",
});
adapter.kill("SIGTERM");
expect(killProcessTreeMock).not.toHaveBeenCalled();
expect(killMock).toHaveBeenCalledWith("SIGTERM");
});
it("keeps inherited env when no override env is provided", async () => {
const { child } = createStubChild(3333);
spawnWithFallbackMock.mockResolvedValue({
child,
usedFallback: false,
});
const { createChildAdapter } = await import("./child.js");
await createChildAdapter({
argv: ["node", "-e", "process.exit(0)"],
stdinMode: "pipe-open",
});
const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as {
options?: { env?: NodeJS.ProcessEnv };
};
expect(spawnArgs.options?.env).toBeUndefined();
});
it("passes explicit env overrides as strings", async () => {
const { child } = createStubChild(4444);
spawnWithFallbackMock.mockResolvedValue({
child,
usedFallback: false,
});
const { createChildAdapter } = await import("./child.js");
await createChildAdapter({
argv: ["node", "-e", "process.exit(0)"],
env: { FOO: "bar", COUNT: "12", DROP_ME: undefined },
stdinMode: "pipe-open",
});
const spawnArgs = spawnWithFallbackMock.mock.calls[0]?.[0] as {
options?: { env?: Record<string, string> };
};
expect(spawnArgs.options?.env).toEqual({ FOO: "bar", COUNT: "12" });
});
});

View File

@@ -0,0 +1,174 @@
import type { ChildProcessWithoutNullStreams, SpawnOptions } from "node:child_process";
import type { ManagedRunStdin } from "../types.js";
import { killProcessTree } from "../../kill-tree.js";
import { spawnWithFallback } from "../../spawn-utils.js";
function resolveCommand(command: string): string {
if (process.platform !== "win32") {
return command;
}
const lower = command.toLowerCase();
if (lower.endsWith(".exe") || lower.endsWith(".cmd") || lower.endsWith(".bat")) {
return command;
}
const basename = lower.split(/[\\/]/).pop() ?? lower;
if (basename === "npm" || basename === "pnpm" || basename === "yarn" || basename === "npx") {
return `${command}.cmd`;
}
return command;
}
function toStringEnv(env?: NodeJS.ProcessEnv): Record<string, string> {
if (!env) {
return {};
}
const out: Record<string, string> = {};
for (const [key, value] of Object.entries(env)) {
if (value === undefined) {
continue;
}
out[key] = String(value);
}
return out;
}
export type ChildAdapter = {
pid?: number;
stdin?: ManagedRunStdin;
onStdout: (listener: (chunk: string) => void) => void;
onStderr: (listener: (chunk: string) => void) => void;
wait: () => Promise<{ code: number | null; signal: NodeJS.Signals | null }>;
kill: (signal?: NodeJS.Signals) => void;
dispose: () => void;
};
export async function createChildAdapter(params: {
argv: string[];
cwd?: string;
env?: NodeJS.ProcessEnv;
windowsVerbatimArguments?: boolean;
input?: string;
stdinMode?: "inherit" | "pipe-open" | "pipe-closed";
}): Promise<ChildAdapter> {
const resolvedArgv = [...params.argv];
resolvedArgv[0] = resolveCommand(resolvedArgv[0] ?? "");
const stdinMode = params.stdinMode ?? (params.input !== undefined ? "pipe-closed" : "inherit");
const options: SpawnOptions = {
cwd: params.cwd,
env: params.env ? toStringEnv(params.env) : undefined,
stdio: ["pipe", "pipe", "pipe"],
detached: true,
windowsHide: true,
windowsVerbatimArguments: params.windowsVerbatimArguments,
};
if (stdinMode === "inherit") {
options.stdio = ["inherit", "pipe", "pipe"];
} else {
options.stdio = ["pipe", "pipe", "pipe"];
}
const spawned = await spawnWithFallback({
argv: resolvedArgv,
options,
fallbacks: [
{
label: "no-detach",
options: { detached: false },
},
],
});
const child = spawned.child as ChildProcessWithoutNullStreams;
if (child.stdin) {
if (params.input !== undefined) {
child.stdin.write(params.input);
child.stdin.end();
} else if (stdinMode === "pipe-closed") {
child.stdin.end();
}
}
const stdin: ManagedRunStdin | undefined = child.stdin
? {
destroyed: false,
write: (data: string, cb?: (err?: Error | null) => void) => {
try {
child.stdin.write(data, cb);
} catch (err) {
cb?.(err as Error);
}
},
end: () => {
try {
child.stdin.end();
} catch {
// ignore close errors
}
},
destroy: () => {
try {
child.stdin.destroy();
} catch {
// ignore destroy errors
}
},
}
: undefined;
const onStdout = (listener: (chunk: string) => void) => {
child.stdout.on("data", (chunk) => {
listener(chunk.toString());
});
};
const onStderr = (listener: (chunk: string) => void) => {
child.stderr.on("data", (chunk) => {
listener(chunk.toString());
});
};
const wait = async () =>
await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve, reject) => {
child.once("error", reject);
child.once("close", (code, signal) => {
resolve({ code, signal });
});
});
const kill = (signal?: NodeJS.Signals) => {
const pid = child.pid ?? undefined;
if (signal === undefined || signal === "SIGKILL") {
if (pid) {
killProcessTree(pid);
} else {
try {
child.kill("SIGKILL");
} catch {
// ignore kill errors
}
}
return;
}
try {
child.kill(signal);
} catch {
// ignore kill errors for non-kill signals
}
};
const dispose = () => {
child.removeAllListeners();
};
return {
pid: child.pid ?? undefined,
stdin,
onStdout,
onStderr,
wait,
kill,
dispose,
};
}

View File

@@ -0,0 +1,161 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const { spawnMock, ptyKillMock, killProcessTreeMock } = vi.hoisted(() => ({
spawnMock: vi.fn(),
ptyKillMock: vi.fn(),
killProcessTreeMock: vi.fn(),
}));
vi.mock("@lydell/node-pty", () => ({
spawn: (...args: unknown[]) => spawnMock(...args),
}));
vi.mock("../../kill-tree.js", () => ({
killProcessTree: (...args: unknown[]) => killProcessTreeMock(...args),
}));
function createStubPty(pid = 1234) {
let exitListener: ((event: { exitCode: number; signal?: number }) => void) | null = null;
return {
pid,
write: vi.fn(),
onData: vi.fn(() => ({ dispose: vi.fn() })),
onExit: vi.fn((listener: (event: { exitCode: number; signal?: number }) => void) => {
exitListener = listener;
return { dispose: vi.fn() };
}),
kill: (signal?: string) => ptyKillMock(signal),
emitExit: (event: { exitCode: number; signal?: number }) => {
exitListener?.(event);
},
};
}
describe("createPtyAdapter", () => {
beforeEach(() => {
spawnMock.mockReset();
ptyKillMock.mockReset();
killProcessTreeMock.mockReset();
});
afterEach(() => {
vi.resetModules();
vi.clearAllMocks();
});
it("forwards explicit signals to node-pty kill", async () => {
spawnMock.mockReturnValue(createStubPty());
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "bash",
args: ["-lc", "sleep 10"],
});
adapter.kill("SIGTERM");
expect(ptyKillMock).toHaveBeenCalledWith("SIGTERM");
expect(killProcessTreeMock).not.toHaveBeenCalled();
});
it("uses process-tree kill for SIGKILL by default", async () => {
spawnMock.mockReturnValue(createStubPty());
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "bash",
args: ["-lc", "sleep 10"],
});
adapter.kill();
expect(killProcessTreeMock).toHaveBeenCalledWith(1234);
expect(ptyKillMock).not.toHaveBeenCalled();
});
it("resolves wait when exit fires before wait is called", async () => {
const stub = createStubPty();
spawnMock.mockReturnValue(stub);
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "bash",
args: ["-lc", "exit 3"],
});
expect(stub.onExit).toHaveBeenCalledTimes(1);
stub.emitExit({ exitCode: 3, signal: 0 });
await expect(adapter.wait()).resolves.toEqual({ code: 3, signal: null });
});
it("keeps inherited env when no override env is provided", async () => {
const stub = createStubPty();
spawnMock.mockReturnValue(stub);
const { createPtyAdapter } = await import("./pty.js");
await createPtyAdapter({
shell: "bash",
args: ["-lc", "env"],
});
const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record<string, string> };
expect(spawnOptions?.env).toBeUndefined();
});
it("passes explicit env overrides as strings", async () => {
const stub = createStubPty();
spawnMock.mockReturnValue(stub);
const { createPtyAdapter } = await import("./pty.js");
await createPtyAdapter({
shell: "bash",
args: ["-lc", "env"],
env: { FOO: "bar", COUNT: "12", DROP_ME: undefined },
});
const spawnOptions = spawnMock.mock.calls[0]?.[2] as { env?: Record<string, string> };
expect(spawnOptions?.env).toEqual({ FOO: "bar", COUNT: "12" });
});
it("does not pass a signal to node-pty on Windows", async () => {
const originalPlatform = Object.getOwnPropertyDescriptor(process, "platform");
Object.defineProperty(process, "platform", { value: "win32", configurable: true });
try {
spawnMock.mockReturnValue(createStubPty());
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "powershell.exe",
args: ["-NoLogo"],
});
adapter.kill("SIGTERM");
expect(ptyKillMock).toHaveBeenCalledWith(undefined);
expect(killProcessTreeMock).not.toHaveBeenCalled();
} finally {
if (originalPlatform) {
Object.defineProperty(process, "platform", originalPlatform);
}
}
});
it("uses process-tree kill for SIGKILL on Windows", async () => {
const originalPlatform = Object.getOwnPropertyDescriptor(process, "platform");
Object.defineProperty(process, "platform", { value: "win32", configurable: true });
try {
spawnMock.mockReturnValue(createStubPty(4567));
const { createPtyAdapter } = await import("./pty.js");
const adapter = await createPtyAdapter({
shell: "powershell.exe",
args: ["-NoLogo"],
});
adapter.kill("SIGKILL");
expect(killProcessTreeMock).toHaveBeenCalledWith(4567);
expect(ptyKillMock).not.toHaveBeenCalled();
} finally {
if (originalPlatform) {
Object.defineProperty(process, "platform", originalPlatform);
}
}
});
});

View File

@@ -0,0 +1,197 @@
import type { ManagedRunStdin } from "../types.js";
import { killProcessTree } from "../../kill-tree.js";
type PtyExitEvent = { exitCode: number; signal?: number };
type PtyDisposable = { dispose: () => void };
type PtySpawnHandle = {
pid: number;
write: (data: string | Buffer) => void;
onData: (listener: (value: string) => void) => PtyDisposable | void;
onExit: (listener: (event: PtyExitEvent) => void) => PtyDisposable | void;
kill: (signal?: string) => void;
};
type PtySpawn = (
file: string,
args: string[] | string,
options: {
name?: string;
cols?: number;
rows?: number;
cwd?: string;
env?: Record<string, string>;
},
) => PtySpawnHandle;
type PtyModule = {
spawn?: PtySpawn;
default?: {
spawn?: PtySpawn;
};
};
function toStringEnv(env?: NodeJS.ProcessEnv): Record<string, string> {
if (!env) {
return {};
}
const out: Record<string, string> = {};
for (const [key, value] of Object.entries(env)) {
if (value === undefined) {
continue;
}
out[key] = String(value);
}
return out;
}
export type PtyAdapter = {
pid?: number;
stdin?: ManagedRunStdin;
onStdout: (listener: (chunk: string) => void) => void;
onStderr: (listener: (chunk: string) => void) => void;
wait: () => Promise<{ code: number | null; signal: NodeJS.Signals | number | null }>;
kill: (signal?: NodeJS.Signals) => void;
dispose: () => void;
};
export async function createPtyAdapter(params: {
shell: string;
args: string[];
cwd?: string;
env?: NodeJS.ProcessEnv;
cols?: number;
rows?: number;
name?: string;
}): Promise<PtyAdapter> {
const module = (await import("@lydell/node-pty")) as unknown as PtyModule;
const spawn = module.spawn ?? module.default?.spawn;
if (!spawn) {
throw new Error("PTY support is unavailable (node-pty spawn not found).");
}
const pty = spawn(params.shell, params.args, {
cwd: params.cwd,
env: params.env ? toStringEnv(params.env) : undefined,
name: params.name ?? process.env.TERM ?? "xterm-256color",
cols: params.cols ?? 120,
rows: params.rows ?? 30,
});
let dataListener: PtyDisposable | null = null;
let exitListener: PtyDisposable | null = null;
let waitResult: { code: number | null; signal: NodeJS.Signals | number | null } | null = null;
let resolveWait:
| ((value: { code: number | null; signal: NodeJS.Signals | number | null }) => void)
| null = null;
let waitPromise: Promise<{ code: number | null; signal: NodeJS.Signals | number | null }> | null =
null;
const settleWait = (value: { code: number | null; signal: NodeJS.Signals | number | null }) => {
if (waitResult) {
return;
}
waitResult = value;
if (resolveWait) {
const resolve = resolveWait;
resolveWait = null;
resolve(value);
}
};
exitListener =
pty.onExit((event) => {
const signal = event.signal && event.signal !== 0 ? event.signal : null;
settleWait({ code: event.exitCode ?? null, signal });
}) ?? null;
const stdin: ManagedRunStdin = {
destroyed: false,
write: (data, cb) => {
try {
pty.write(data);
cb?.(null);
} catch (err) {
cb?.(err as Error);
}
},
end: () => {
try {
const eof = process.platform === "win32" ? "\x1a" : "\x04";
pty.write(eof);
} catch {
// ignore EOF errors
}
},
};
const onStdout = (listener: (chunk: string) => void) => {
dataListener =
pty.onData((chunk) => {
listener(chunk.toString());
}) ?? null;
};
const onStderr = (_listener: (chunk: string) => void) => {
// PTY gives a unified output stream.
};
const wait = async () => {
if (waitResult) {
return waitResult;
}
if (!waitPromise) {
waitPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | number | null }>(
(resolve) => {
resolveWait = resolve;
if (waitResult) {
const settled = waitResult;
resolveWait = null;
resolve(settled);
}
},
);
}
return waitPromise;
};
const kill = (signal: NodeJS.Signals = "SIGKILL") => {
try {
if (signal === "SIGKILL" && typeof pty.pid === "number" && pty.pid > 0) {
killProcessTree(pty.pid);
} else if (process.platform === "win32") {
pty.kill();
} else {
pty.kill(signal);
}
} catch {
// ignore kill errors
}
// Some PTY hosts do not emit `onExit` reliably after kill.
// Ensure waiters can progress on forced termination.
settleWait({ code: null, signal });
};
const dispose = () => {
try {
dataListener?.dispose();
} catch {
// ignore disposal errors
}
try {
exitListener?.dispose();
} catch {
// ignore disposal errors
}
dataListener = null;
exitListener = null;
settleWait({ code: null, signal: null });
};
return {
pid: pty.pid || undefined,
stdin,
onStdout,
onStderr,
wait,
kill,
dispose,
};
}

View File

@@ -0,0 +1,24 @@
import type { ProcessSupervisor } from "./types.js";
import { createProcessSupervisor } from "./supervisor.js";
let singleton: ProcessSupervisor | null = null;
export function getProcessSupervisor(): ProcessSupervisor {
if (singleton) {
return singleton;
}
singleton = createProcessSupervisor();
return singleton;
}
export { createProcessSupervisor } from "./supervisor.js";
export type {
ManagedRun,
ProcessSupervisor,
RunExit,
RunRecord,
RunState,
SpawnInput,
SpawnMode,
TerminationReason,
} from "./types.js";

View File

@@ -0,0 +1,83 @@
import { describe, expect, it } from "vitest";
import { createRunRegistry } from "./registry.js";
describe("process supervisor run registry", () => {
it("finalize is idempotent and preserves first terminal metadata", () => {
const registry = createRunRegistry();
registry.add({
runId: "r1",
sessionId: "s1",
backendId: "b1",
state: "running",
startedAtMs: 1,
lastOutputAtMs: 1,
createdAtMs: 1,
updatedAtMs: 1,
});
const first = registry.finalize("r1", {
reason: "overall-timeout",
exitCode: null,
exitSignal: "SIGKILL",
});
const second = registry.finalize("r1", {
reason: "manual-cancel",
exitCode: 0,
exitSignal: null,
});
expect(first).not.toBeNull();
expect(first?.firstFinalize).toBe(true);
expect(first?.record.terminationReason).toBe("overall-timeout");
expect(first?.record.exitCode).toBeNull();
expect(first?.record.exitSignal).toBe("SIGKILL");
expect(second).not.toBeNull();
expect(second?.firstFinalize).toBe(false);
expect(second?.record.terminationReason).toBe("overall-timeout");
expect(second?.record.exitCode).toBeNull();
expect(second?.record.exitSignal).toBe("SIGKILL");
});
it("prunes oldest exited records once retention cap is exceeded", () => {
const registry = createRunRegistry({ maxExitedRecords: 2 });
registry.add({
runId: "r1",
sessionId: "s1",
backendId: "b1",
state: "running",
startedAtMs: 1,
lastOutputAtMs: 1,
createdAtMs: 1,
updatedAtMs: 1,
});
registry.add({
runId: "r2",
sessionId: "s2",
backendId: "b1",
state: "running",
startedAtMs: 2,
lastOutputAtMs: 2,
createdAtMs: 2,
updatedAtMs: 2,
});
registry.add({
runId: "r3",
sessionId: "s3",
backendId: "b1",
state: "running",
startedAtMs: 3,
lastOutputAtMs: 3,
createdAtMs: 3,
updatedAtMs: 3,
});
registry.finalize("r1", { reason: "exit", exitCode: 0, exitSignal: null });
registry.finalize("r2", { reason: "exit", exitCode: 0, exitSignal: null });
registry.finalize("r3", { reason: "exit", exitCode: 0, exitSignal: null });
expect(registry.get("r1")).toBeUndefined();
expect(registry.get("r2")?.state).toBe("exited");
expect(registry.get("r3")?.state).toBe("exited");
});
});

View File

@@ -0,0 +1,154 @@
import type { RunRecord, RunState, TerminationReason } from "./types.js";
function nowMs() {
return Date.now();
}
const DEFAULT_MAX_EXITED_RECORDS = 2_000;
function resolveMaxExitedRecords(value?: number): number {
if (typeof value !== "number" || !Number.isFinite(value) || value < 1) {
return DEFAULT_MAX_EXITED_RECORDS;
}
return Math.max(1, Math.floor(value));
}
export type RunRegistry = {
add: (record: RunRecord) => void;
get: (runId: string) => RunRecord | undefined;
list: () => RunRecord[];
listByScope: (scopeKey: string) => RunRecord[];
updateState: (
runId: string,
state: RunState,
patch?: Partial<Pick<RunRecord, "pid" | "terminationReason" | "exitCode" | "exitSignal">>,
) => RunRecord | undefined;
touchOutput: (runId: string) => void;
finalize: (
runId: string,
exit: {
reason: TerminationReason;
exitCode: number | null;
exitSignal: NodeJS.Signals | number | null;
},
) => { record: RunRecord; firstFinalize: boolean } | null;
delete: (runId: string) => void;
};
export function createRunRegistry(options?: { maxExitedRecords?: number }): RunRegistry {
const records = new Map<string, RunRecord>();
const maxExitedRecords = resolveMaxExitedRecords(options?.maxExitedRecords);
const pruneExitedRecords = () => {
if (!records.size) {
return;
}
let exited = 0;
for (const record of records.values()) {
if (record.state === "exited") {
exited += 1;
}
}
if (exited <= maxExitedRecords) {
return;
}
let remove = exited - maxExitedRecords;
for (const [runId, record] of records.entries()) {
if (remove <= 0) {
break;
}
if (record.state !== "exited") {
continue;
}
records.delete(runId);
remove -= 1;
}
};
const add: RunRegistry["add"] = (record) => {
records.set(record.runId, { ...record });
};
const get: RunRegistry["get"] = (runId) => {
const record = records.get(runId);
return record ? { ...record } : undefined;
};
const list: RunRegistry["list"] = () => {
return Array.from(records.values()).map((record) => ({ ...record }));
};
const listByScope: RunRegistry["listByScope"] = (scopeKey) => {
if (!scopeKey.trim()) {
return [];
}
return Array.from(records.values())
.filter((record) => record.scopeKey === scopeKey)
.map((record) => ({ ...record }));
};
const updateState: RunRegistry["updateState"] = (runId, state, patch) => {
const current = records.get(runId);
if (!current) {
return undefined;
}
const updatedAtMs = nowMs();
const next: RunRecord = {
...current,
...patch,
state,
updatedAtMs,
lastOutputAtMs: current.lastOutputAtMs,
};
records.set(runId, next);
return { ...next };
};
const touchOutput: RunRegistry["touchOutput"] = (runId) => {
const current = records.get(runId);
if (!current) {
return;
}
const ts = nowMs();
records.set(runId, {
...current,
lastOutputAtMs: ts,
updatedAtMs: ts,
});
};
const finalize: RunRegistry["finalize"] = (runId, exit) => {
const current = records.get(runId);
if (!current) {
return null;
}
const firstFinalize = current.state !== "exited";
const ts = nowMs();
const next: RunRecord = {
...current,
state: "exited",
terminationReason: current.terminationReason ?? exit.reason,
exitCode: current.exitCode !== undefined ? current.exitCode : exit.exitCode,
exitSignal: current.exitSignal !== undefined ? current.exitSignal : exit.exitSignal,
updatedAtMs: ts,
};
records.set(runId, next);
pruneExitedRecords();
return { record: { ...next }, firstFinalize };
};
const del: RunRegistry["delete"] = (runId) => {
records.delete(runId);
};
return {
add,
get,
list,
listByScope,
updateState,
touchOutput,
finalize,
delete: del,
};
}

View File

@@ -0,0 +1,76 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const { createPtyAdapterMock } = vi.hoisted(() => ({
createPtyAdapterMock: vi.fn(),
}));
vi.mock("../../agents/shell-utils.js", () => ({
getShellConfig: () => ({ shell: "sh", args: ["-c"] }),
}));
vi.mock("./adapters/pty.js", () => ({
createPtyAdapter: (...args: unknown[]) => createPtyAdapterMock(...args),
}));
function createStubPtyAdapter() {
return {
pid: 1234,
stdin: undefined,
onStdout: (_listener: (chunk: string) => void) => {
// no-op
},
onStderr: (_listener: (chunk: string) => void) => {
// no-op
},
wait: async () => ({ code: 0, signal: null }),
kill: (_signal?: NodeJS.Signals) => {
// no-op
},
dispose: () => {
// no-op
},
};
}
describe("process supervisor PTY command contract", () => {
beforeEach(() => {
createPtyAdapterMock.mockReset();
});
it("passes PTY command verbatim to shell args", async () => {
createPtyAdapterMock.mockResolvedValue(createStubPtyAdapter());
const { createProcessSupervisor } = await import("./supervisor.js");
const supervisor = createProcessSupervisor();
const command = `printf '%s\\n' "a b" && printf '%s\\n' '$HOME'`;
const run = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
mode: "pty",
ptyCommand: command,
timeoutMs: 1_000,
});
const exit = await run.wait();
expect(exit.reason).toBe("exit");
expect(createPtyAdapterMock).toHaveBeenCalledTimes(1);
const params = createPtyAdapterMock.mock.calls[0]?.[0] as { args?: string[] };
expect(params.args).toEqual(["-c", command]);
});
it("rejects empty PTY command", async () => {
createPtyAdapterMock.mockResolvedValue(createStubPtyAdapter());
const { createProcessSupervisor } = await import("./supervisor.js");
const supervisor = createProcessSupervisor();
await expect(
supervisor.spawn({
sessionId: "s1",
backendId: "test",
mode: "pty",
ptyCommand: " ",
}),
).rejects.toThrow("PTY command cannot be empty");
expect(createPtyAdapterMock).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,102 @@
import { describe, expect, it } from "vitest";
import { createProcessSupervisor } from "./supervisor.js";
describe("process supervisor", () => {
it("spawns child runs and captures output", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("ok")'],
timeoutMs: 2_000,
stdinMode: "pipe-closed",
});
const exit = await run.wait();
expect(exit.reason).toBe("exit");
expect(exit.exitCode).toBe(0);
expect(exit.stdout).toBe("ok");
});
it("enforces no-output timeout for silent processes", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
timeoutMs: 5_000,
noOutputTimeoutMs: 200,
stdinMode: "pipe-closed",
});
const exit = await run.wait();
expect(exit.reason).toBe("no-output-timeout");
expect(exit.noOutputTimedOut).toBe(true);
expect(exit.timedOut).toBe(true);
});
it("cancels prior scoped run when replaceExistingScope is enabled", async () => {
const supervisor = createProcessSupervisor();
const first = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
scopeKey: "scope:a",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
timeoutMs: 10_000,
stdinMode: "pipe-open",
});
const second = await supervisor.spawn({
sessionId: "s1",
backendId: "test",
scopeKey: "scope:a",
replaceExistingScope: true,
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("new")'],
timeoutMs: 2_000,
stdinMode: "pipe-closed",
});
const firstExit = await first.wait();
const secondExit = await second.wait();
expect(firstExit.reason === "manual-cancel" || firstExit.reason === "signal").toBe(true);
expect(secondExit.reason).toBe("exit");
expect(secondExit.stdout).toBe("new");
});
it("applies overall timeout even for near-immediate timer firing", async () => {
const supervisor = createProcessSupervisor();
const run = await supervisor.spawn({
sessionId: "s-timeout",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", "setTimeout(() => {}, 10_000)"],
timeoutMs: 1,
stdinMode: "pipe-closed",
});
const exit = await run.wait();
expect(exit.reason).toBe("overall-timeout");
expect(exit.timedOut).toBe(true);
});
it("can stream output without retaining it in RunExit payload", async () => {
const supervisor = createProcessSupervisor();
let streamed = "";
const run = await supervisor.spawn({
sessionId: "s-capture",
backendId: "test",
mode: "child",
argv: [process.execPath, "-e", 'process.stdout.write("streamed")'],
timeoutMs: 2_000,
stdinMode: "pipe-closed",
captureOutput: false,
onStdout: (chunk) => {
streamed += chunk;
},
});
const exit = await run.wait();
expect(streamed).toBe("streamed");
expect(exit.stdout).toBe("");
});
});

View File

@@ -0,0 +1,282 @@
import crypto from "node:crypto";
import type {
ManagedRun,
ProcessSupervisor,
RunExit,
RunRecord,
SpawnInput,
TerminationReason,
} from "./types.js";
import { getShellConfig } from "../../agents/shell-utils.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { createChildAdapter } from "./adapters/child.js";
import { createPtyAdapter } from "./adapters/pty.js";
import { createRunRegistry } from "./registry.js";
const log = createSubsystemLogger("process/supervisor");
type ActiveRun = {
run: ManagedRun;
scopeKey?: string;
};
function clampTimeout(value?: number): number | undefined {
if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
return undefined;
}
return Math.max(1, Math.floor(value));
}
function isTimeoutReason(reason: TerminationReason) {
return reason === "overall-timeout" || reason === "no-output-timeout";
}
export function createProcessSupervisor(): ProcessSupervisor {
const registry = createRunRegistry();
const active = new Map<string, ActiveRun>();
const cancel = (runId: string, reason: TerminationReason = "manual-cancel") => {
const current = active.get(runId);
if (!current) {
return;
}
registry.updateState(runId, "exiting", {
terminationReason: reason,
});
current.run.cancel(reason);
};
const cancelScope = (scopeKey: string, reason: TerminationReason = "manual-cancel") => {
if (!scopeKey.trim()) {
return;
}
for (const [runId, run] of active.entries()) {
if (run.scopeKey !== scopeKey) {
continue;
}
cancel(runId, reason);
}
};
const spawn = async (input: SpawnInput): Promise<ManagedRun> => {
const runId = input.runId?.trim() || crypto.randomUUID();
if (input.replaceExistingScope && input.scopeKey?.trim()) {
cancelScope(input.scopeKey, "manual-cancel");
}
const startedAtMs = Date.now();
const record: RunRecord = {
runId,
sessionId: input.sessionId,
backendId: input.backendId,
scopeKey: input.scopeKey?.trim() || undefined,
state: "starting",
startedAtMs,
lastOutputAtMs: startedAtMs,
createdAtMs: startedAtMs,
updatedAtMs: startedAtMs,
};
registry.add(record);
let forcedReason: TerminationReason | null = null;
let settled = false;
let stdout = "";
let stderr = "";
let timeoutTimer: NodeJS.Timeout | null = null;
let noOutputTimer: NodeJS.Timeout | null = null;
const captureOutput = input.captureOutput !== false;
const overallTimeoutMs = clampTimeout(input.timeoutMs);
const noOutputTimeoutMs = clampTimeout(input.noOutputTimeoutMs);
const setForcedReason = (reason: TerminationReason) => {
if (forcedReason) {
return;
}
forcedReason = reason;
registry.updateState(runId, "exiting", { terminationReason: reason });
};
let cancelAdapter: ((reason: TerminationReason) => void) | null = null;
const requestCancel = (reason: TerminationReason) => {
setForcedReason(reason);
cancelAdapter?.(reason);
};
const touchOutput = () => {
registry.touchOutput(runId);
if (!noOutputTimeoutMs || settled) {
return;
}
if (noOutputTimer) {
clearTimeout(noOutputTimer);
}
noOutputTimer = setTimeout(() => {
requestCancel("no-output-timeout");
}, noOutputTimeoutMs);
};
try {
if (input.mode === "child" && input.argv.length === 0) {
throw new Error("spawn argv cannot be empty");
}
const adapter =
input.mode === "pty"
? await (async () => {
const { shell, args: shellArgs } = getShellConfig();
const ptyCommand = input.ptyCommand.trim();
if (!ptyCommand) {
throw new Error("PTY command cannot be empty");
}
return await createPtyAdapter({
shell,
args: [...shellArgs, ptyCommand],
cwd: input.cwd,
env: input.env,
});
})()
: await createChildAdapter({
argv: input.argv,
cwd: input.cwd,
env: input.env,
windowsVerbatimArguments: input.windowsVerbatimArguments,
input: input.input,
stdinMode: input.stdinMode,
});
registry.updateState(runId, "running", { pid: adapter.pid });
const clearTimers = () => {
if (timeoutTimer) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
if (noOutputTimer) {
clearTimeout(noOutputTimer);
noOutputTimer = null;
}
};
cancelAdapter = (_reason: TerminationReason) => {
if (settled) {
return;
}
adapter.kill("SIGKILL");
};
if (overallTimeoutMs) {
timeoutTimer = setTimeout(() => {
requestCancel("overall-timeout");
}, overallTimeoutMs);
}
if (noOutputTimeoutMs) {
noOutputTimer = setTimeout(() => {
requestCancel("no-output-timeout");
}, noOutputTimeoutMs);
}
adapter.onStdout((chunk) => {
if (captureOutput) {
stdout += chunk;
}
input.onStdout?.(chunk);
touchOutput();
});
adapter.onStderr((chunk) => {
if (captureOutput) {
stderr += chunk;
}
input.onStderr?.(chunk);
touchOutput();
});
const waitPromise = (async (): Promise<RunExit> => {
const result = await adapter.wait();
if (settled) {
return {
reason: forcedReason ?? "exit",
exitCode: result.code,
exitSignal: result.signal,
durationMs: Date.now() - startedAtMs,
stdout,
stderr,
timedOut: isTimeoutReason(forcedReason ?? "exit"),
noOutputTimedOut: forcedReason === "no-output-timeout",
};
}
settled = true;
clearTimers();
adapter.dispose();
active.delete(runId);
const reason: TerminationReason =
forcedReason ?? (result.signal != null ? ("signal" as const) : ("exit" as const));
const exit: RunExit = {
reason,
exitCode: result.code,
exitSignal: result.signal,
durationMs: Date.now() - startedAtMs,
stdout,
stderr,
timedOut: isTimeoutReason(forcedReason ?? reason),
noOutputTimedOut: forcedReason === "no-output-timeout",
};
registry.finalize(runId, {
reason: exit.reason,
exitCode: exit.exitCode,
exitSignal: exit.exitSignal,
});
return exit;
})().catch((err) => {
if (!settled) {
settled = true;
clearTimers();
active.delete(runId);
adapter.dispose();
registry.finalize(runId, {
reason: "spawn-error",
exitCode: null,
exitSignal: null,
});
}
throw err;
});
const managedRun: ManagedRun = {
runId,
pid: adapter.pid,
startedAtMs,
stdin: adapter.stdin,
wait: async () => await waitPromise,
cancel: (reason = "manual-cancel") => {
requestCancel(reason);
},
};
active.set(runId, {
run: managedRun,
scopeKey: input.scopeKey?.trim() || undefined,
});
return managedRun;
} catch (err) {
registry.finalize(runId, {
reason: "spawn-error",
exitCode: null,
exitSignal: null,
});
log.warn(`spawn failed: runId=${runId} reason=${String(err)}`);
throw err;
}
};
return {
spawn,
cancel,
cancelScope,
reconcileOrphans: async () => {
// Deliberate no-op: this supervisor uses in-memory ownership only.
// Active runs are not recovered after process restart in the current model.
},
getRecord: (runId: string) => registry.get(runId),
};
}

View File

@@ -0,0 +1,96 @@
export type RunState = "starting" | "running" | "exiting" | "exited";
export type TerminationReason =
| "manual-cancel"
| "overall-timeout"
| "no-output-timeout"
| "spawn-error"
| "signal"
| "exit";
export type RunRecord = {
runId: string;
sessionId: string;
backendId: string;
scopeKey?: string;
pid?: number;
processGroupId?: number;
startedAtMs: number;
lastOutputAtMs: number;
createdAtMs: number;
updatedAtMs: number;
state: RunState;
terminationReason?: TerminationReason;
exitCode?: number | null;
exitSignal?: NodeJS.Signals | number | null;
};
export type RunExit = {
reason: TerminationReason;
exitCode: number | null;
exitSignal: NodeJS.Signals | number | null;
durationMs: number;
stdout: string;
stderr: string;
timedOut: boolean;
noOutputTimedOut: boolean;
};
export type ManagedRun = {
runId: string;
pid?: number;
startedAtMs: number;
stdin?: ManagedRunStdin;
wait: () => Promise<RunExit>;
cancel: (reason?: TerminationReason) => void;
};
export type SpawnMode = "child" | "pty";
export type ManagedRunStdin = {
write: (data: string, cb?: (err?: Error | null) => void) => void;
end: () => void;
destroy?: () => void;
destroyed?: boolean;
};
type SpawnBaseInput = {
runId?: string;
sessionId: string;
backendId: string;
scopeKey?: string;
replaceExistingScope?: boolean;
cwd?: string;
env?: NodeJS.ProcessEnv;
timeoutMs?: number;
noOutputTimeoutMs?: number;
/**
* When false, stdout/stderr are streamed via callbacks only and not retained in RunExit payload.
*/
captureOutput?: boolean;
onStdout?: (chunk: string) => void;
onStderr?: (chunk: string) => void;
};
type SpawnChildInput = SpawnBaseInput & {
mode: "child";
argv: string[];
windowsVerbatimArguments?: boolean;
input?: string;
stdinMode?: "inherit" | "pipe-open" | "pipe-closed";
};
type SpawnPtyInput = SpawnBaseInput & {
mode: "pty";
ptyCommand: string;
};
export type SpawnInput = SpawnChildInput | SpawnPtyInput;
export interface ProcessSupervisor {
spawn(input: SpawnInput): Promise<ManagedRun>;
cancel(runId: string, reason?: TerminationReason): void;
cancelScope(scopeKey: string, reason?: TerminationReason): void;
reconcileOrphans(): Promise<void>;
getRecord(runId: string): RunRecord | undefined;
}