mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
refactor: share file lock via plugin-sdk
This commit is contained in:
@@ -7,8 +7,7 @@
|
||||
"@microsoft/agents-hosting": "^1.2.3",
|
||||
"@microsoft/agents-hosting-express": "^1.2.3",
|
||||
"@microsoft/agents-hosting-extensions-teams": "^1.2.3",
|
||||
"express": "^5.2.1",
|
||||
"proper-lockfile": "^4.1.2"
|
||||
"express": "^5.2.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"openclaw": "workspace:*"
|
||||
|
||||
@@ -1,189 +1 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
type FileLockOptions = {
|
||||
retries: {
|
||||
retries: number;
|
||||
factor: number;
|
||||
minTimeout: number;
|
||||
maxTimeout: number;
|
||||
randomize?: boolean;
|
||||
};
|
||||
stale: number;
|
||||
};
|
||||
|
||||
type LockFilePayload = {
|
||||
pid: number;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
type HeldLock = {
|
||||
count: number;
|
||||
handle: fs.FileHandle;
|
||||
lockPath: string;
|
||||
};
|
||||
|
||||
const HELD_LOCKS_KEY = Symbol.for("openclaw.msteamsFileLockHeldLocks");
|
||||
|
||||
function resolveHeldLocks(): Map<string, HeldLock> {
|
||||
const proc = process as NodeJS.Process & {
|
||||
[HELD_LOCKS_KEY]?: Map<string, HeldLock>;
|
||||
};
|
||||
if (!proc[HELD_LOCKS_KEY]) {
|
||||
proc[HELD_LOCKS_KEY] = new Map<string, HeldLock>();
|
||||
}
|
||||
return proc[HELD_LOCKS_KEY];
|
||||
}
|
||||
|
||||
const HELD_LOCKS = resolveHeldLocks();
|
||||
|
||||
function isAlive(pid: number): boolean {
|
||||
if (!Number.isFinite(pid) || pid <= 0) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function computeDelayMs(retries: FileLockOptions["retries"], attempt: number): number {
|
||||
const base = Math.min(
|
||||
retries.maxTimeout,
|
||||
Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt),
|
||||
);
|
||||
const jitter = retries.randomize ? 1 + Math.random() : 1;
|
||||
return Math.min(retries.maxTimeout, Math.round(base * jitter));
|
||||
}
|
||||
|
||||
async function readLockPayload(lockPath: string): Promise<LockFilePayload | null> {
|
||||
try {
|
||||
const raw = await fs.readFile(lockPath, "utf8");
|
||||
const parsed = JSON.parse(raw) as Partial<LockFilePayload>;
|
||||
if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") {
|
||||
return null;
|
||||
}
|
||||
return { pid: parsed.pid, createdAt: parsed.createdAt };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async function resolveNormalizedFilePath(filePath: string): Promise<string> {
|
||||
const resolved = path.resolve(filePath);
|
||||
const dir = path.dirname(resolved);
|
||||
await fs.mkdir(dir, { recursive: true });
|
||||
try {
|
||||
const realDir = await fs.realpath(dir);
|
||||
return path.join(realDir, path.basename(resolved));
|
||||
} catch {
|
||||
return resolved;
|
||||
}
|
||||
}
|
||||
|
||||
async function isStaleLock(lockPath: string, staleMs: number): Promise<boolean> {
|
||||
const payload = await readLockPayload(lockPath);
|
||||
if (payload?.pid && !isAlive(payload.pid)) {
|
||||
return true;
|
||||
}
|
||||
if (payload?.createdAt) {
|
||||
const createdAt = Date.parse(payload.createdAt);
|
||||
if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
try {
|
||||
const stat = await fs.stat(lockPath);
|
||||
return Date.now() - stat.mtimeMs > staleMs;
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
type FileLockHandle = {
|
||||
release: () => Promise<void>;
|
||||
};
|
||||
|
||||
async function acquireFileLock(
|
||||
filePath: string,
|
||||
options: FileLockOptions,
|
||||
): Promise<FileLockHandle> {
|
||||
const normalizedFile = await resolveNormalizedFilePath(filePath);
|
||||
const lockPath = `${normalizedFile}.lock`;
|
||||
const held = HELD_LOCKS.get(normalizedFile);
|
||||
if (held) {
|
||||
held.count += 1;
|
||||
return {
|
||||
release: async () => {
|
||||
const current = HELD_LOCKS.get(normalizedFile);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.count -= 1;
|
||||
if (current.count > 0) {
|
||||
return;
|
||||
}
|
||||
HELD_LOCKS.delete(normalizedFile);
|
||||
await current.handle.close().catch(() => undefined);
|
||||
await fs.rm(current.lockPath, { force: true }).catch(() => undefined);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const attempts = Math.max(1, options.retries.retries + 1);
|
||||
for (let attempt = 0; attempt < attempts; attempt += 1) {
|
||||
try {
|
||||
const handle = await fs.open(lockPath, "wx");
|
||||
await handle.writeFile(
|
||||
JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2),
|
||||
"utf8",
|
||||
);
|
||||
HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath });
|
||||
return {
|
||||
release: async () => {
|
||||
const current = HELD_LOCKS.get(normalizedFile);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.count -= 1;
|
||||
if (current.count > 0) {
|
||||
return;
|
||||
}
|
||||
HELD_LOCKS.delete(normalizedFile);
|
||||
await current.handle.close().catch(() => undefined);
|
||||
await fs.rm(current.lockPath, { force: true }).catch(() => undefined);
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
const code = (err as { code?: string }).code;
|
||||
if (code !== "EEXIST") {
|
||||
throw err;
|
||||
}
|
||||
if (await isStaleLock(lockPath, options.stale)) {
|
||||
await fs.rm(lockPath, { force: true }).catch(() => undefined);
|
||||
continue;
|
||||
}
|
||||
if (attempt >= attempts - 1) {
|
||||
break;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, computeDelayMs(options.retries, attempt)));
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`file lock timeout for ${normalizedFile}`);
|
||||
}
|
||||
|
||||
export async function withFileLock<T>(
|
||||
filePath: string,
|
||||
options: FileLockOptions,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const lock = await acquireFileLock(filePath, options);
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
}
|
||||
export { withFileLock } from "openclaw/plugin-sdk";
|
||||
|
||||
@@ -1,192 +1,2 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
export type FileLockOptions = {
|
||||
retries: {
|
||||
retries: number;
|
||||
factor: number;
|
||||
minTimeout: number;
|
||||
maxTimeout: number;
|
||||
randomize?: boolean;
|
||||
};
|
||||
stale: number;
|
||||
};
|
||||
|
||||
type LockFilePayload = {
|
||||
pid: number;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
type HeldLock = {
|
||||
count: number;
|
||||
handle: fs.FileHandle;
|
||||
lockPath: string;
|
||||
};
|
||||
|
||||
const HELD_LOCKS_KEY = Symbol.for("openclaw.fileLockHeldLocks");
|
||||
|
||||
function resolveHeldLocks(): Map<string, HeldLock> {
|
||||
const proc = process as NodeJS.Process & {
|
||||
[HELD_LOCKS_KEY]?: Map<string, HeldLock>;
|
||||
};
|
||||
if (!proc[HELD_LOCKS_KEY]) {
|
||||
proc[HELD_LOCKS_KEY] = new Map<string, HeldLock>();
|
||||
}
|
||||
return proc[HELD_LOCKS_KEY];
|
||||
}
|
||||
|
||||
const HELD_LOCKS = resolveHeldLocks();
|
||||
|
||||
function isAlive(pid: number): boolean {
|
||||
if (!Number.isFinite(pid) || pid <= 0) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function computeDelayMs(retries: FileLockOptions["retries"], attempt: number): number {
|
||||
const base = Math.min(
|
||||
retries.maxTimeout,
|
||||
Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt),
|
||||
);
|
||||
const jitter = retries.randomize ? 1 + Math.random() : 1;
|
||||
return Math.min(retries.maxTimeout, Math.round(base * jitter));
|
||||
}
|
||||
|
||||
async function readLockPayload(lockPath: string): Promise<LockFilePayload | null> {
|
||||
try {
|
||||
const raw = await fs.readFile(lockPath, "utf8");
|
||||
const parsed = JSON.parse(raw) as Partial<LockFilePayload>;
|
||||
if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") {
|
||||
return null;
|
||||
}
|
||||
return { pid: parsed.pid, createdAt: parsed.createdAt };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async function resolveNormalizedFilePath(filePath: string): Promise<string> {
|
||||
const resolved = path.resolve(filePath);
|
||||
const dir = path.dirname(resolved);
|
||||
await fs.mkdir(dir, { recursive: true });
|
||||
try {
|
||||
const realDir = await fs.realpath(dir);
|
||||
return path.join(realDir, path.basename(resolved));
|
||||
} catch {
|
||||
return resolved;
|
||||
}
|
||||
}
|
||||
|
||||
async function isStaleLock(lockPath: string, staleMs: number): Promise<boolean> {
|
||||
const payload = await readLockPayload(lockPath);
|
||||
if (payload?.pid && !isAlive(payload.pid)) {
|
||||
return true;
|
||||
}
|
||||
if (payload?.createdAt) {
|
||||
const createdAt = Date.parse(payload.createdAt);
|
||||
if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
try {
|
||||
const stat = await fs.stat(lockPath);
|
||||
return Date.now() - stat.mtimeMs > staleMs;
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
type FileLockHandle = {
|
||||
lockPath: string;
|
||||
release: () => Promise<void>;
|
||||
};
|
||||
|
||||
export async function acquireFileLock(
|
||||
filePath: string,
|
||||
options: FileLockOptions,
|
||||
): Promise<FileLockHandle> {
|
||||
const normalizedFile = await resolveNormalizedFilePath(filePath);
|
||||
const lockPath = `${normalizedFile}.lock`;
|
||||
const held = HELD_LOCKS.get(normalizedFile);
|
||||
if (held) {
|
||||
held.count += 1;
|
||||
return {
|
||||
lockPath,
|
||||
release: async () => {
|
||||
const current = HELD_LOCKS.get(normalizedFile);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.count -= 1;
|
||||
if (current.count > 0) {
|
||||
return;
|
||||
}
|
||||
HELD_LOCKS.delete(normalizedFile);
|
||||
await current.handle.close().catch(() => undefined);
|
||||
await fs.rm(current.lockPath, { force: true }).catch(() => undefined);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const attempts = Math.max(1, options.retries.retries + 1);
|
||||
for (let attempt = 0; attempt < attempts; attempt += 1) {
|
||||
try {
|
||||
const handle = await fs.open(lockPath, "wx");
|
||||
await handle.writeFile(
|
||||
JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2),
|
||||
"utf8",
|
||||
);
|
||||
HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath });
|
||||
return {
|
||||
lockPath,
|
||||
release: async () => {
|
||||
const current = HELD_LOCKS.get(normalizedFile);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.count -= 1;
|
||||
if (current.count > 0) {
|
||||
return;
|
||||
}
|
||||
HELD_LOCKS.delete(normalizedFile);
|
||||
await current.handle.close().catch(() => undefined);
|
||||
await fs.rm(current.lockPath, { force: true }).catch(() => undefined);
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
const code = (err as { code?: string }).code;
|
||||
if (code !== "EEXIST") {
|
||||
throw err;
|
||||
}
|
||||
if (await isStaleLock(lockPath, options.stale)) {
|
||||
await fs.rm(lockPath, { force: true }).catch(() => undefined);
|
||||
continue;
|
||||
}
|
||||
if (attempt >= attempts - 1) {
|
||||
break;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, computeDelayMs(options.retries, attempt)));
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`file lock timeout for ${normalizedFile}`);
|
||||
}
|
||||
|
||||
export async function withFileLock<T>(
|
||||
filePath: string,
|
||||
options: FileLockOptions,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const lock = await acquireFileLock(filePath, options);
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
}
|
||||
export type { FileLockHandle, FileLockOptions } from "../plugin-sdk/file-lock.js";
|
||||
export { acquireFileLock, withFileLock } from "../plugin-sdk/file-lock.js";
|
||||
|
||||
192
src/plugin-sdk/file-lock.ts
Normal file
192
src/plugin-sdk/file-lock.ts
Normal file
@@ -0,0 +1,192 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
export type FileLockOptions = {
|
||||
retries: {
|
||||
retries: number;
|
||||
factor: number;
|
||||
minTimeout: number;
|
||||
maxTimeout: number;
|
||||
randomize?: boolean;
|
||||
};
|
||||
stale: number;
|
||||
};
|
||||
|
||||
type LockFilePayload = {
|
||||
pid: number;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
type HeldLock = {
|
||||
count: number;
|
||||
handle: fs.FileHandle;
|
||||
lockPath: string;
|
||||
};
|
||||
|
||||
const HELD_LOCKS_KEY = Symbol.for("openclaw.fileLockHeldLocks");
|
||||
|
||||
function resolveHeldLocks(): Map<string, HeldLock> {
|
||||
const proc = process as NodeJS.Process & {
|
||||
[HELD_LOCKS_KEY]?: Map<string, HeldLock>;
|
||||
};
|
||||
if (!proc[HELD_LOCKS_KEY]) {
|
||||
proc[HELD_LOCKS_KEY] = new Map<string, HeldLock>();
|
||||
}
|
||||
return proc[HELD_LOCKS_KEY];
|
||||
}
|
||||
|
||||
const HELD_LOCKS = resolveHeldLocks();
|
||||
|
||||
function isAlive(pid: number): boolean {
|
||||
if (!Number.isFinite(pid) || pid <= 0) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function computeDelayMs(retries: FileLockOptions["retries"], attempt: number): number {
|
||||
const base = Math.min(
|
||||
retries.maxTimeout,
|
||||
Math.max(retries.minTimeout, retries.minTimeout * retries.factor ** attempt),
|
||||
);
|
||||
const jitter = retries.randomize ? 1 + Math.random() : 1;
|
||||
return Math.min(retries.maxTimeout, Math.round(base * jitter));
|
||||
}
|
||||
|
||||
async function readLockPayload(lockPath: string): Promise<LockFilePayload | null> {
|
||||
try {
|
||||
const raw = await fs.readFile(lockPath, "utf8");
|
||||
const parsed = JSON.parse(raw) as Partial<LockFilePayload>;
|
||||
if (typeof parsed.pid !== "number" || typeof parsed.createdAt !== "string") {
|
||||
return null;
|
||||
}
|
||||
return { pid: parsed.pid, createdAt: parsed.createdAt };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async function resolveNormalizedFilePath(filePath: string): Promise<string> {
|
||||
const resolved = path.resolve(filePath);
|
||||
const dir = path.dirname(resolved);
|
||||
await fs.mkdir(dir, { recursive: true });
|
||||
try {
|
||||
const realDir = await fs.realpath(dir);
|
||||
return path.join(realDir, path.basename(resolved));
|
||||
} catch {
|
||||
return resolved;
|
||||
}
|
||||
}
|
||||
|
||||
async function isStaleLock(lockPath: string, staleMs: number): Promise<boolean> {
|
||||
const payload = await readLockPayload(lockPath);
|
||||
if (payload?.pid && !isAlive(payload.pid)) {
|
||||
return true;
|
||||
}
|
||||
if (payload?.createdAt) {
|
||||
const createdAt = Date.parse(payload.createdAt);
|
||||
if (!Number.isFinite(createdAt) || Date.now() - createdAt > staleMs) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
try {
|
||||
const stat = await fs.stat(lockPath);
|
||||
return Date.now() - stat.mtimeMs > staleMs;
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
export type FileLockHandle = {
|
||||
lockPath: string;
|
||||
release: () => Promise<void>;
|
||||
};
|
||||
|
||||
export async function acquireFileLock(
|
||||
filePath: string,
|
||||
options: FileLockOptions,
|
||||
): Promise<FileLockHandle> {
|
||||
const normalizedFile = await resolveNormalizedFilePath(filePath);
|
||||
const lockPath = `${normalizedFile}.lock`;
|
||||
const held = HELD_LOCKS.get(normalizedFile);
|
||||
if (held) {
|
||||
held.count += 1;
|
||||
return {
|
||||
lockPath,
|
||||
release: async () => {
|
||||
const current = HELD_LOCKS.get(normalizedFile);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.count -= 1;
|
||||
if (current.count > 0) {
|
||||
return;
|
||||
}
|
||||
HELD_LOCKS.delete(normalizedFile);
|
||||
await current.handle.close().catch(() => undefined);
|
||||
await fs.rm(current.lockPath, { force: true }).catch(() => undefined);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const attempts = Math.max(1, options.retries.retries + 1);
|
||||
for (let attempt = 0; attempt < attempts; attempt += 1) {
|
||||
try {
|
||||
const handle = await fs.open(lockPath, "wx");
|
||||
await handle.writeFile(
|
||||
JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2),
|
||||
"utf8",
|
||||
);
|
||||
HELD_LOCKS.set(normalizedFile, { count: 1, handle, lockPath });
|
||||
return {
|
||||
lockPath,
|
||||
release: async () => {
|
||||
const current = HELD_LOCKS.get(normalizedFile);
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.count -= 1;
|
||||
if (current.count > 0) {
|
||||
return;
|
||||
}
|
||||
HELD_LOCKS.delete(normalizedFile);
|
||||
await current.handle.close().catch(() => undefined);
|
||||
await fs.rm(current.lockPath, { force: true }).catch(() => undefined);
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
const code = (err as { code?: string }).code;
|
||||
if (code !== "EEXIST") {
|
||||
throw err;
|
||||
}
|
||||
if (await isStaleLock(lockPath, options.stale)) {
|
||||
await fs.rm(lockPath, { force: true }).catch(() => undefined);
|
||||
continue;
|
||||
}
|
||||
if (attempt >= attempts - 1) {
|
||||
break;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, computeDelayMs(options.retries, attempt)));
|
||||
}
|
||||
}
|
||||
|
||||
throw new Error(`file lock timeout for ${normalizedFile}`);
|
||||
}
|
||||
|
||||
export async function withFileLock<T>(
|
||||
filePath: string,
|
||||
options: FileLockOptions,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const lock = await acquireFileLock(filePath, options);
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
}
|
||||
@@ -78,6 +78,9 @@ export { emptyPluginConfigSchema } from "../plugins/config-schema.js";
|
||||
export type { OpenClawConfig } from "../config/config.js";
|
||||
/** @deprecated Use OpenClawConfig instead */
|
||||
export type { OpenClawConfig as ClawdbotConfig } from "../config/config.js";
|
||||
|
||||
export type { FileLockHandle, FileLockOptions } from "./file-lock.js";
|
||||
export { acquireFileLock, withFileLock } from "./file-lock.js";
|
||||
export type { ChannelDock } from "../channels/dock.js";
|
||||
export { getChatChannelMeta } from "../channels/registry.js";
|
||||
export type {
|
||||
|
||||
Reference in New Issue
Block a user