mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
refactor(cron): reuse shared run outcome telemetry types
This commit is contained in:
@@ -1,3 +1,7 @@
|
||||
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { AgentDefaultsConfig } from "../../config/types.js";
|
||||
import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js";
|
||||
import {
|
||||
resolveAgentConfig,
|
||||
resolveAgentDir,
|
||||
@@ -20,7 +24,6 @@ import {
|
||||
resolveHooksGmailModel,
|
||||
resolveThinkingDefault,
|
||||
} from "../../agents/model-selection.js";
|
||||
import type { MessagingToolSend } from "../../agents/pi-embedded-messaging.js";
|
||||
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
|
||||
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
|
||||
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
|
||||
@@ -34,13 +37,11 @@ import {
|
||||
} from "../../auto-reply/thinking.js";
|
||||
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
|
||||
import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
resolveAgentMainSessionKey,
|
||||
resolveSessionTranscriptPath,
|
||||
updateSessionStore,
|
||||
} from "../../config/sessions.js";
|
||||
import type { AgentDefaultsConfig } from "../../config/types.js";
|
||||
import { registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
|
||||
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
|
||||
@@ -53,7 +54,6 @@ import {
|
||||
isExternalHookSession,
|
||||
} from "../../security/external-content.js";
|
||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { resolveDeliveryTarget } from "./delivery-target.js";
|
||||
import {
|
||||
isHeartbeatOnlyResponse,
|
||||
@@ -101,13 +101,8 @@ function resolveCronDeliveryBestEffort(job: CronJob): boolean {
|
||||
}
|
||||
|
||||
export type RunCronAgentTurnResult = {
|
||||
status: "ok" | "error" | "skipped";
|
||||
summary?: string;
|
||||
/** Last non-empty agent text output (not truncated). */
|
||||
outputText?: string;
|
||||
error?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
/**
|
||||
* `true` when the isolated run already delivered its output to the target
|
||||
* channel (via outbound payloads, the subagent announce flow, or a matching
|
||||
@@ -116,18 +111,8 @@ export type RunCronAgentTurnResult = {
|
||||
* messages. See: https://github.com/openclaw/openclaw/issues/15692
|
||||
*/
|
||||
delivered?: boolean;
|
||||
|
||||
// Telemetry (best-effort)
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
};
|
||||
} & CronRunOutcome &
|
||||
CronRunTelemetry;
|
||||
|
||||
export async function runCronIsolatedAgentTurn(params: {
|
||||
cfg: OpenClawConfig;
|
||||
@@ -486,19 +471,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
|
||||
// Update token+model fields in the session store.
|
||||
// Also collect best-effort telemetry for the cron run log.
|
||||
let telemetry:
|
||||
| {
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
let telemetry: CronRunTelemetry | undefined;
|
||||
{
|
||||
const usage = runResult.meta?.agentMeta?.usage;
|
||||
const promptTokens = runResult.meta?.agentMeta?.promptTokens;
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { CronRunStatus, CronRunTelemetry } from "./types.js";
|
||||
|
||||
export type CronRunLogEntry = {
|
||||
ts: number;
|
||||
jobId: string;
|
||||
action: "finished";
|
||||
status?: "ok" | "error" | "skipped";
|
||||
status?: CronRunStatus;
|
||||
error?: string;
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
@@ -13,18 +14,7 @@ export type CronRunLogEntry = {
|
||||
runAtMs?: number;
|
||||
durationMs?: number;
|
||||
nextRunAtMs?: number;
|
||||
|
||||
// Telemetry (best-effort)
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
};
|
||||
} & CronRunTelemetry;
|
||||
|
||||
export function resolveCronRunLogPath(params: { storePath: string; jobId: string }) {
|
||||
const storePath = path.resolve(params.storePath);
|
||||
|
||||
@@ -1,30 +1,27 @@
|
||||
import type { CronConfig } from "../../config/types.cron.js";
|
||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js";
|
||||
import type {
|
||||
CronJob,
|
||||
CronJobCreate,
|
||||
CronJobPatch,
|
||||
CronRunOutcome,
|
||||
CronRunStatus,
|
||||
CronRunTelemetry,
|
||||
CronStoreFile,
|
||||
} from "../types.js";
|
||||
|
||||
export type CronEvent = {
|
||||
jobId: string;
|
||||
action: "added" | "updated" | "removed" | "started" | "finished";
|
||||
runAtMs?: number;
|
||||
durationMs?: number;
|
||||
status?: "ok" | "error" | "skipped";
|
||||
status?: CronRunStatus;
|
||||
error?: string;
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
nextRunAtMs?: number;
|
||||
|
||||
// Telemetry (best-effort)
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
};
|
||||
} & CronRunTelemetry;
|
||||
|
||||
export type Logger = {
|
||||
debug: (obj: unknown, msg?: string) => void;
|
||||
@@ -57,32 +54,20 @@ export type CronServiceDeps = {
|
||||
wakeNowHeartbeatBusyMaxWaitMs?: number;
|
||||
/** WakeMode=now: delay between runHeartbeatOnce retries while busy. */
|
||||
wakeNowHeartbeatBusyRetryDelayMs?: number;
|
||||
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{
|
||||
status: "ok" | "error" | "skipped";
|
||||
summary?: string;
|
||||
/** Last non-empty agent text output (not truncated). */
|
||||
outputText?: string;
|
||||
error?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
/**
|
||||
* `true` when the isolated run already delivered its output to the target
|
||||
* channel (including matching messaging-tool sends). See:
|
||||
* https://github.com/openclaw/openclaw/issues/15692
|
||||
*/
|
||||
delivered?: boolean;
|
||||
|
||||
// Telemetry (best-effort)
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
}>;
|
||||
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<
|
||||
{
|
||||
summary?: string;
|
||||
/** Last non-empty agent text output (not truncated). */
|
||||
outputText?: string;
|
||||
/**
|
||||
* `true` when the isolated run already delivered its output to the target
|
||||
* channel (including matching messaging-tool sends). See:
|
||||
* https://github.com/openclaw/openclaw/issues/15692
|
||||
*/
|
||||
delivered?: boolean;
|
||||
} & CronRunOutcome &
|
||||
CronRunTelemetry
|
||||
>;
|
||||
onEvent?: (evt: CronEvent) => void;
|
||||
};
|
||||
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import type { CronJob, CronRunOutcome, CronRunStatus, CronRunTelemetry } from "../types.js";
|
||||
import type { CronEvent, CronServiceState } from "./state.js";
|
||||
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||
import { sweepCronRunSessions } from "../session-reaper.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import {
|
||||
computeJobNextRunAtMs,
|
||||
nextWakeAtMs,
|
||||
@@ -10,7 +11,6 @@ import {
|
||||
resolveJobPayloadTextForMain,
|
||||
} from "./jobs.js";
|
||||
import { locked } from "./locked.js";
|
||||
import type { CronEvent, CronServiceState } from "./state.js";
|
||||
import { ensureLoaded, persist } from "./store.js";
|
||||
|
||||
const MAX_TIMER_DELAY_MS = 60_000;
|
||||
@@ -31,6 +31,13 @@ const MIN_REFIRE_GAP_MS = 2_000;
|
||||
*/
|
||||
const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes
|
||||
|
||||
type TimedCronRunOutcome = CronRunOutcome &
|
||||
CronRunTelemetry & {
|
||||
jobId: string;
|
||||
startedAt: number;
|
||||
endedAt: number;
|
||||
};
|
||||
|
||||
/**
|
||||
* Exponential backoff delays (in ms) indexed by consecutive error count.
|
||||
* After the last entry the delay stays constant.
|
||||
@@ -57,7 +64,7 @@ function applyJobResult(
|
||||
state: CronServiceState,
|
||||
job: CronJob,
|
||||
result: {
|
||||
status: "ok" | "error" | "skipped";
|
||||
status: CronRunStatus;
|
||||
error?: string;
|
||||
startedAt: number;
|
||||
endedAt: number;
|
||||
@@ -229,25 +236,7 @@ export async function onTimer(state: CronServiceState) {
|
||||
}));
|
||||
});
|
||||
|
||||
const results: Array<{
|
||||
jobId: string;
|
||||
status: "ok" | "error" | "skipped";
|
||||
error?: string;
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
startedAt: number;
|
||||
endedAt: number;
|
||||
}> = [];
|
||||
const results: TimedCronRunOutcome[] = [];
|
||||
|
||||
for (const { id, job } of dueJobs) {
|
||||
const startedAt = state.deps.nowMs();
|
||||
@@ -449,22 +438,7 @@ export async function runDueJobs(state: CronServiceState) {
|
||||
async function executeJobCore(
|
||||
state: CronServiceState,
|
||||
job: CronJob,
|
||||
): Promise<{
|
||||
status: "ok" | "error" | "skipped";
|
||||
error?: string;
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
}> {
|
||||
): Promise<CronRunOutcome & CronRunTelemetry> {
|
||||
if (job.sessionTarget === "main") {
|
||||
const text = resolveJobPayloadTextForMain(job);
|
||||
if (!text) {
|
||||
@@ -578,21 +552,9 @@ export async function executeJob(
|
||||
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
|
||||
|
||||
let coreResult: {
|
||||
status: "ok" | "error" | "skipped";
|
||||
error?: string;
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
};
|
||||
status: CronRunStatus;
|
||||
} & CronRunOutcome &
|
||||
CronRunTelemetry;
|
||||
try {
|
||||
coreResult = await executeJobCore(state, job);
|
||||
} catch (err) {
|
||||
@@ -619,21 +581,9 @@ function emitJobFinished(
|
||||
state: CronServiceState,
|
||||
job: CronJob,
|
||||
result: {
|
||||
status: "ok" | "error" | "skipped";
|
||||
error?: string;
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
},
|
||||
status: CronRunStatus;
|
||||
} & CronRunOutcome &
|
||||
CronRunTelemetry,
|
||||
runAtMs: number,
|
||||
) {
|
||||
emit(state, {
|
||||
|
||||
@@ -21,6 +21,30 @@ export type CronDelivery = {
|
||||
|
||||
export type CronDeliveryPatch = Partial<CronDelivery>;
|
||||
|
||||
export type CronRunStatus = "ok" | "error" | "skipped";
|
||||
|
||||
export type CronUsageSummary = {
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
total_tokens?: number;
|
||||
cache_read_tokens?: number;
|
||||
cache_write_tokens?: number;
|
||||
};
|
||||
|
||||
export type CronRunTelemetry = {
|
||||
model?: string;
|
||||
provider?: string;
|
||||
usage?: CronUsageSummary;
|
||||
};
|
||||
|
||||
export type CronRunOutcome = {
|
||||
status: CronRunStatus;
|
||||
error?: string;
|
||||
summary?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
};
|
||||
|
||||
export type CronPayload =
|
||||
| { kind: "systemEvent"; text: string }
|
||||
| {
|
||||
|
||||
Reference in New Issue
Block a user