fix(gateway): abort active runs during sessions.reset (#16576)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: 43da87f2df
Co-authored-by: Grynn <212880+Grynn@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Vishal Doshi
2026-02-15 04:12:33 +05:30
committed by GitHub
parent d8da642611
commit 3efb752124
3 changed files with 189 additions and 21 deletions

View File

@@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai
- Agents: keep unresolved mutating tool failures visible until the same action retry succeeds, scope mutation-error surfacing to mutating calls (including `session_status` model changes), and dedupe duplicate failure warnings in outbound replies. (#16131) Thanks @Swader.
- Agents: classify external timeout aborts during compaction the same as internal timeouts, preventing unnecessary auth-profile rotation and preserving compaction-timeout snapshot fallback behavior. (#9855) Thanks @mverrilli.
- Sessions/Agents: harden transcript path resolution for mismatched agent context by preserving explicit store roots and adding safe absolute-path fallback to the correct agent sessions directory. (#16288) Thanks @robbyczgw-cla.
- Gateway/Sessions: abort active embedded runs and clear queued session work before `sessions.reset`, returning unavailable if the run does not stop in time. (#16576) Thanks @Grynn.
- BlueBubbles: include sender identity in group chat envelopes and pass clean message text to the agent prompt, aligning with iMessage/Signal formatting. (#16210) Thanks @zerone0x.
- WhatsApp: honor per-account `dmPolicy` overrides (account-level settings now take precedence over channel defaults for inbound DMs). (#10082) Thanks @mcaxtr.
- Media: accept `MEDIA:`-prefixed paths (lenient whitespace) when loading outbound media to prevent `ENOENT` for tool-returned local media paths. (#13107) Thanks @mcaxtr.

View File

@@ -88,6 +88,33 @@ function archiveSessionTranscriptsForSession(params: {
});
}
async function ensureSessionRuntimeCleanup(params: {
cfg: ReturnType<typeof loadConfig>;
key: string;
target: ReturnType<typeof resolveGatewaySessionStoreTarget>;
sessionId?: string;
}) {
const queueKeys = new Set<string>(params.target.storeKeys);
queueKeys.add(params.target.canonicalKey);
if (params.sessionId) {
queueKeys.add(params.sessionId);
}
clearSessionQueues([...queueKeys]);
stopSubagentsForRequester({ cfg: params.cfg, requesterSessionKey: params.target.canonicalKey });
if (!params.sessionId) {
return undefined;
}
abortEmbeddedPiRun(params.sessionId);
const ended = await waitForEmbeddedPiRunEnd(params.sessionId, 15_000);
if (ended) {
return undefined;
}
return errorShape(
ErrorCodes.UNAVAILABLE,
`Session ${params.key} is still active; try again in a moment.`,
);
}
export const sessionsHandlers: GatewayRequestHandlers = {
"sessions.list": ({ params, respond }) => {
if (!validateSessionsListParams(params)) {
@@ -278,6 +305,13 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const cfg = loadConfig();
const target = resolveGatewaySessionStoreTarget({ cfg, key });
const { entry } = loadSessionEntry(key);
const sessionId = entry?.sessionId;
const cleanupError = await ensureSessionRuntimeCleanup({ cfg, key, target, sessionId });
if (cleanupError) {
respond(false, undefined, cleanupError);
return;
}
const storePath = target.storePath;
let oldSessionId: string | undefined;
let oldSessionFile: string | undefined;
@@ -360,27 +394,10 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const { entry } = loadSessionEntry(key);
const sessionId = entry?.sessionId;
const existed = Boolean(entry);
const queueKeys = new Set<string>(target.storeKeys);
queueKeys.add(target.canonicalKey);
if (sessionId) {
queueKeys.add(sessionId);
}
clearSessionQueues([...queueKeys]);
stopSubagentsForRequester({ cfg, requesterSessionKey: target.canonicalKey });
if (sessionId) {
abortEmbeddedPiRun(sessionId);
const ended = await waitForEmbeddedPiRunEnd(sessionId, 15_000);
if (!ended) {
respond(
false,
undefined,
errorShape(
ErrorCodes.UNAVAILABLE,
`Session ${key} is still active; try again in a moment.`,
),
);
return;
}
const cleanupError = await ensureSessionRuntimeCleanup({ cfg, key, target, sessionId });
if (cleanupError) {
respond(false, undefined, cleanupError);
return;
}
await updateSessionStore(storePath, (store) => {
const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store });

View File

@@ -597,4 +597,154 @@ describe("gateway server sessions", () => {
ws.close();
});
test("sessions.reset aborts active runs and clears queues", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-"));
const storePath = path.join(dir, "sessions.json");
testState.sessionStorePath = storePath;
await fs.writeFile(
path.join(dir, "sess-main.jsonl"),
`${JSON.stringify({ role: "user", content: "hello" })}\n`,
"utf-8",
);
await writeSessionStore({
entries: {
main: { sessionId: "sess-main", updatedAt: Date.now() },
},
});
embeddedRunMock.activeIds.add("sess-main");
embeddedRunMock.waitResults.set("sess-main", true);
const { ws } = await openClient();
const reset = await rpcReq<{ ok: true; key: string; entry: { sessionId: string } }>(
ws,
"sessions.reset",
{
key: "main",
},
);
expect(reset.ok).toBe(true);
expect(reset.payload?.key).toBe("agent:main:main");
expect(reset.payload?.entry.sessionId).not.toBe("sess-main");
expect(sessionCleanupMocks.stopSubagentsForRequester).toHaveBeenCalledWith({
cfg: expect.any(Object),
requesterSessionKey: "agent:main:main",
});
expect(sessionCleanupMocks.clearSessionQueues).toHaveBeenCalledTimes(1);
const clearedKeys = sessionCleanupMocks.clearSessionQueues.mock.calls[0]?.[0] as string[];
expect(clearedKeys).toEqual(expect.arrayContaining(["main", "agent:main:main", "sess-main"]));
expect(embeddedRunMock.abortCalls).toEqual(["sess-main"]);
expect(embeddedRunMock.waitCalls).toEqual(["sess-main"]);
ws.close();
});
test("sessions.reset returns unavailable when active run does not stop", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-"));
const storePath = path.join(dir, "sessions.json");
testState.sessionStorePath = storePath;
await fs.writeFile(
path.join(dir, "sess-main.jsonl"),
`${JSON.stringify({ role: "user", content: "hello" })}\n`,
"utf-8",
);
await writeSessionStore({
entries: {
main: { sessionId: "sess-main", updatedAt: Date.now() },
},
});
embeddedRunMock.activeIds.add("sess-main");
embeddedRunMock.waitResults.set("sess-main", false);
const { ws } = await openClient();
const reset = await rpcReq(ws, "sessions.reset", {
key: "main",
});
expect(reset.ok).toBe(false);
expect(reset.error?.code).toBe("UNAVAILABLE");
expect(reset.error?.message ?? "").toMatch(/still active/i);
expect(sessionCleanupMocks.stopSubagentsForRequester).toHaveBeenCalledWith({
cfg: expect.any(Object),
requesterSessionKey: "agent:main:main",
});
expect(sessionCleanupMocks.clearSessionQueues).toHaveBeenCalledTimes(1);
const clearedKeys = sessionCleanupMocks.clearSessionQueues.mock.calls[0]?.[0] as string[];
expect(clearedKeys).toEqual(expect.arrayContaining(["main", "agent:main:main", "sess-main"]));
expect(embeddedRunMock.abortCalls).toEqual(["sess-main"]);
expect(embeddedRunMock.waitCalls).toEqual(["sess-main"]);
const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,
{ sessionId?: string }
>;
expect(store["agent:main:main"]?.sessionId).toBe("sess-main");
const filesAfterResetAttempt = await fs.readdir(dir);
expect(filesAfterResetAttempt.some((f) => f.startsWith("sess-main.jsonl.reset."))).toBe(false);
ws.close();
});
test("sessions.delete returns unavailable when active run does not stop", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-"));
const storePath = path.join(dir, "sessions.json");
testState.sessionStorePath = storePath;
await fs.writeFile(
path.join(dir, "sess-active.jsonl"),
`${JSON.stringify({ role: "user", content: "active" })}\n`,
"utf-8",
);
await writeSessionStore({
entries: {
"discord:group:dev": {
sessionId: "sess-active",
updatedAt: Date.now(),
},
},
});
embeddedRunMock.activeIds.add("sess-active");
embeddedRunMock.waitResults.set("sess-active", false);
const { ws } = await openClient();
const deleted = await rpcReq(ws, "sessions.delete", {
key: "discord:group:dev",
});
expect(deleted.ok).toBe(false);
expect(deleted.error?.code).toBe("UNAVAILABLE");
expect(deleted.error?.message ?? "").toMatch(/still active/i);
expect(sessionCleanupMocks.stopSubagentsForRequester).toHaveBeenCalledWith({
cfg: expect.any(Object),
requesterSessionKey: "agent:main:discord:group:dev",
});
expect(sessionCleanupMocks.clearSessionQueues).toHaveBeenCalledTimes(1);
const clearedKeys = sessionCleanupMocks.clearSessionQueues.mock.calls[0]?.[0] as string[];
expect(clearedKeys).toEqual(
expect.arrayContaining(["discord:group:dev", "agent:main:discord:group:dev", "sess-active"]),
);
expect(embeddedRunMock.abortCalls).toEqual(["sess-active"]);
expect(embeddedRunMock.waitCalls).toEqual(["sess-active"]);
const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,
{ sessionId?: string }
>;
expect(store["agent:main:discord:group:dev"]?.sessionId).toBe("sess-active");
const filesAfterDeleteAttempt = await fs.readdir(dir);
expect(filesAfterDeleteAttempt.some((f) => f.startsWith("sess-active.jsonl.deleted."))).toBe(
false,
);
ws.close();
});
});