// Source listing: gpt-pilot/src/tasks/task-executor.ts
// (file-viewer metadata: 204 lines, 5.1 KiB, TypeScript)

import type { OpenClawConfig } from "../config/config.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { createFlowForTask, deleteFlowRecordById } from "./flow-registry.js";
import {
cancelTaskById,
createTaskRecord,
linkTaskToFlowById,
markTaskLostById,
markTaskRunningByRunId,
markTaskTerminalByRunId,
recordTaskProgressByRunId,
setTaskRunDeliveryStatusByRunId,
} from "./task-registry.js";
import type {
TaskDeliveryState,
TaskDeliveryStatus,
TaskNotifyPolicy,
TaskRecord,
TaskRuntime,
TaskStatus,
TaskTerminalOutcome,
} from "./task-registry.types.js";
// Logger for this module, created under the "tasks/executor" subsystem name.
const log = createSubsystemLogger("tasks/executor");
/**
 * Decides whether a task should be given its own one-task flow.
 *
 * A task qualifies only when all of the following hold:
 *  - it has no non-blank `parentFlowId`,
 *  - its `requesterSessionKey` is non-blank,
 *  - its `deliveryStatus` is not "not_applicable",
 *  - its `runtime` is "acp" or "subagent".
 *
 * @param task The task record to inspect.
 * @returns `true` when every condition above is met, otherwise `false`.
 */
function isOneTaskFlowEligible(task: TaskRecord): boolean {
  // A non-blank parent flow id disqualifies the task outright. This check runs
  // first so the session key is only examined when no parent flow id is set.
  if (task.parentFlowId?.trim()) {
    return false;
  }
  // A blank (empty or whitespace-only) requester session key disqualifies it.
  if (!task.requesterSessionKey.trim()) {
    return false;
  }
  if (task.deliveryStatus === "not_applicable") {
    return false;
  }
  switch (task.runtime) {
    case "acp":
    case "subagent":
      return true;
    default:
      return false;
  }
}
/**
 * Wraps an eligible task in a freshly created flow and links the two.
 *
 * Tasks rejected by `isOneTaskFlowEligible` are returned untouched. Otherwise
 * a flow is created with `createFlowForTask` and the task is linked to it via
 * `linkTaskToFlowById`. The new flow record is deleted again when the link
 * call returns a falsy value, or when the linked record's `parentFlowId` is
 * not the id of the flow that was just created.
 *
 * Errors thrown along the way are logged as a warning and never propagate.
 *
 * @param params.task The task record to wrap.
 * @param params.requesterOrigin Passed through to `createFlowForTask`.
 * @returns The record returned by `linkTaskToFlowById` when it is truthy;
 *   otherwise (ineligible task, falsy link result, or thrown error) the
 *   original `params.task`.
 */
function ensureSingleTaskFlow(params: {
  task: TaskRecord;
  requesterOrigin?: TaskDeliveryState["requesterOrigin"];
}): TaskRecord {
  const { task, requesterOrigin } = params;
  if (!isOneTaskFlowEligible(task)) {
    return task;
  }

  try {
    const createdFlow = createFlowForTask({ task, requesterOrigin });
    const linkedTask = linkTaskToFlowById({
      taskId: task.taskId,
      flowId: createdFlow.flowId,
    });

    // The new flow is only kept when the task actually ended up pointing at it.
    if (!linkedTask || linkedTask.parentFlowId !== createdFlow.flowId) {
      deleteFlowRecordById(createdFlow.flowId);
    }
    return linkedTask ? linkedTask : task;
  } catch (cause) {
    log.warn("Failed to create one-task flow for detached run", {
      taskId: task.taskId,
      runId: task.runId,
      error: cause,
    });
    return task;
  }
}
/**
 * Creates a task record with status "queued" and passes it through
 * `ensureSingleTaskFlow`.
 *
 * All fields of `params` are forwarded to `createTaskRecord` together with
 * `status: "queued"`. The created record and `params.requesterOrigin` are then
 * handed to `ensureSingleTaskFlow`.
 *
 * @param params Task creation fields; see the inline type for the full list.
 * @returns The record returned by `ensureSingleTaskFlow`.
 */
export function createQueuedTaskRun(params: {
  runtime: TaskRuntime;
  sourceId?: string;
  requesterSessionKey: string;
  requesterOrigin?: TaskDeliveryState["requesterOrigin"];
  parentFlowId?: string;
  childSessionKey?: string;
  parentTaskId?: string;
  agentId?: string;
  runId?: string;
  label?: string;
  task: string;
  preferMetadata?: boolean;
  notifyPolicy?: TaskNotifyPolicy;
  deliveryStatus?: TaskDeliveryStatus;
}): TaskRecord {
  const { requesterOrigin } = params;
  const queuedRecord = createTaskRecord({ ...params, status: "queued" });
  return ensureSingleTaskFlow({ task: queuedRecord, requesterOrigin });
}
/**
 * Creates a task record with status "running" and passes it through
 * `ensureSingleTaskFlow`.
 *
 * All fields of `params` are forwarded to `createTaskRecord` together with
 * `status: "running"`. The created record and `params.requesterOrigin` are
 * then handed to `ensureSingleTaskFlow`.
 *
 * @param params Task creation fields; see the inline type for the full list.
 * @returns The record returned by `ensureSingleTaskFlow`.
 */
export function createRunningTaskRun(params: {
  runtime: TaskRuntime;
  sourceId?: string;
  requesterSessionKey: string;
  requesterOrigin?: TaskDeliveryState["requesterOrigin"];
  parentFlowId?: string;
  childSessionKey?: string;
  parentTaskId?: string;
  agentId?: string;
  runId?: string;
  label?: string;
  task: string;
  notifyPolicy?: TaskNotifyPolicy;
  deliveryStatus?: TaskDeliveryStatus;
  preferMetadata?: boolean;
  startedAt?: number;
  lastEventAt?: number;
  progressSummary?: string | null;
}): TaskRecord {
  const { requesterOrigin } = params;
  const runningRecord = createTaskRecord({ ...params, status: "running" });
  return ensureSingleTaskFlow({ task: runningRecord, requesterOrigin });
}
/**
 * Forwards `params` unchanged to `markTaskRunningByRunId`.
 *
 * @param params Run id plus optional timestamps and summaries.
 * @returns Whatever `markTaskRunningByRunId` returns.
 */
export function startTaskRunByRunId(params: {
  runId: string;
  startedAt?: number;
  lastEventAt?: number;
  progressSummary?: string | null;
  eventSummary?: string | null;
}) {
  const result = markTaskRunningByRunId(params);
  return result;
}
/**
 * Forwards `params` unchanged to `recordTaskProgressByRunId`.
 *
 * @param params Run id plus optional event timestamp and summaries.
 * @returns Whatever `recordTaskProgressByRunId` returns.
 */
export function recordTaskRunProgressByRunId(params: {
  runId: string;
  lastEventAt?: number;
  progressSummary?: string | null;
  eventSummary?: string | null;
}) {
  const result = recordTaskProgressByRunId(params);
  return result;
}
/**
 * Marks the run identified by `runId` as terminal with status "succeeded".
 *
 * Only the fields listed in the parameter type are passed on to
 * `markTaskTerminalByRunId`; the status is always fixed to "succeeded".
 *
 * @param params Run id, end timestamp, and optional summaries/outcome.
 * @returns Whatever `markTaskTerminalByRunId` returns.
 */
export function completeTaskRunByRunId(params: {
  runId: string;
  endedAt: number;
  lastEventAt?: number;
  progressSummary?: string | null;
  terminalSummary?: string | null;
  terminalOutcome?: TaskTerminalOutcome | null;
}) {
  const {
    runId,
    endedAt,
    lastEventAt,
    progressSummary,
    terminalSummary,
    terminalOutcome,
  } = params;
  return markTaskTerminalByRunId({
    runId,
    status: "succeeded",
    endedAt,
    lastEventAt,
    progressSummary,
    terminalSummary,
    terminalOutcome,
  });
}
/**
 * Marks the run identified by `runId` as terminal with a non-success status.
 *
 * The status passed to `markTaskTerminalByRunId` is `params.status` when it is
 * neither `null` nor `undefined`, and "failed" otherwise.
 *
 * @param params Run id, end timestamp, optional status, error text and
 *   summaries.
 * @returns Whatever `markTaskTerminalByRunId` returns.
 */
export function failTaskRunByRunId(params: {
  runId: string;
  status?: Extract<TaskStatus, "failed" | "timed_out" | "cancelled">;
  endedAt: number;
  lastEventAt?: number;
  error?: string;
  progressSummary?: string | null;
  terminalSummary?: string | null;
}) {
  const { runId, endedAt, lastEventAt, error, progressSummary, terminalSummary } = params;
  // `??` (not a destructuring default) so that a runtime null also falls back.
  const terminalStatus = params.status ?? "failed";
  return markTaskTerminalByRunId({
    runId,
    status: terminalStatus,
    endedAt,
    lastEventAt,
    error,
    progressSummary,
    terminalSummary,
  });
}
/**
 * Forwards `params` unchanged to `markTaskLostById`.
 *
 * @param params Task id, end timestamp, and optional event time, error text
 *   and `cleanupAfter` value.
 * @returns Whatever `markTaskLostById` returns.
 */
export function markTaskRunLostById(params: {
  taskId: string;
  endedAt: number;
  lastEventAt?: number;
  error?: string;
  cleanupAfter?: number;
}) {
  const result = markTaskLostById(params);
  return result;
}
/**
 * Forwards `params` unchanged to `setTaskRunDeliveryStatusByRunId`.
 *
 * @param params Run id and the delivery status to set.
 * @returns Whatever `setTaskRunDeliveryStatusByRunId` returns.
 */
export function setDetachedTaskDeliveryStatusByRunId(params: {
  runId: string;
  deliveryStatus: TaskDeliveryStatus;
}) {
  const result = setTaskRunDeliveryStatusByRunId(params);
  return result;
}
/**
 * Forwards `params` unchanged to `cancelTaskById`.
 *
 * @param params Configuration object and the id of the task to cancel.
 * @returns A promise that resolves with whatever `cancelTaskById` yields.
 */
export async function cancelDetachedTaskRunById(params: {
  cfg: OpenClawConfig;
  taskId: string;
}) {
  // Returned without an extra `await`, matching the direct pass-through.
  const outcome = cancelTaskById(params);
  return outcome;
}