diff --git a/extensions/msteams/src/file-lock.ts b/extensions/msteams/src/file-lock.ts new file mode 100644 index 0000000000..dd1a076355 --- /dev/null +++ b/extensions/msteams/src/file-lock.ts @@ -0,0 +1,189 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +type FileLockOptions = { + retries: { + retries: number; + factor: number; + minTimeout: number; + maxTimeout: number; + randomize?: boolean; + }; + stale: number; +}; + +type LockFilePayload = { + pid: number; + createdAt: string; +}; + +type HeldLock = { + count: number; + handle: fs.FileHandle; + lockPath: string; +}; + +const HELD_LOCKS_KEY = Symbol.for("openclaw.msteamsFileLockHeldLocks"); + +function resolveHeldLocks(): Map { + const proc = process as NodeJS.Process & { + [HELD_LOCKS_KEY]?: Map; + }; + if (!proc[HELD_LOCKS_KEY]) { + proc[HELD_LOCKS_KEY] = new Map(); + } + return proc[HELD_LOCKS_KEY]; +} + +const HELD_LOCKS = resolveHeldLocks(); + +function isAlive(pid: number): boolean { + if (!Number.isFinite(pid) || pid <= 0) { + return false; + } + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +function computeDelayMs(retries: FileLockOptions["retries"], attempt: number): number { + const base = Math.min( + retries.maxTimeout, + Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt), + ); + const jitter = retries.randomize ? 1 + Math.random() : 1; + return Math.min(retries.maxTimeout, Math.round(base * jitter)); +} + +async function readLockPayload(lockPath: string): Promise { + try { + const raw = await fs.readFile(lockPath, "utf8"); + const parsed = JSON.parse(raw) as Partial; + if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") { + return null; + } + return { pid: parsed.pid, createdAt: parsed.createdAt }; + } catch { + return null; + } +} + +async function resolveNormalizedFilePath(filePath: string): Promise { + const resolved = path.resolve(filePath); + const dir = path.dirname(resolved); + await fs.mkdir(dir, { recursive: true }); + try { + const realDir = await fs.realpath(dir); + return path.join(realDir, path.basename(resolved)); + } catch { + return resolved; + } +} + +async function isStaleLock(lockPath: string, staleMs: number): Promise { + const payload = await readLockPayload(lockPath); + if (payload?.pid && !isAlive(payload.pid)) { + return true; + } + if (payload?.createdAt) { + const createdAt = Date.parse(payload.createdAt); + if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) { + return true; + } + } + try { + const stat = await fs.stat(lockPath); + return Date.now() - stat.mtimeMs > staleMs; + } catch { + return true; + } +} + +type FileLockHandle = { + release: () => Promise; +}; + +async function acquireFileLock( + filePath: string, + options: FileLockOptions, +): Promise { + const normalizedFile = await resolveNormalizedFilePath(filePath); + const lockPath = `${normalizedFile}.lock`; + const held = HELD_LOCKS.get(normalizedFile); + if (held) { + held.count += 1; + return { + release: async () => { + const current = HELD_LOCKS.get(normalizedFile); + if (!current) { + return; + } + current.count -= 1; + if (current.count > 0) { + return; + } + HELD_LOCKS.delete(normalizedFile); + await current.handle.close().catch(() => undefined); + await fs.rm(current.lockPath, { force: true }).catch(() => undefined); + }, + }; + } + + const attempts = Math.max(1, options.retries.retries + 1); + for (let attempt = 0; attempt < attempts; attempt += 1) { + try { + const handle = await fs.open(lockPath, "wx"); + await handle.writeFile( + JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), + "utf8", + ); + HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath }); + return { + release: async () => { + const current = HELD_LOCKS.get(normalizedFile); + if (!current) { + return; + } + current.count -= 1; + if (current.count > 0) { + return; + } + HELD_LOCKS.delete(normalizedFile); + await current.handle.close().catch(() => undefined); + await fs.rm(current.lockPath, { force: true }).catch(() => undefined); + }, + }; + } catch (err) { + const code = (err as { code?: string }).code; + if (code !== "EEXIST") { + throw err; + } + if (await isStaleLock(lockPath, options.stale)) { + await fs.rm(lockPath, { force: true }).catch(() => undefined); + continue; + } + if (attempt >= attempts - 1) { + break; + } + await new Promise((resolve) => setTimeout(resolve, computeDelayMs(options.retries, attempt))); + } + } + + throw new Error(`file lock timeout for ${normalizedFile}`); +} + +export async function withFileLock( + filePath: string, + options: FileLockOptions, + fn: () => Promise, +): Promise { + const lock = await acquireFileLock(filePath, options); + try { + return await fn(); + } finally { + await lock.release(); + } +} diff --git a/extensions/msteams/src/store-fs.ts b/extensions/msteams/src/store-fs.ts index 75ce75235b..c827a955f1 100644 --- a/extensions/msteams/src/store-fs.ts +++ b/extensions/msteams/src/store-fs.ts @@ -2,7 +2,7 @@ import crypto from "node:crypto"; import fs from "node:fs"; import path from "node:path"; import { safeParseJson } from "openclaw/plugin-sdk"; -import lockfile from "proper-lockfile"; +import { withFileLock as withPathLock } from "./file-lock.js"; const STORE_LOCK_OPTIONS = { retries: { @@ -60,17 +60,7 @@ export async function withFileLock( fn: () => Promise, ): Promise { await ensureJsonFile(filePath, fallback); - let release: (() => Promise) | undefined; - try { - release = await lockfile.lock(filePath, STORE_LOCK_OPTIONS); + return await withPathLock(filePath, STORE_LOCK_OPTIONS, async () => { return await fn(); - } finally { - if (release) { - try { - await release(); - } catch { - // ignore unlock errors - } - } - } + }); } diff --git a/src/agents/auth-profiles/oauth.ts b/src/agents/auth-profiles/oauth.ts index 4fff5a3012..a7ddc3c651 100644 --- a/src/agents/auth-profiles/oauth.ts +++ b/src/agents/auth-profiles/oauth.ts @@ -4,9 +4,9 @@ import { type OAuthCredentials, type OAuthProvider, } from "@mariozechner/pi-ai"; -import lockfile from "proper-lockfile"; import type { OpenClawConfig } from "../../config/config.js"; import type { AuthProfileStore } from "./types.js"; +import { withFileLock } from "../../infra/file-lock.js"; import { refreshQwenPortalCredentials } from "../../providers/qwen-portal-oauth.js"; import { refreshChutesTokens } from "../chutes-oauth.js"; import { AUTH_STORE_LOCK_OPTIONS, log } from "./constants.js"; @@ -40,12 +40,7 @@ async function refreshOAuthTokenWithLock(params: { const authPath = resolveAuthStorePath(params.agentDir); ensureAuthStoreFile(authPath); - let release: (() => Promise) | undefined; - try { - release = await lockfile.lock(authPath, { - ...AUTH_STORE_LOCK_OPTIONS, - }); - + return await withFileLock(authPath, AUTH_STORE_LOCK_OPTIONS, async () => { const store = ensureAuthProfileStore(params.agentDir); const cred = store.profiles[params.profileId]; if (!cred || cred.type !== "oauth") { @@ -94,15 +89,7 @@ async function refreshOAuthTokenWithLock(params: { saveAuthProfileStore(store, params.agentDir); return result; - } finally { - if (release) { - try { - await release(); - } catch { - // ignore unlock errors - } - } - } + }); } async function tryResolveOAuthProfile(params: { diff --git a/src/agents/auth-profiles/store.ts b/src/agents/auth-profiles/store.ts index 65c133384d..989d89d8ef 100644 --- a/src/agents/auth-profiles/store.ts +++ b/src/agents/auth-profiles/store.ts @@ -1,8 +1,8 @@ import type { OAuthCredentials } from "@mariozechner/pi-ai"; import fs from "node:fs"; -import lockfile from "proper-lockfile"; import type { AuthProfileCredential, AuthProfileStore, ProfileUsageStats } from "./types.js"; import { resolveOAuthPath } from "../../config/paths.js"; +import { withFileLock } from "../../infra/file-lock.js"; import { loadJsonFile, saveJsonFile } from "../../infra/json-file.js"; import { AUTH_STORE_LOCK_OPTIONS, AUTH_STORE_VERSION, log } from "./constants.js"; import { syncExternalCliCredentials } from "./external-cli-sync.js"; @@ -25,25 +25,17 @@ export async function updateAuthProfileStoreWithLock(params: { const authPath = resolveAuthStorePath(params.agentDir); ensureAuthStoreFile(authPath); - let release: (() => Promise) | undefined; try { - release = await lockfile.lock(authPath, AUTH_STORE_LOCK_OPTIONS); - const store = ensureAuthProfileStore(params.agentDir); - const shouldSave = params.updater(store); - if (shouldSave) { - saveAuthProfileStore(store, params.agentDir); - } - return store; + return await withFileLock(authPath, AUTH_STORE_LOCK_OPTIONS, async () => { + const store = ensureAuthProfileStore(params.agentDir); + const shouldSave = params.updater(store); + if (shouldSave) { + saveAuthProfileStore(store, params.agentDir); + } + return store; + }); } catch { return null; - } finally { - if (release) { - try { - await release(); - } catch { - // ignore unlock errors - } - } } } diff --git a/src/infra/file-lock.ts b/src/infra/file-lock.ts new file mode 100644 index 0000000000..b09af514e2 --- /dev/null +++ b/src/infra/file-lock.ts @@ -0,0 +1,192 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +export type FileLockOptions = { + retries: { + retries: number; + factor: number; + minTimeout: number; + maxTimeout: number; + randomize?: boolean; + }; + stale: number; +}; + +type LockFilePayload = { + pid: number; + createdAt: string; +}; + +type HeldLock = { + count: number; + handle: fs.FileHandle; + lockPath: string; +}; + +const HELD_LOCKS_KEY = Symbol.for("openclaw.fileLockHeldLocks"); + +function resolveHeldLocks(): Map { + const proc = process as NodeJS.Process & { + [HELD_LOCKS_KEY]?: Map; + }; + if (!proc[HELD_LOCKS_KEY]) { + proc[HELD_LOCKS_KEY] = new Map(); + } + return proc[HELD_LOCKS_KEY]; +} + +const HELD_LOCKS = resolveHeldLocks(); + +function isAlive(pid: number): boolean { + if (!Number.isFinite(pid) || pid <= 0) { + return false; + } + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +function computeDelayMs(retries: FileLockOptions["retries"], attempt: number): number { + const base = Math.min( + retries.maxTimeout, + Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt), + ); + const jitter = retries.randomize ? 1 + Math.random() : 1; + return Math.min(retries.maxTimeout, Math.round(base * jitter)); +} + +async function readLockPayload(lockPath: string): Promise { + try { + const raw = await fs.readFile(lockPath, "utf8"); + const parsed = JSON.parse(raw) as Partial; + if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") { + return null; + } + return { pid: parsed.pid, createdAt: parsed.createdAt }; + } catch { + return null; + } +} + +async function resolveNormalizedFilePath(filePath: string): Promise { + const resolved = path.resolve(filePath); + const dir = path.dirname(resolved); + await fs.mkdir(dir, { recursive: true }); + try { + const realDir = await fs.realpath(dir); + return path.join(realDir, path.basename(resolved)); + } catch { + return resolved; + } +} + +async function isStaleLock(lockPath: string, staleMs: number): Promise { + const payload = await readLockPayload(lockPath); + if (payload?.pid && !isAlive(payload.pid)) { + return true; + } + if (payload?.createdAt) { + const createdAt = Date.parse(payload.createdAt); + if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) { + return true; + } + } + try { + const stat = await fs.stat(lockPath); + return Date.now() - stat.mtimeMs > staleMs; + } catch { + return true; + } +} + +type FileLockHandle = { + lockPath: string; + release: () => Promise; +}; + +export async function acquireFileLock( + filePath: string, + options: FileLockOptions, +): Promise { + const normalizedFile = await resolveNormalizedFilePath(filePath); + const lockPath = `${normalizedFile}.lock`; + const held = HELD_LOCKS.get(normalizedFile); + if (held) { + held.count += 1; + return { + lockPath, + release: async () => { + const current = HELD_LOCKS.get(normalizedFile); + if (!current) { + return; + } + current.count -= 1; + if (current.count > 0) { + return; + } + HELD_LOCKS.delete(normalizedFile); + await current.handle.close().catch(() => undefined); + await fs.rm(current.lockPath, { force: true }).catch(() => undefined); + }, + }; + } + + const attempts = Math.max(1, options.retries.retries + 1); + for (let attempt = 0; attempt < attempts; attempt += 1) { + try { + const handle = await fs.open(lockPath, "wx"); + await handle.writeFile( + JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), + "utf8", + ); + HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath }); + return { + lockPath, + release: async () => { + const current = HELD_LOCKS.get(normalizedFile); + if (!current) { + return; + } + current.count -= 1; + if (current.count > 0) { + return; + } + HELD_LOCKS.delete(normalizedFile); + await current.handle.close().catch(() => undefined); + await fs.rm(current.lockPath, { force: true }).catch(() => undefined); + }, + }; + } catch (err) { + const code = (err as { code?: string }).code; + if (code !== "EEXIST") { + throw err; + } + if (await isStaleLock(lockPath, options.stale)) { + await fs.rm(lockPath, { force: true }).catch(() => undefined); + continue; + } + if (attempt >= attempts - 1) { + break; + } + await new Promise((resolve) => setTimeout(resolve, computeDelayMs(options.retries, attempt))); + } + } + + throw new Error(`file lock timeout for ${normalizedFile}`); +} + +export async function withFileLock( + filePath: string, + options: FileLockOptions, + fn: () => Promise, +): Promise { + const lock = await acquireFileLock(filePath, options); + try { + return await fn(); + } finally { + await lock.release(); + } +} diff --git a/src/pairing/pairing-store.ts b/src/pairing/pairing-store.ts index b3f629d11d..69a1e8cab7 100644 --- a/src/pairing/pairing-store.ts +++ b/src/pairing/pairing-store.ts @@ -2,10 +2,10 @@ import crypto from "node:crypto"; import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import lockfile from "proper-lockfile"; import type { ChannelId, ChannelPairingAdapter } from "../channels/plugins/types.js"; import { getPairingAdapter } from "../channels/plugins/pairing.js"; import { resolveOAuthDir, resolveStateDir } from "../config/paths.js"; +import { withFileLock as withPathLock } from "../infra/file-lock.js"; import { resolveRequiredHomeDir } from "../infra/home-dir.js"; import { safeParseJson } from "../utils.js"; @@ -118,19 +118,9 @@ async function withFileLock( fn: () => Promise, ): Promise { await ensureJsonFile(filePath, fallback); - let release: (() => Promise) | undefined; - try { - release = await lockfile.lock(filePath, PAIRING_STORE_LOCK_OPTIONS); + return await withPathLock(filePath, PAIRING_STORE_LOCK_OPTIONS, async () => { return await fn(); - } finally { - if (release) { - try { - await release(); - } catch { - // ignore unlock errors - } - } - } + }); } function parseTimestamp(value: string | undefined): number | null {