diff --git a/extensions/msteams/package.json b/extensions/msteams/package.json index a16ddc6dbc..370ad19da4 100644 --- a/extensions/msteams/package.json +++ b/extensions/msteams/package.json @@ -7,8 +7,7 @@ "@microsoft/agents-hosting": "^1.2.3", "@microsoft/agents-hosting-express": "^1.2.3", "@microsoft/agents-hosting-extensions-teams": "^1.2.3", - "express": "^5.2.1", - "proper-lockfile": "^4.1.2" + "express": "^5.2.1" }, "devDependencies": { "openclaw": "workspace:*" diff --git a/extensions/msteams/src/file-lock.ts b/extensions/msteams/src/file-lock.ts index dd1a076355..02bf9aa5b4 100644 --- a/extensions/msteams/src/file-lock.ts +++ b/extensions/msteams/src/file-lock.ts @@ -1,189 +1 @@ -import fs from "node:fs/promises"; -import path from "node:path"; - -type FileLockOptions = { - retries: { - retries: number; - factor: number; - minTimeout: number; - maxTimeout: number; - randomize?: boolean; - }; - stale: number; -}; - -type LockFilePayload = { - pid: number; - createdAt: string; -}; - -type HeldLock = { - count: number; - handle: fs.FileHandle; - lockPath: string; -}; - -const HELD_LOCKS_KEY = Symbol.for("openclaw.msteamsFileLockHeldLocks"); - -function resolveHeldLocks(): Map { - const proc = process as NodeJS.Process & { - [HELD_LOCKS_KEY]?: Map; - }; - if (!proc[HELD_LOCKS_KEY]) { - proc[HELD_LOCKS_KEY] = new Map(); - } - return proc[HELD_LOCKS_KEY]; -} - -const HELD_LOCKS = resolveHeldLocks(); - -function isAlive(pid: number): boolean { - if (!Number.isFinite(pid) || pid <= 0) { - return false; - } - try { - process.kill(pid, 0); - return true; - } catch { - return false; - } -} - -function computeDelayMs(retries: FileLockOptions["retries"], attempt: number): number { - const base = Math.min( - retries.maxTimeout, - Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt), - ); - const jitter = retries.randomize ? 1 + Math.random() : 1; - return Math.min(retries.maxTimeout, Math.round(base * jitter)); -} - -async function readLockPayload(lockPath: string): Promise { - try { - const raw = await fs.readFile(lockPath, "utf8"); - const parsed = JSON.parse(raw) as Partial; - if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") { - return null; - } - return { pid: parsed.pid, createdAt: parsed.createdAt }; - } catch { - return null; - } -} - -async function resolveNormalizedFilePath(filePath: string): Promise { - const resolved = path.resolve(filePath); - const dir = path.dirname(resolved); - await fs.mkdir(dir, { recursive: true }); - try { - const realDir = await fs.realpath(dir); - return path.join(realDir, path.basename(resolved)); - } catch { - return resolved; - } -} - -async function isStaleLock(lockPath: string, staleMs: number): Promise { - const payload = await readLockPayload(lockPath); - if (payload?.pid && !isAlive(payload.pid)) { - return true; - } - if (payload?.createdAt) { - const createdAt = Date.parse(payload.createdAt); - if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) { - return true; - } - } - try { - const stat = await fs.stat(lockPath); - return Date.now() - stat.mtimeMs > staleMs; - } catch { - return true; - } -} - -type FileLockHandle = { - release: () => Promise; -}; - -async function acquireFileLock( - filePath: string, - options: FileLockOptions, -): Promise { - const normalizedFile = await resolveNormalizedFilePath(filePath); - const lockPath = `${normalizedFile}.lock`; - const held = HELD_LOCKS.get(normalizedFile); - if (held) { - held.count += 1; - return { - release: async () => { - const current = HELD_LOCKS.get(normalizedFile); - if (!current) { - return; - } - current.count -= 1; - if (current.count > 0) { - return; - } - HELD_LOCKS.delete(normalizedFile); - await current.handle.close().catch(() => undefined); - await fs.rm(current.lockPath, { force: true }).catch(() => undefined); - }, - }; - } - - const attempts = Math.max(1, options.retries.retries + 1); - for (let attempt = 0; attempt < attempts; attempt += 1) { - try { - const handle = await fs.open(lockPath, "wx"); - await handle.writeFile( - JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), - "utf8", - ); - HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath }); - return { - release: async () => { - const current = HELD_LOCKS.get(normalizedFile); - if (!current) { - return; - } - current.count -= 1; - if (current.count > 0) { - return; - } - HELD_LOCKS.delete(normalizedFile); - await current.handle.close().catch(() => undefined); - await fs.rm(current.lockPath, { force: true }).catch(() => undefined); - }, - }; - } catch (err) { - const code = (err as { code?: string }).code; - if (code !== "EEXIST") { - throw err; - } - if (await isStaleLock(lockPath, options.stale)) { - await fs.rm(lockPath, { force: true }).catch(() => undefined); - continue; - } - if (attempt >= attempts - 1) { - break; - } - await new Promise((resolve) => setTimeout(resolve, computeDelayMs(options.retries, attempt))); - } - } - - throw new Error(`file lock timeout for ${normalizedFile}`); -} - -export async function withFileLock( - filePath: string, - options: FileLockOptions, - fn: () => Promise, -): Promise { - const lock = await acquireFileLock(filePath, options); - try { - return await fn(); - } finally { - await lock.release(); - } -} +export { withFileLock } from "openclaw/plugin-sdk"; diff --git a/src/infra/file-lock.ts b/src/infra/file-lock.ts index b09af514e2..44e6bc0715 100644 --- a/src/infra/file-lock.ts +++ b/src/infra/file-lock.ts @@ -1,192 +1,2 @@ -import fs from "node:fs/promises"; -import path from "node:path"; - -export type FileLockOptions = { - retries: { - retries: number; - factor: number; - minTimeout: number; - maxTimeout: number; - randomize?: boolean; - }; - stale: number; -}; - -type LockFilePayload = { - pid: number; - createdAt: string; -}; - -type HeldLock = { - count: number; - handle: fs.FileHandle; - lockPath: string; -}; - -const HELD_LOCKS_KEY = Symbol.for("openclaw.fileLockHeldLocks"); - -function resolveHeldLocks(): Map { - const proc = process as NodeJS.Process & { - [HELD_LOCKS_KEY]?: Map; - }; - if (!proc[HELD_LOCKS_KEY]) { - proc[HELD_LOCKS_KEY] = new Map(); - } - return proc[HELD_LOCKS_KEY]; -} - -const HELD_LOCKS = resolveHeldLocks(); - -function isAlive(pid: number): boolean { - if (!Number.isFinite(pid) || pid <= 0) { - return false; - } - try { - process.kill(pid, 0); - return true; - } catch { - return false; - } -} - -function computeDelayMs(retries: FileLockOptions["retries"], attempt: number): number { - const base = Math.min( - retries.maxTimeout, - Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt), - ); - const jitter = retries.randomize ? 1 + Math.random() : 1; - return Math.min(retries.maxTimeout, Math.round(base * jitter)); -} - -async function readLockPayload(lockPath: string): Promise { - try { - const raw = await fs.readFile(lockPath, "utf8"); - const parsed = JSON.parse(raw) as Partial; - if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") { - return null; - } - return { pid: parsed.pid, createdAt: parsed.createdAt }; - } catch { - return null; - } -} - -async function resolveNormalizedFilePath(filePath: string): Promise { - const resolved = path.resolve(filePath); - const dir = path.dirname(resolved); - await fs.mkdir(dir, { recursive: true }); - try { - const realDir = await fs.realpath(dir); - return path.join(realDir, path.basename(resolved)); - } catch { - return resolved; - } -} - -async function isStaleLock(lockPath: string, staleMs: number): Promise { - const payload = await readLockPayload(lockPath); - if (payload?.pid && !isAlive(payload.pid)) { - return true; - } - if (payload?.createdAt) { - const createdAt = Date.parse(payload.createdAt); - if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) { - return true; - } - } - try { - const stat = await fs.stat(lockPath); - return Date.now() - stat.mtimeMs > staleMs; - } catch { - return true; - } -} - -type FileLockHandle = { - lockPath: string; - release: () => Promise; -}; - -export async function acquireFileLock( - filePath: string, - options: FileLockOptions, -): Promise { - const normalizedFile = await resolveNormalizedFilePath(filePath); - const lockPath = `${normalizedFile}.lock`; - const held = HELD_LOCKS.get(normalizedFile); - if (held) { - held.count += 1; - return { - lockPath, - release: async () => { - const current = HELD_LOCKS.get(normalizedFile); - if (!current) { - return; - } - current.count -= 1; - if (current.count > 0) { - return; - } - HELD_LOCKS.delete(normalizedFile); - await current.handle.close().catch(() => undefined); - await fs.rm(current.lockPath, { force: true }).catch(() => undefined); - }, - }; - } - - const attempts = Math.max(1, options.retries.retries + 1); - for (let attempt = 0; attempt < attempts; attempt += 1) { - try { - const handle = await fs.open(lockPath, "wx"); - await handle.writeFile( - JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), - "utf8", - ); - HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath }); - return { - lockPath, - release: async () => { - const current = HELD_LOCKS.get(normalizedFile); - if (!current) { - return; - } - current.count -= 1; - if (current.count > 0) { - return; - } - HELD_LOCKS.delete(normalizedFile); - await current.handle.close().catch(() => undefined); - await fs.rm(current.lockPath, { force: true }).catch(() => undefined); - }, - }; - } catch (err) { - const code = (err as { code?: string }).code; - if (code !== "EEXIST") { - throw err; - } - if (await isStaleLock(lockPath, options.stale)) { - await fs.rm(lockPath, { force: true }).catch(() => undefined); - continue; - } - if (attempt >= attempts - 1) { - break; - } - await new Promise((resolve) => setTimeout(resolve, computeDelayMs(options.retries, attempt))); - } - } - - throw new Error(`file lock timeout for ${normalizedFile}`); -} - -export async function withFileLock( - filePath: string, - options: FileLockOptions, - fn: () => Promise, -): Promise { - const lock = await acquireFileLock(filePath, options); - try { - return await fn(); - } finally { - await lock.release(); - } -} +export type { FileLockHandle, FileLockOptions } from "../plugin-sdk/file-lock.js"; +export { acquireFileLock, withFileLock } from "../plugin-sdk/file-lock.js"; diff --git a/src/plugin-sdk/file-lock.ts b/src/plugin-sdk/file-lock.ts new file mode 100644 index 0000000000..58f17e72db --- /dev/null +++ b/src/plugin-sdk/file-lock.ts @@ -0,0 +1,192 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +export type FileLockOptions = { + retries: { + retries: number; + factor: number; + minTimeout: number; + maxTimeout: number; + randomize?: boolean; + }; + stale: number; +}; + +type LockFilePayload = { + pid: number; + createdAt: string; +}; + +type HeldLock = { + count: number; + handle: fs.FileHandle; + lockPath: string; +}; + +const HELD_LOCKS_KEY = Symbol.for("openclaw.fileLockHeldLocks"); + +function resolveHeldLocks(): Map { + const proc = process as NodeJS.Process & { + [HELD_LOCKS_KEY]?: Map; + }; + if (!proc[HELD_LOCKS_KEY]) { + proc[HELD_LOCKS_KEY] = new Map(); + } + return proc[HELD_LOCKS_KEY]; +} + +const HELD_LOCKS = resolveHeldLocks(); + +function isAlive(pid: number): boolean { + if (!Number.isFinite(pid) || pid <= 0) { + return false; + } + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +function computeDelayMs(retries: FileLockOptions["retries"], attempt: number): number { + const base = Math.min( + retries.maxTimeout, + Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt), + ); + const jitter = retries.randomize ? 1 + Math.random() : 1; + return Math.min(retries.maxTimeout, Math.round(base * jitter)); +} + +async function readLockPayload(lockPath: string): Promise { + try { + const raw = await fs.readFile(lockPath, "utf8"); + const parsed = JSON.parse(raw) as Partial; + if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") { + return null; + } + return { pid: parsed.pid, createdAt: parsed.createdAt }; + } catch { + return null; + } +} + +async function resolveNormalizedFilePath(filePath: string): Promise { + const resolved = path.resolve(filePath); + const dir = path.dirname(resolved); + await fs.mkdir(dir, { recursive: true }); + try { + const realDir = await fs.realpath(dir); + return path.join(realDir, path.basename(resolved)); + } catch { + return resolved; + } +} + +async function isStaleLock(lockPath: string, staleMs: number): Promise { + const payload = await readLockPayload(lockPath); + if (payload?.pid && !isAlive(payload.pid)) { + return true; + } + if (payload?.createdAt) { + const createdAt = Date.parse(payload.createdAt); + if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) { + return true; + } + } + try { + const stat = await fs.stat(lockPath); + return Date.now() - stat.mtimeMs > staleMs; + } catch { + return true; + } +} + +export type FileLockHandle = { + lockPath: string; + release: () => Promise; +}; + +export async function acquireFileLock( + filePath: string, + options: FileLockOptions, +): Promise { + const normalizedFile = await resolveNormalizedFilePath(filePath); + const lockPath = `${normalizedFile}.lock`; + const held = HELD_LOCKS.get(normalizedFile); + if (held) { + held.count += 1; + return { + lockPath, + release: async () => { + const current = HELD_LOCKS.get(normalizedFile); + if (!current) { + return; + } + current.count -= 1; + if (current.count > 0) { + return; + } + HELD_LOCKS.delete(normalizedFile); + await current.handle.close().catch(() => undefined); + await fs.rm(current.lockPath, { force: true }).catch(() => undefined); + }, + }; + } + + const attempts = Math.max(1, options.retries.retries + 1); + for (let attempt = 0; attempt < attempts; attempt += 1) { + try { + const handle = await fs.open(lockPath, "wx"); + await handle.writeFile( + JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), + "utf8", + ); + HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath }); + return { + lockPath, + release: async () => { + const current = HELD_LOCKS.get(normalizedFile); + if (!current) { + return; + } + current.count -= 1; + if (current.count > 0) { + return; + } + HELD_LOCKS.delete(normalizedFile); + await current.handle.close().catch(() => undefined); + await fs.rm(current.lockPath, { force: true }).catch(() => undefined); + }, + }; + } catch (err) { + const code = (err as { code?: string }).code; + if (code !== "EEXIST") { + throw err; + } + if (await isStaleLock(lockPath, options.stale)) { + await fs.rm(lockPath, { force: true }).catch(() => undefined); + continue; + } + if (attempt >= attempts - 1) { + break; + } + await new Promise((resolve) => setTimeout(resolve, computeDelayMs(options.retries, attempt))); + } + } + + throw new Error(`file lock timeout for ${normalizedFile}`); +} + +export async function withFileLock( + filePath: string, + options: FileLockOptions, + fn: () => Promise, +): Promise { + const lock = await acquireFileLock(filePath, options); + try { + return await fn(); + } finally { + await lock.release(); + } +} diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 853b0c0837..709ba3fb4c 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -78,6 +78,9 @@ export { emptyPluginConfigSchema } from "../plugins/config-schema.js"; export type { OpenClawConfig } from "../config/config.js"; /** @deprecated Use OpenClawConfig instead */ export type { OpenClawConfig as ClawdbotConfig } from "../config/config.js"; + +export type { FileLockHandle, FileLockOptions } from "./file-lock.js"; +export { acquireFileLock, withFileLock } from "./file-lock.js"; export type { ChannelDock } from "../channels/dock.js"; export { getChatChannelMeta } from "../channels/registry.js"; export type {