diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index c398a52a2e..4cf7183693 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -1,3 +1,7 @@ +import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import type { AgentDefaultsConfig } from "../../config/types.js"; +import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js"; import { resolveAgentConfig, resolveAgentDir, @@ -20,7 +24,6 @@ import { resolveHooksGmailModel, resolveThinkingDefault, } from "../../agents/model-selection.js"; -import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; @@ -34,13 +37,11 @@ import { } from "../../auto-reply/thinking.js"; import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js"; -import type { OpenClawConfig } from "../../config/config.js"; import { resolveAgentMainSessionKey, resolveSessionTranscriptPath, updateSessionStore, } from "../../config/sessions.js"; -import type { AgentDefaultsConfig } from "../../config/types.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; @@ -53,7 +54,6 @@ import { isExternalHookSession, } from "../../security/external-content.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; -import type { CronJob } from "../types.js"; import { resolveDeliveryTarget } from "./delivery-target.js"; import { isHeartbeatOnlyResponse, @@ -101,13 +101,8 @@ function resolveCronDeliveryBestEffort(job: CronJob): boolean { } export type RunCronAgentTurnResult = { - status: "ok" | "error" | "skipped"; - summary?: string; /** Last non-empty agent text output (not truncated). */ outputText?: string; - error?: string; - sessionId?: string; - sessionKey?: string; /** * `true` when the isolated run already delivered its output to the target * channel (via outbound payloads, the subagent announce flow, or a matching @@ -116,18 +111,8 @@ export type RunCronAgentTurnResult = { * messages. See: https://github.com/openclaw/openclaw/issues/15692 */ delivered?: boolean; - - // Telemetry (best-effort) - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; -}; +} & CronRunOutcome & + CronRunTelemetry; export async function runCronIsolatedAgentTurn(params: { cfg: OpenClawConfig; @@ -486,19 +471,7 @@ export async function runCronIsolatedAgentTurn(params: { // Update token+model fields in the session store. // Also collect best-effort telemetry for the cron run log. - let telemetry: - | { - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; - } - | undefined; + let telemetry: CronRunTelemetry | undefined; { const usage = runResult.meta?.agentMeta?.usage; const promptTokens = runResult.meta?.agentMeta?.promptTokens; diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index 56d369b394..18d5f22d4a 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -1,11 +1,12 @@ import fs from "node:fs/promises"; import path from "node:path"; +import type { CronRunStatus, CronRunTelemetry } from "./types.js"; export type CronRunLogEntry = { ts: number; jobId: string; action: "finished"; - status?: "ok" | "error" | "skipped"; + status?: CronRunStatus; error?: string; summary?: string; sessionId?: string; @@ -13,18 +14,7 @@ export type CronRunLogEntry = { runAtMs?: number; durationMs?: number; nextRunAtMs?: number; - - // Telemetry (best-effort) - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; -}; +} & CronRunTelemetry; export function resolveCronRunLogPath(params: { storePath: string; jobId: string }) { const storePath = path.resolve(params.storePath); diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 0d30bc99a0..dfa29769bd 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -1,30 +1,27 @@ import type { CronConfig } from "../../config/types.cron.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; -import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js"; +import type { + CronJob, + CronJobCreate, + CronJobPatch, + CronRunOutcome, + CronRunStatus, + CronRunTelemetry, + CronStoreFile, +} from "../types.js"; export type CronEvent = { jobId: string; action: "added" | "updated" | "removed" | "started" | "finished"; runAtMs?: number; durationMs?: number; - status?: "ok" | "error" | "skipped"; + status?: CronRunStatus; error?: string; summary?: string; sessionId?: string; sessionKey?: string; nextRunAtMs?: number; - - // Telemetry (best-effort) - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; -}; +} & CronRunTelemetry; export type Logger = { debug: (obj: unknown, msg?: string) => void; @@ -57,32 +54,20 @@ export type CronServiceDeps = { wakeNowHeartbeatBusyMaxWaitMs?: number; /** WakeMode=now: delay between runHeartbeatOnce retries while busy. */ wakeNowHeartbeatBusyRetryDelayMs?: number; - runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{ - status: "ok" | "error" | "skipped"; - summary?: string; - /** Last non-empty agent text output (not truncated). */ - outputText?: string; - error?: string; - sessionId?: string; - sessionKey?: string; - /** - * `true` when the isolated run already delivered its output to the target - * channel (including matching messaging-tool sends). See: - * https://github.com/openclaw/openclaw/issues/15692 - */ - delivered?: boolean; - - // Telemetry (best-effort) - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; - }>; + runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise< + { + summary?: string; + /** Last non-empty agent text output (not truncated). */ + outputText?: string; + /** + * `true` when the isolated run already delivered its output to the target + * channel (including matching messaging-tool sends). See: + * https://github.com/openclaw/openclaw/issues/15692 + */ + delivered?: boolean; + } & CronRunOutcome & + CronRunTelemetry + >; onEvent?: (evt: CronEvent) => void; }; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 9e88b12177..d48569a60e 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1,8 +1,9 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; +import type { CronJob, CronRunOutcome, CronRunStatus, CronRunTelemetry } from "../types.js"; +import type { CronEvent, CronServiceState } from "./state.js"; import { DEFAULT_AGENT_ID } from "../../routing/session-key.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; import { sweepCronRunSessions } from "../session-reaper.js"; -import type { CronJob } from "../types.js"; import { computeJobNextRunAtMs, nextWakeAtMs, @@ -10,7 +11,6 @@ import { resolveJobPayloadTextForMain, } from "./jobs.js"; import { locked } from "./locked.js"; -import type { CronEvent, CronServiceState } from "./state.js"; import { ensureLoaded, persist } from "./store.js"; const MAX_TIMER_DELAY_MS = 60_000; @@ -31,6 +31,13 @@ const MIN_REFIRE_GAP_MS = 2_000; */ const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes +type TimedCronRunOutcome = CronRunOutcome & + CronRunTelemetry & { + jobId: string; + startedAt: number; + endedAt: number; + }; + /** * Exponential backoff delays (in ms) indexed by consecutive error count. * After the last entry the delay stays constant. @@ -57,7 +64,7 @@ function applyJobResult( state: CronServiceState, job: CronJob, result: { - status: "ok" | "error" | "skipped"; + status: CronRunStatus; error?: string; startedAt: number; endedAt: number; @@ -229,25 +236,7 @@ export async function onTimer(state: CronServiceState) { })); }); - const results: Array<{ - jobId: string; - status: "ok" | "error" | "skipped"; - error?: string; - summary?: string; - sessionId?: string; - sessionKey?: string; - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; - startedAt: number; - endedAt: number; - }> = []; + const results: TimedCronRunOutcome[] = []; for (const { id, job } of dueJobs) { const startedAt = state.deps.nowMs(); @@ -449,22 +438,7 @@ export async function runDueJobs(state: CronServiceState) { async function executeJobCore( state: CronServiceState, job: CronJob, -): Promise<{ - status: "ok" | "error" | "skipped"; - error?: string; - summary?: string; - sessionId?: string; - sessionKey?: string; - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; -}> { +): Promise { if (job.sessionTarget === "main") { const text = resolveJobPayloadTextForMain(job); if (!text) { @@ -578,21 +552,9 @@ export async function executeJob( emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); let coreResult: { - status: "ok" | "error" | "skipped"; - error?: string; - summary?: string; - sessionId?: string; - sessionKey?: string; - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; - }; + status: CronRunStatus; + } & CronRunOutcome & + CronRunTelemetry; try { coreResult = await executeJobCore(state, job); } catch (err) { @@ -619,21 +581,9 @@ function emitJobFinished( state: CronServiceState, job: CronJob, result: { - status: "ok" | "error" | "skipped"; - error?: string; - summary?: string; - sessionId?: string; - sessionKey?: string; - model?: string; - provider?: string; - usage?: { - input_tokens?: number; - output_tokens?: number; - total_tokens?: number; - cache_read_tokens?: number; - cache_write_tokens?: number; - }; - }, + status: CronRunStatus; + } & CronRunOutcome & + CronRunTelemetry, runAtMs: number, ) { emit(state, { diff --git a/src/cron/types.ts b/src/cron/types.ts index 6c7e7bec02..a7ee3ed413 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -21,6 +21,30 @@ export type CronDelivery = { export type CronDeliveryPatch = Partial; +export type CronRunStatus = "ok" | "error" | "skipped"; + +export type CronUsageSummary = { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_read_tokens?: number; + cache_write_tokens?: number; +}; + +export type CronRunTelemetry = { + model?: string; + provider?: string; + usage?: CronUsageSummary; +}; + +export type CronRunOutcome = { + status: CronRunStatus; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; +}; + export type CronPayload = | { kind: "systemEvent"; text: string } | {