feat: use piscina to manage worker threads

Nico Flaig
2025-12-06 01:12:59 +01:00
parent 1f2a3a4524
commit a0b2b467a9
28 changed files with 766 additions and 449 deletions

View File

@@ -124,7 +124,7 @@
"@chainsafe/prometheus-gc-stats": "^1.0.0",
"@chainsafe/pubkey-index-map": "^3.0.0",
"@chainsafe/ssz": "^1.2.2",
"@chainsafe/threads": "^1.11.3",
"piscina": "^5.0.0",
"@crate-crypto/node-eth-kzg": "0.9.1",
"@ethersproject/abi": "^5.7.0",
"@fastify/bearer-auth": "^10.0.1",

View File

@@ -1,7 +1,9 @@
import path from "node:path";
import {ModuleThread, Thread, Worker, spawn} from "@chainsafe/threads";
import {Worker} from "node:worker_threads";
import {fileURLToPath} from "node:url";
import {chainConfigToJson} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {terminateWorkerThread} from "../../../util/workerEvents.js";
import {createWorkerRpcClient} from "../../../util/workerRpc.js";
import {
HistoricalStateRegenInitModules,
HistoricalStateRegenModules,
@@ -9,22 +11,30 @@ import {
HistoricalStateWorkerData,
} from "./types.js";
// Worker constructor considers the path relative to the current working directory
const WORKER_DIR = process.env.NODE_ENV === "test" ? "../../../../lib/chain/archiveStore/historicalState" : "./";
// Resolve worker path relative to this file
// In dev/test mode: running from src/, worker is in lib/
// In production: running from lib/, worker is in same directory
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const workerPath =
process.env.NODE_ENV === "test"
? path.join(__dirname, "../../../../lib/chain/archiveStore/historicalState/worker.js")
: path.join(__dirname, "worker.js");
const HISTORICAL_STATE_WORKER_TIMEOUT_MS = 1000;
const HISTORICAL_STATE_WORKER_TIMEOUT_RETRY_COUNT = 3;
/**
* HistoricalStateRegen limits the damage from recreating historical states
* by running regen in a separate worker thread.
*/
export class HistoricalStateRegen implements HistoricalStateWorkerApi {
private readonly api: ModuleThread<HistoricalStateWorkerApi>;
private readonly logger: LoggerNode;
private readonly modules: HistoricalStateRegenModules;
constructor(modules: HistoricalStateRegenModules) {
this.api = modules.api;
this.logger = modules.logger;
this.modules = modules;
modules.signal?.addEventListener("abort", () => this.close(), {once: true});
}
static async init(modules: HistoricalStateRegenInitModules): Promise<HistoricalStateRegen> {
const workerData: HistoricalStateWorkerData = {
chainConfigJson: chainConfigToJson(modules.config),
@@ -37,32 +47,58 @@ export class HistoricalStateRegen implements HistoricalStateWorkerApi {
loggerOpts: modules.logger.toOpts(),
};
const worker = new Worker(path.join(WORKER_DIR, "worker.js"), {
suppressTranspileTS: Boolean(globalThis.Bun),
workerData,
} as ConstructorParameters<typeof Worker>[1]);
const worker = new Worker(workerPath, {workerData});
const api = await spawn<HistoricalStateWorkerApi>(worker, {
// A Lodestar Node may do very expensive tasks at start, blocking the event loop and causing
// the initialization to time out. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
// Wait for worker to be online
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Historical state worker initialization timeout"));
}, 5 * 60 * 1000);
worker.once("online", () => {
clearTimeout(timeout);
resolve();
});
worker.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
});
return new HistoricalStateRegen({...modules, api});
// Subscribe to worker errors
worker.on("error", (err) => {
modules.logger.error("Historical state worker thread error", {}, err);
});
// Create RPC client for typed method calls
const {api, close: closeRpc} = createWorkerRpcClient<HistoricalStateWorkerApi>(worker);
return new HistoricalStateRegen({
...modules,
api,
worker,
closeRpc,
});
}
async scrapeMetrics(): Promise<string> {
return this.api.scrapeMetrics();
return this.modules.api.scrapeMetrics();
}
async close(): Promise<void> {
await this.api.close();
this.logger.debug("Terminating historical state worker");
await Thread.terminate(this.api);
this.logger.debug("Terminated historical state worker");
await this.modules.api.close();
this.modules.closeRpc();
this.modules.logger.debug("Terminating historical state worker");
await terminateWorkerThread({
worker: this.modules.worker,
retryCount: HISTORICAL_STATE_WORKER_TIMEOUT_RETRY_COUNT,
retryMs: HISTORICAL_STATE_WORKER_TIMEOUT_MS,
});
this.modules.logger.debug("Terminated historical state worker");
}
async getHistoricalState(slot: number): Promise<Uint8Array> {
return this.api.getHistoricalState(slot);
return this.modules.api.getHistoricalState(slot);
}
}

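Note: the same "wait for online, reject on error or timeout" promise appears at several call sites in this commit (historical state, network core, and the echo/logger test utils). A minimal sketch of how it could be factored into a shared helper; the name waitForWorkerOnline is hypothetical, not part of this commit:

import {Worker} from "node:worker_threads";

// Hypothetical helper: resolve once the worker emits "online",
// reject on "error" or after timeoutMs (the call sites use 5 minutes).
function waitForWorkerOnline(worker: Worker, timeoutMs = 5 * 60 * 1000): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const timeout = setTimeout(() => reject(new Error("Worker initialization timeout")), timeoutMs);
    worker.once("online", () => {
      clearTimeout(timeout);
      resolve();
    });
    worker.once("error", (error) => {
      clearTimeout(timeout);
      reject(error);
    });
  });
}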
View File

@@ -1,4 +1,4 @@
import {ModuleThread} from "@chainsafe/threads";
import {Worker} from "node:worker_threads";
import {BeaconConfig, SpecJson} from "@lodestar/config";
import {LoggerNode, LoggerNodeOpts} from "@lodestar/logger/node";
import {Metrics} from "../../../metrics/index.js";
@@ -14,7 +14,9 @@ export type HistoricalStateRegenInitModules = {
signal?: AbortSignal;
};
export type HistoricalStateRegenModules = HistoricalStateRegenInitModules & {
api: ModuleThread<HistoricalStateWorkerApi>;
api: HistoricalStateWorkerApi;
worker: Worker;
closeRpc: () => void;
};
export type HistoricalStateWorkerData = {

View File

@@ -1,6 +1,5 @@
import worker from "node:worker_threads";
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
import {Transfer, expose} from "@chainsafe/threads/worker";
import {chainConfigFromJson, createBeaconConfig} from "@lodestar/config";
import {LevelDbController} from "@lodestar/db/controller/level";
import {getNodeLogger} from "@lodestar/logger/node";
@@ -8,6 +7,7 @@ import {BeaconDb} from "../../../db/index.js";
import {RegistryMetricCreator, collectNodeJSMetrics} from "../../../metrics/index.js";
import {JobFnQueue} from "../../../util/queue/fnQueue.js";
import {QueueMetrics} from "../../../util/queue/options.js";
import {handleWorkerRpc} from "../../../util/workerRpc.js";
import {getHistoricalState} from "./getHistoricalState.js";
import {
HistoricalStateRegenMetrics,
@@ -19,6 +19,9 @@ import {HistoricalStateWorkerApi, HistoricalStateWorkerData} from "./types.js";
// most of this setup copied from networkCoreWorker.ts
const workerData = worker.workerData as HistoricalStateWorkerData;
const parentPort = worker.parentPort;
if (!workerData) throw Error("workerData must be defined");
if (!parentPort) throw Error("parentPort must be defined");
const logger = getNodeLogger(workerData.loggerOpts);
@@ -67,11 +70,12 @@ const api: HistoricalStateWorkerApi = {
const stateBytes = await queue.push<Uint8Array>(() =>
getHistoricalState(slot, config, db, pubkey2index, historicalStateRegenMetrics)
);
const result = Transfer(stateBytes, [stateBytes.buffer]) as unknown as Uint8Array;
historicalStateRegenMetrics?.regenSuccessCount.inc();
return result;
// Return state bytes - structured cloning handles Uint8Array efficiently
return stateBytes;
},
};
expose(api);
// Handle RPC calls from main thread
handleWorkerRpc(parentPort, api);

View File

@@ -1,11 +1,6 @@
import path from "node:path";
import {Worker, spawn} from "@chainsafe/threads";
// `threads` library creates self global variable which breaks `timeout-abort-controller` https://github.com/jacobheun/timeout-abort-controller/issues/9
// @ts-expect-error
// biome-ignore lint/suspicious/noGlobalAssign: We need the global `self` to reassign module properties later
self = undefined;
import {fileURLToPath} from "node:url";
import {Piscina} from "piscina";
import {PublicKey} from "@chainsafe/blst";
import {ISignatureSet} from "@lodestar/state-transition";
import {Logger} from "@lodestar/utils";
@@ -25,11 +20,15 @@ import {
jobItemWorkReq,
} from "./jobItem.js";
import {defaultPoolSize} from "./poolSize.js";
import {BlsWorkReq, BlsWorkResult, WorkResultCode, WorkResultError, WorkerData} from "./types.js";
import {BlsWorkReq, BlsWorkResult, WorkResultCode, WorkResultError} from "./types.js";
import {chunkifyMaximizeChunkSize} from "./utils.js";
// Worker constructor considers the path relative to the current working directory
const workerDir = process.env.NODE_ENV === "test" ? "../../../../lib/chain/bls/multithread" : "./";
// Resolve worker path relative to this file
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const workerPath =
process.env.NODE_ENV === "test"
? path.join(__dirname, "../../../../lib/chain/bls/multithread/worker.js")
: path.join(__dirname, "worker.js");
export type BlsMultiThreadWorkerPoolModules = {
logger: Logger;
@@ -78,32 +77,8 @@ const MAX_BUFFER_WAIT_MS = 100;
*/
const MAX_JOBS_CAN_ACCEPT_WORK = 512;
type WorkerApi = {
verifyManySignatureSets(workReqArr: BlsWorkReq[]): Promise<BlsWorkResult>;
};
enum WorkerStatusCode {
notInitialized,
initializing,
initializationError,
idle,
running,
}
type WorkerStatus =
| {code: WorkerStatusCode.notInitialized}
| {code: WorkerStatusCode.initializing; initPromise: Promise<WorkerApi>}
| {code: WorkerStatusCode.initializationError; error: Error}
| {code: WorkerStatusCode.idle; workerApi: WorkerApi}
| {code: WorkerStatusCode.running; workerApi: WorkerApi};
type WorkerDescriptor = {
worker: Worker;
status: WorkerStatus;
};
/**
* Wraps "threads" library thread pool queue system with the goals:
* Wraps piscina thread pool with the goals:
* - Complete total outstanding jobs in total minimum time possible.
* Will split large signature sets into smaller sets and send to different workers
* - Reduce the latency cost for small signature sets. In NodeJS 12,14 worker <-> main thread
@@ -113,8 +88,8 @@ type WorkerDescriptor = {
export class BlsMultiThreadWorkerPool implements IBlsVerifier {
private readonly logger: Logger;
private readonly metrics: Metrics | null;
private readonly pool: Piscina;
private readonly workers: WorkerDescriptor[];
private readonly jobs = new LinkedList<JobQueueItem>();
private bufferedJobs: {
jobs: LinkedList<JobQueueItem>;
@@ -133,10 +108,21 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
this.metrics = metrics;
this.blsVerifyAllMultiThread = options.blsVerifyAllMultiThread ?? false;
// Use compressed for herumi for now.
// The worker is not able to deserialize from uncompressed
// `Error: err _wrapDeserialize`
this.workers = this.createWorkers(blsPoolSize);
// Create piscina pool with optimal settings for BLS verification
this.pool = new Piscina({
filename: workerPath,
// Keep workers alive - BLS verification is constant workload
minThreads: blsPoolSize,
maxThreads: blsPoolSize,
// Never terminate idle workers - avoid thread churn
idleTimeout: 0,
// Use atomics for fastest sync communication (BLS is pure CPU work)
atomics: "sync",
// Limit queue to prevent memory issues
maxQueue: MAX_JOBS_CAN_ACCEPT_WORK,
// Enable timing statistics
recordTiming: true,
});
if (metrics) {
metrics.blsThreadPool.queueLength.addCollect(() => {
@@ -147,11 +133,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}
canAcceptWork(): boolean {
return (
this.workersBusy < blsPoolSize &&
// TODO: Should also bound the jobs queue?
this.jobs.length < MAX_JOBS_CAN_ACCEPT_WORK
);
return this.workersBusy < blsPoolSize && this.jobs.length < MAX_JOBS_CAN_ACCEPT_WORK;
}
async verifySignatureSets(sets: ISignatureSet[], opts: VerifySignatureOpts = {}): Promise<boolean> {
@@ -246,55 +228,10 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}
this.jobs.clear();
// Terminate all workers. await to ensure no workers are left hanging
await Promise.all(
Array.from(this.workers.entries()).map(([id, worker]) =>
// NOTE: 'threads' has not yet updated types, and NodeJS complains with
// [DEP0132] DeprecationWarning: Passing a callback to worker.terminate() is deprecated. It returns a Promise instead.
(worker.worker.terminate() as unknown as Promise<void>).catch((e: Error) => {
this.logger.error("Error terminating worker", {id}, e);
})
)
);
}
this.closed = true;
private createWorkers(poolSize: number): WorkerDescriptor[] {
const workers: WorkerDescriptor[] = [];
for (let i = 0; i < poolSize; i++) {
const workerData: WorkerData = {workerId: i};
const worker = new Worker(path.join(workerDir, "worker.js"), {
suppressTranspileTS: Boolean(globalThis.Bun),
workerData,
} as ConstructorParameters<typeof Worker>[1]);
const workerDescriptor: WorkerDescriptor = {
worker,
status: {code: WorkerStatusCode.notInitialized},
};
workers.push(workerDescriptor);
// TODO: Consider initializing only when necessary
const initPromise = spawn<WorkerApi>(worker, {
// A Lodestar Node may do very expensive tasks at start, blocking the event loop and causing
// the initialization to time out. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
});
workerDescriptor.status = {code: WorkerStatusCode.initializing, initPromise};
initPromise
.then((workerApi) => {
workerDescriptor.status = {code: WorkerStatusCode.idle, workerApi};
// Potentially run jobs that were queued before initialization of the first worker
setTimeout(this.runJob, 0);
})
.catch((error: Error) => {
workerDescriptor.status = {code: WorkerStatusCode.initializationError, error};
});
}
return workers;
// Terminate all workers - piscina handles cleanup
await this.pool.close();
}
/**
@@ -309,15 +246,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
// It would be bad to reject signatures because the node is slow.
// However, if the worker communication broke jobs won't ever finish
if (
this.workers.length > 0 &&
this.workers[0].status.code === WorkerStatusCode.initializationError &&
this.workers.every((worker) => worker.status.code === WorkerStatusCode.initializationError)
) {
job.reject(this.workers[0].status.error);
return;
}
// Append batchable sets to `bufferedJobs`, starting a timeout to push them into `jobs`.
// Do not call `runJob()`, it is called from `runBufferedJobs()`
if (job.opts.batchable) {
@@ -353,16 +281,15 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}
/**
* Potentially submit jobs to an idle worker, only if there's a worker and jobs
* Potentially submit jobs to the piscina pool
*/
private runJob = async (): Promise<void> => {
if (this.closed) {
return;
}
// Find idle worker
const worker = this.workers.find((worker) => worker.status.code === WorkerStatusCode.idle);
if (!worker || worker.status.code !== WorkerStatusCode.idle) {
// Check if we can accept more work
if (this.workersBusy >= blsPoolSize) {
return;
}
@@ -372,12 +299,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
return;
}
// TODO: After sending the work to the worker the main thread can drop the job arguments
// and free-up memory, only needs to keep the job's Promise handlers.
// Maybe it's not useful since all data referenced in jobs is likely referenced by others
const workerApi = worker.status.workerApi;
worker.status = {code: WorkerStatusCode.running, workerApi};
this.workersBusy++;
try {
@@ -432,14 +353,11 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
this.metrics?.blsThreadPool.totalSigSetsStarted.inc({type: JobQueueItemType.default}, startedSetsDefault);
this.metrics?.blsThreadPool.totalSigSetsStarted.inc({type: JobQueueItemType.sameMessage}, startedSetsSameMessage);
// Send work package to the worker
// If the job, metrics or any code below throws: the job will reject never going stale.
// Only downside is the job promise may be resolved twice, but that's not an issue
// Send work package to piscina
const [jobStartSec, jobStartNs] = process.hrtime();
const workResult = await workerApi.verifyManySignatureSets(workReqs);
const workResult: BlsWorkResult = await this.pool.run(workReqs);
const [jobEndSec, jobEndNs] = process.hrtime();
const {workerId, batchRetries, batchSigsSuccess, workerStartTime, workerEndTime, results} = workResult;
const {batchRetries, batchSigsSuccess, workerStartTime, workerEndTime, results} = workResult;
const [workerStartSec, workerStartNs] = workerStartTime;
const [workerEndSec, workerEndNs] = workerEndTime;
@@ -489,7 +407,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
const latencyFromWorkerSec = jobEndSec - workerEndSec + Number(jobEndNs - workerEndNs) / 1e9;
this.metrics?.blsThreadPool.timePerSigSet.observe(workerJobTimeSec / startedSigSets);
this.metrics?.blsThreadPool.jobsWorkerTime.inc({workerId}, workerJobTimeSec);
this.metrics?.blsThreadPool.latencyToWorker.observe(latencyToWorkerSec);
this.metrics?.blsThreadPool.latencyFromWorker.observe(latencyFromWorkerSec);
this.metrics?.blsThreadPool.successJobsSignatureSetsCount.inc(successCount);
@@ -507,7 +424,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}
}
worker.status = {code: WorkerStatusCode.idle, workerApi};
this.workersBusy--;
// Potentially run a new job
@@ -562,17 +478,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
this.metrics?.blsThreadPool.sameMessageRetryJobs.inc(1);
this.metrics?.blsThreadPool.sameMessageRetrySets.inc(job.sets.length);
}
/** For testing */
protected async waitTillInitialized(): Promise<void> {
await Promise.all(
this.workers.map(async (worker) => {
if (worker.status.code === WorkerStatusCode.initializing) {
await worker.status.initPromise;
}
})
);
}
}
function getJobResultError(jobResult: WorkResultError | null, i: number): Error {

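Note: the piscina contract used above is a pool pointed at a worker module whose default export is the task function; pool.run(arg) posts arg to an idle worker and resolves with that function's return value. A minimal self-contained sketch under that assumption (the square task and file names are illustrative, not from this commit):

// pool.ts
import path from "node:path";
import {fileURLToPath} from "node:url";
import {Piscina} from "piscina";

const __dirname = path.dirname(fileURLToPath(import.meta.url));
const pool = new Piscina({
  filename: path.join(__dirname, "square.js"), // compiled worker module below
  minThreads: 2,
  maxThreads: 2,
  idleTimeout: 0, // keep workers alive, as the BLS pool above does
});
console.log(await pool.run(3)); // 9
await pool.close();

// square.ts (compiled to square.js)
export default function square(n: number): number {
  return n * n;
}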
View File

@@ -1,9 +1,5 @@
import {VerifySignatureOpts} from "../interface.js";
export type WorkerData = {
workerId: number;
};
export type SerializedSet = {
publicKey: Uint8Array;
message: Uint8Array;
@@ -24,8 +20,6 @@ export type WorkResultError = {code: WorkResultCode.error; error: Error};
export type WorkResult<R> = {code: WorkResultCode.success; result: R} | WorkResultError;
export type BlsWorkResult = {
/** Ascending integer identifying the worker for metrics */
workerId: number;
/** Total num of batches that had to be retried */
batchRetries: number;
/** Total num of sigs that have been successfully verified with batching */

View File

@@ -1,8 +1,6 @@
import worker from "node:worker_threads";
import {PublicKey} from "@chainsafe/blst";
import {expose} from "@chainsafe/threads/worker";
import {SignatureSetDeserialized, verifySignatureSetsMaybeBatch} from "../maybeBatch.js";
import {BlsWorkReq, BlsWorkResult, SerializedSet, WorkResult, WorkResultCode, WorkerData} from "./types.js";
import {BlsWorkReq, BlsWorkResult, SerializedSet, WorkResult, WorkResultCode} from "./types.js";
import {chunkifyMaximizeChunkSize} from "./utils.js";
/**
@@ -14,18 +12,11 @@ import {chunkifyMaximizeChunkSize} from "./utils.js";
*/
const BATCHABLE_MIN_PER_CHUNK = 16;
// Cloned data from instantiation
const workerData = worker.workerData as WorkerData;
if (!workerData) throw Error("workerData must be defined");
const {workerId} = workerData || {};
expose({
async verifyManySignatureSets(workReqArr: BlsWorkReq[]): Promise<BlsWorkResult> {
return verifyManySignatureSets(workReqArr);
},
});
function verifyManySignatureSets(workReqArr: BlsWorkReq[]): BlsWorkResult {
/**
* Worker function for BLS signature verification.
* Exported as default for piscina to pick up.
*/
export default function verifyManySignatureSets(workReqArr: BlsWorkReq[]): BlsWorkResult {
const [startSec, startNs] = process.hrtime();
const results: WorkResult<boolean>[] = [];
let batchRetries = 0;
@@ -96,7 +87,6 @@ function verifyManySignatureSets(workReqArr: BlsWorkReq[]): BlsWorkResult {
const [workerEndSec, workerEndNs] = process.hrtime();
return {
workerId,
batchRetries,
batchSigsSuccess,
workerStartTime: [startSec, startNs],

View File

@@ -326,11 +326,6 @@ export function createLodestarMetrics(
},
blsThreadPool: {
jobsWorkerTime: register.gauge<{workerId: number}>({
name: "lodestar_bls_thread_pool_time_seconds_sum",
help: "Total time spent verifying signature sets measured on the worker",
labelNames: ["workerId"],
}),
successJobsSignatureSetsCount: register.gauge({
name: "lodestar_bls_thread_pool_success_jobs_signature_sets_count",
help: "Count of total verified signature sets",

View File

@@ -1,8 +1,6 @@
import worker from "node:worker_threads";
import {privateKeyFromProtobuf} from "@libp2p/crypto/keys";
import {peerIdFromPrivateKey} from "@libp2p/peer-id";
import type {ModuleThread} from "@chainsafe/threads";
import {expose} from "@chainsafe/threads/worker";
import {chainConfigFromJson, createBeaconConfig} from "@lodestar/config";
import {getNodeLogger} from "@lodestar/logger/node";
import {RegistryMetricCreator, collectNodeJSMetrics} from "../../metrics/index.js";
@@ -11,6 +9,7 @@ import {Clock} from "../../util/clock.js";
import {peerIdToString} from "../../util/peerId.js";
import {ProfileThread, profileThread, writeHeapSnapshot} from "../../util/profile.js";
import {wireEventsOnWorkerThread} from "../../util/workerEvents.js";
import {handleWorkerRpc} from "../../util/workerRpc.js";
import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
import {
NetworkWorkerThreadEventType,
@@ -173,4 +172,5 @@ const libp2pWorkerApi: NetworkWorkerApi = {
},
};
expose(libp2pWorkerApi as ModuleThread<NetworkWorkerApi>);
// Handle RPC calls from main thread
handleWorkerRpc(parentPort, libp2pWorkerApi);

View File

@@ -1,10 +1,10 @@
import path from "node:path";
import workerThreads from "node:worker_threads";
import {Worker} from "node:worker_threads";
import {fileURLToPath} from "node:url";
import {privateKeyToProtobuf} from "@libp2p/crypto/keys";
import {PrivateKey} from "@libp2p/interface";
import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/score";
import {PublishOpts} from "@chainsafe/libp2p-gossipsub/types";
import {ModuleThread, Thread, Worker, spawn} from "@chainsafe/threads";
import {routes} from "@lodestar/api";
import {BeaconConfig, chainConfigToJson} from "@lodestar/config";
import type {LoggerNode} from "@lodestar/logger/node";
@@ -14,6 +14,7 @@ import {Metrics} from "../../metrics/index.js";
import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js";
import {PeerIdStr, peerIdFromString} from "../../util/peerId.js";
import {terminateWorkerThread, wireEventsOnMainThread} from "../../util/workerEvents.js";
import {createWorkerRpcClient} from "../../util/workerRpc.js";
import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js";
import {NetworkOptions} from "../options.js";
import {PeerAction, PeerScoreStats} from "../peers/index.js";
@@ -29,8 +30,14 @@ import {
} from "./events.js";
import {INetworkCore, MultiaddrStr, NetworkWorkerApi, NetworkWorkerData} from "./types.js";
// Worker constructor considers the path relative to the current working directory
const workerDir = process.env.NODE_ENV === "test" ? "../../../lib/network/core/" : "./";
// Resolve worker path relative to this file
// In dev/test mode: running from src/, worker is in lib/
// In production: running from lib/, worker is in same directory
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const workerPath =
process.env.NODE_ENV === "test"
? path.join(__dirname, "../../../lib/network/core/networkCoreWorker.js")
: path.join(__dirname, "networkCoreWorker.js");
export type WorkerNetworkCoreOpts = NetworkOptions & {
metricsEnabled: boolean;
@@ -52,8 +59,9 @@ export type WorkerNetworkCoreInitModules = {
};
type WorkerNetworkCoreModules = WorkerNetworkCoreInitModules & {
networkThreadApi: ModuleThread<NetworkWorkerApi>;
networkThreadApi: NetworkWorkerApi;
worker: Worker;
closeRpc: () => void;
};
const NETWORK_WORKER_EXIT_TIMEOUT_MS = 1000;
@@ -79,19 +87,20 @@ export class WorkerNetworkCore implements INetworkCore {
wireEventsOnMainThread<NetworkEventData>(
NetworkWorkerThreadEventType.networkEvent,
modules.events,
modules.worker as unknown as workerThreads.Worker,
modules.worker,
modules.metrics,
networkEventDirection
);
wireEventsOnMainThread<ReqRespBridgeEventData>(
NetworkWorkerThreadEventType.reqRespBridgeEvents,
this.reqRespBridgeEventBus,
modules.worker as unknown as workerThreads.Worker,
modules.worker,
modules.metrics,
reqRespBridgeEventDirection
);
Thread.errors(modules.networkThreadApi).subscribe((err) => {
// Subscribe to worker errors
modules.worker.on("error", (err) => {
this.modules.logger.error("Network worker thread error", {}, err);
});
@@ -133,9 +142,7 @@ export class WorkerNetworkCore implements INetworkCore {
const workerOpts: ConstructorParameters<typeof Worker>[1] = {
workerData,
};
if (globalThis.Bun) {
workerOpts.suppressTranspileTS = true;
} else {
if (!globalThis.Bun) {
/**
* maxYoungGenerationSizeMb defaults to 152mb through the cli option defaults.
* That default value was determined via https://github.com/ChainSafe/lodestar/issues/2115 and
@@ -150,29 +157,43 @@ export class WorkerNetworkCore implements INetworkCore {
workerOpts.resourceLimits = {maxYoungGenerationSizeMb: opts.maxYoungGenerationSizeMb};
}
const worker = new Worker(path.join(workerDir, "networkCoreWorker.js"), workerOpts);
const worker = new Worker(workerPath, workerOpts);
// biome-ignore lint/suspicious/noExplicitAny: Don't know any specific interface for the spawn
const networkThreadApi = (await spawn<any>(worker, {
// A Lodestar Node may do very expensive tasks at start, blocking the event loop and causing
// the initialization to time out. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
// TODO: types are broken on spawn, which claims that `NetworkWorkerApi` does not satisfy its constraints
})) as unknown as ModuleThread<NetworkWorkerApi>;
// Wait for worker to be online
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Network worker initialization timeout"));
}, 5 * 60 * 1000);
worker.once("online", () => {
clearTimeout(timeout);
resolve();
});
worker.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
});
// Create RPC client for typed method calls
const {api: networkThreadApi, close: closeRpc} = createWorkerRpcClient<NetworkWorkerApi>(worker);
return new WorkerNetworkCore({
...modules,
networkThreadApi,
worker,
closeRpc,
});
}
async close(): Promise<void> {
this.modules.logger.debug("closing network core running in network worker");
await this.getApi().close();
this.modules.closeRpc();
this.modules.logger.debug("terminating network worker");
await terminateWorkerThread({
worker: this.getApi(),
worker: this.modules.worker,
retryCount: NETWORK_WORKER_EXIT_RETRY_COUNT,
retryMs: NETWORK_WORKER_EXIT_TIMEOUT_MS,
logger: this.modules.logger,
@@ -278,7 +299,7 @@ export class WorkerNetworkCore implements INetworkCore {
return this.getApi().writeDiscv5HeapSnapshot(prefix, dirpath);
}
private getApi(): ModuleThread<NetworkWorkerApi> {
private getApi(): NetworkWorkerApi {
return this.modules.networkThreadApi;
}
}

View File

@@ -92,9 +92,6 @@ export type NetworkWorkerData = {
* API exposed by the libp2p worker
*/
export type NetworkWorkerApi = INetworkCorePublic & {
// To satisfy the constraint of `ModuleThread` type
// biome-ignore lint/suspicious/noExplicitAny: Explicitly needed the `any` type here
[string: string]: (...args: any[]) => Promise<any> | any;
// Async method through worker boundary
reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): Promise<void>;
reStatusPeers(peers: PeerIdStr[]): Promise<void>;
@@ -113,6 +110,8 @@ export type NetworkWorkerApi = INetworkCorePublic & {
scrapeMetrics(): Promise<string>;
writeProfile(durationMs: number, dirpath: string): Promise<string>;
writeDiscv5Profile(durationMs: number, dirpath: string): Promise<string>;
writeHeapSnapshot(prefix: string, dirpath: string): Promise<string>;
writeDiscv5HeapSnapshot(prefix: string, dirpath: string): Promise<string>;
// TODO: ReqResp outgoing
// TODO: ReqResp incoming

View File

@@ -1,13 +1,24 @@
import EventEmitter from "node:events";
import path from "node:path";
import {Worker} from "node:worker_threads";
import {fileURLToPath} from "node:url";
import {privateKeyToProtobuf} from "@libp2p/crypto/keys";
import {PrivateKey} from "@libp2p/interface";
import {StrictEventEmitter} from "strict-event-emitter-types";
import {ENR, ENRData, SignableENR} from "@chainsafe/enr";
import {Thread, Worker, spawn} from "@chainsafe/threads";
import {BeaconConfig, chainConfigFromJson, chainConfigToJson} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {Discv5WorkerApi, Discv5WorkerData, LodestarDiscv5Opts} from "./types.js";
import {Discv5WorkerData, Discv5WorkerMessage, Discv5WorkerResponse, LodestarDiscv5Opts} from "./types.js";
// Resolve worker path relative to this file
// In dev/test mode: running from src/, worker is in lib/
// In production: running from lib/, worker is in same directory
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const workerPath =
process.env.NODE_ENV === "test"
? path.join(__dirname, "../../../lib/network/discv5/worker.js")
: path.join(__dirname, "worker.js");
export type Discv5Opts = {
privateKey: PrivateKey;
@@ -22,20 +33,30 @@ export type Discv5Events = {
discovered: (enr: ENR) => void;
};
type PendingRequest = {
resolve: (value: unknown) => void;
reject: (error: Error) => void;
};
/**
* Wrapper class abstracting the details of discv5 worker instantiation and message-passing
*/
export class Discv5Worker extends (EventEmitter as {new (): StrictEventEmitter<EventEmitter, Discv5Events>}) {
private readonly subscription: {unsubscribe: () => void};
private readonly worker: Worker;
private readonly opts: Discv5Opts;
private readonly pendingRequests = new Map<number, PendingRequest>();
private nextId = 0;
private closed = false;
constructor(
private readonly opts: Discv5Opts,
private readonly workerApi: Discv5WorkerApi
) {
constructor(opts: Discv5Opts, worker: Worker) {
super();
this.opts = opts;
this.worker = worker;
this.subscription = workerApi.discovered().subscribe((enrObj) => this.onDiscovered(enrObj));
this.worker.on("message", this.handleWorkerMessage);
this.worker.on("error", (error) => {
opts.logger.error("Discv5 worker error", {}, error);
});
}
static async init(opts: Discv5Opts): Promise<Discv5Worker> {
@@ -51,27 +72,35 @@ export class Discv5Worker extends (EventEmitter as {new (): StrictEventEmitter<E
loggerOpts: opts.logger.toOpts(),
genesisTime: opts.genesisTime,
};
const worker = new Worker("./worker.js", {
suppressTranspileTS: Boolean(globalThis.Bun),
workerData,
} as ConstructorParameters<typeof Worker>[1]);
const workerApi = await spawn<Discv5WorkerApi>(worker, {
// A Lodestar Node may do very expensive tasks at start, blocking the event loop and causing
// the initialization to time out. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
const worker = new Worker(workerPath, {workerData});
// Wait for worker to be ready (it starts discv5 on initialization)
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Discv5 worker initialization timeout"));
}, 5 * 60 * 1000);
worker.once("online", () => {
clearTimeout(timeout);
resolve();
});
worker.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
});
return new Discv5Worker(opts, workerApi);
return new Discv5Worker(opts, worker);
}
async close(): Promise<void> {
if (this.closed) return;
this.closed = true;
this.subscription.unsubscribe();
await this.workerApi.close();
await Thread.terminate(this.workerApi as unknown as Thread);
await this.sendRequest({type: "close", id: this.nextId++});
await this.worker.terminate();
}
onDiscovered(obj: ENRData): void {
@@ -82,38 +111,75 @@ export class Discv5Worker extends (EventEmitter as {new (): StrictEventEmitter<E
}
async enr(): Promise<SignableENR> {
const obj = await this.workerApi.enr();
return new SignableENR(obj.kvs, obj.seq, this.opts.privateKey.raw);
const response = (await this.sendRequest({type: "enr", id: this.nextId++})) as {enr: {kvs: Map<string, Uint8Array>; seq: bigint}};
return new SignableENR(response.enr.kvs, response.enr.seq, this.opts.privateKey.raw);
}
setEnrValue(key: string, value: Uint8Array): Promise<void> {
return this.workerApi.setEnrValue(key, value);
return this.sendRequest({type: "setEnrValue", id: this.nextId++, key, value}) as Promise<void>;
}
async kadValues(): Promise<ENR[]> {
return this.decodeEnrs(await this.workerApi.kadValues());
const response = (await this.sendRequest({type: "kadValues", id: this.nextId++})) as {enrs: ENRData[]};
return this.decodeEnrs(response.enrs);
}
discoverKadValues(): Promise<void> {
return this.workerApi.discoverKadValues();
return this.sendRequest({type: "discoverKadValues", id: this.nextId++}) as Promise<void>;
}
async findRandomNode(): Promise<ENR[]> {
return this.decodeEnrs(await this.workerApi.findRandomNode());
const response = (await this.sendRequest({type: "findRandomNode", id: this.nextId++})) as {enrs: ENRData[]};
return this.decodeEnrs(response.enrs);
}
scrapeMetrics(): Promise<string> {
return this.workerApi.scrapeMetrics();
async scrapeMetrics(): Promise<string> {
const response = (await this.sendRequest({type: "scrapeMetrics", id: this.nextId++})) as {metrics: string};
return response.metrics;
}
async writeProfile(durationMs: number, dirpath: string): Promise<string> {
return this.workerApi.writeProfile(durationMs, dirpath);
const response = (await this.sendRequest({type: "writeProfile", id: this.nextId++, durationMs, dirpath})) as {path: string};
return response.path;
}
async writeHeapSnapshot(prefix: string, dirpath: string): Promise<string> {
return this.workerApi.writeHeapSnapshot(prefix, dirpath);
const response = (await this.sendRequest({type: "writeHeapSnapshot", id: this.nextId++, prefix, dirpath})) as {path: string};
return response.path;
}
private sendRequest(message: Discv5WorkerMessage): Promise<unknown> {
return new Promise((resolve, reject) => {
this.pendingRequests.set(message.id, {resolve, reject});
this.worker.postMessage(message);
});
}
private handleWorkerMessage = (response: Discv5WorkerResponse): void => {
// Handle discovered events (no id, broadcast)
if (response.type === "discovered") {
this.onDiscovered(response.enr);
return;
}
// Handle request/response messages
const pending = this.pendingRequests.get(response.id);
if (!pending) {
this.opts.logger.warn("Received response for unknown request", {id: response.id, type: response.type});
return;
}
this.pendingRequests.delete(response.id);
if (response.type === "error") {
const error = new Error(response.error.message);
if (response.error.stack) error.stack = response.error.stack;
pending.reject(error);
} else {
pending.resolve(response);
}
};
private decodeEnrs(objs: ENRData[]): ENR[] {
const enrs: ENR[] = [];
for (const obj of objs) {

View File

@@ -1,6 +1,5 @@
import {Discv5} from "@chainsafe/discv5";
import {ENRData, SignableENRData} from "@chainsafe/enr";
import {Observable} from "@chainsafe/threads/observable";
import {ChainConfig} from "@lodestar/config";
import {LoggerNodeOpts} from "@lodestar/logger/node";
@@ -43,32 +42,31 @@ export interface Discv5WorkerData {
}
/**
* API exposed by the discv5 worker
*
* Note: ENRs are represented as bytes to facilitate message-passing
* Messages sent from main thread to worker
*/
export type Discv5WorkerApi = {
/** The current host ENR */
enr(): Promise<SignableENRData>;
/** Set a key-value of the current host ENR */
setEnrValue(key: string, value: Uint8Array): Promise<void>;
export type Discv5WorkerMessage =
| {type: "enr"; id: number}
| {type: "setEnrValue"; id: number; key: string; value: Uint8Array}
| {type: "kadValues"; id: number}
| {type: "discoverKadValues"; id: number}
| {type: "findRandomNode"; id: number}
| {type: "scrapeMetrics"; id: number}
| {type: "writeProfile"; id: number; durationMs: number; dirpath: string}
| {type: "writeHeapSnapshot"; id: number; prefix: string; dirpath: string}
| {type: "close"; id: number};
/** Return the ENRs currently in the kad table */
kadValues(): Promise<ENRData[]>;
/** emit the ENRs currently in the kad table */
discoverKadValues(): Promise<void>;
/** Begin a random search through the DHT, return discovered ENRs */
findRandomNode(): Promise<ENRData[]>;
/** Stream of discovered ENRs */
discovered(): Observable<ENRData>;
/** Prometheus metrics string */
scrapeMetrics(): Promise<string>;
/** write profile to disc */
writeProfile(durationMs: number, dirpath: string): Promise<string>;
/** write heap snapshot to disc */
writeHeapSnapshot(prefix: string, dirpath: string): Promise<string>;
/** tear down discv5 resources */
close(): Promise<void>;
};
/**
* Messages sent from worker to main thread
*/
export type Discv5WorkerResponse =
| {type: "discovered"; enr: ENRData}
| {type: "enr"; id: number; enr: SignableENRData}
| {type: "setEnrValue"; id: number}
| {type: "kadValues"; id: number; enrs: ENRData[]}
| {type: "discoverKadValues"; id: number}
| {type: "findRandomNode"; id: number; enrs: ENRData[]}
| {type: "scrapeMetrics"; id: number; metrics: string}
| {type: "writeProfile"; id: number; path: string}
| {type: "writeHeapSnapshot"; id: number; path: string}
| {type: "close"; id: number}
| {type: "error"; id: number; error: {message: string; stack?: string}};

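Note: the protocol above is a plain discriminated union keyed on type, with a correlation id on every variant except the discovered broadcast. A minimal sketch of the narrowing the main-thread handler relies on (only the types above are assumed):

import {Discv5WorkerResponse} from "./types.js";

function handle(msg: Discv5WorkerResponse): void {
  if (msg.type === "discovered") {
    // push event: no correlation id, routed straight to onDiscovered
    return;
  }
  // every remaining variant carries an id; match it against pendingRequests
  console.log("reply for request", msg.id, "of type", msg.type);
}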
View File

@@ -3,9 +3,7 @@ import {privateKeyFromProtobuf} from "@libp2p/crypto/keys";
import {peerIdFromPrivateKey} from "@libp2p/peer-id";
import {Multiaddr, multiaddr} from "@multiformats/multiaddr";
import {Discv5, Discv5EventEmitter} from "@chainsafe/discv5";
import {ENR, ENRData, SignableENR, SignableENRData} from "@chainsafe/enr";
import {Observable, Subject} from "@chainsafe/threads/observable";
import {expose} from "@chainsafe/threads/worker";
import {ENR, SignableENR} from "@chainsafe/enr";
import {createBeaconConfig} from "@lodestar/config";
import {getNodeLogger} from "@lodestar/logger/node";
import {Gauge} from "@lodestar/utils";
@@ -13,7 +11,7 @@ import {RegistryMetricCreator} from "../../metrics/index.js";
import {collectNodeJSMetrics} from "../../metrics/nodeJsMetrics.js";
import {Clock} from "../../util/clock.js";
import {ProfileThread, profileThread, writeHeapSnapshot} from "../../util/profile.js";
import {Discv5WorkerApi, Discv5WorkerData} from "./types.js";
import {Discv5WorkerData, Discv5WorkerMessage, Discv5WorkerResponse} from "./types.js";
import {ENRRelevance, enrRelevance} from "./utils.js";
// This discv5 worker will start discv5 on initialization (there is no `start` function to call)
@@ -23,13 +21,15 @@ import {ENRRelevance, enrRelevance} from "./utils.js";
// Cloned data from instantiation
const workerData = worker.workerData as Discv5WorkerData;
if (!workerData) throw Error("workerData must be defined");
const parentPort = worker.parentPort;
if (!parentPort) throw Error("parentPort must be defined");
const logger = getNodeLogger(workerData.loggerOpts);
// Set up metrics, nodejs and discv5-specific
let metricsRegistry: RegistryMetricCreator | undefined;
let enrRelevanceMetric: Gauge<{status: string}> | undefined;
let closeMetrics: () => void | undefined;
let closeMetrics: (() => void) | undefined;
if (workerData.metrics) {
metricsRegistry = new RegistryMetricCreator();
closeMetrics = collectNodeJSMetrics(metricsRegistry, "discv5_worker_");
@@ -64,9 +64,6 @@ for (const bootEnr of workerData.bootEnrs) {
discv5.addEnr(bootEnr);
}
/** Used to push discovered ENRs */
const subject = new Subject<ENRData>();
/** Define a new clock */
const abortController = new AbortController();
const clock = new Clock({config, genesisTime: workerData.genesisTime, signal: abortController.signal});
@@ -75,7 +72,12 @@ const onDiscovered = (enr: ENR): void => {
const status = enrRelevance(enr, config, clock);
enrRelevanceMetric?.inc({status});
if (status === ENRRelevance.relevant) {
subject.next(enr.toObject());
// Send discovered ENR to main thread
const response: Discv5WorkerResponse = {
type: "discovered",
enr: enr.toObject(),
};
parentPort.postMessage(response);
}
};
discv5.addListener("discovered", onDiscovered);
@@ -83,43 +85,104 @@ discv5.addListener("discovered", onDiscovered);
// Discv5 will now begin accepting request/responses
await discv5.start();
const module: Discv5WorkerApi = {
async enr(): Promise<SignableENRData> {
return discv5.enr.toObject();
},
async setEnrValue(key: string, value: Uint8Array): Promise<void> {
discv5.enr.set(key, value);
},
async kadValues(): Promise<ENRData[]> {
return discv5.kadValues().map((enr: ENR) => enr.toObject());
},
async discoverKadValues(): Promise<void> {
discv5.kadValues().map(onDiscovered);
},
async findRandomNode(): Promise<ENRData[]> {
return (await discv5.findRandomNode()).map((enr: ENR) => enr.toObject());
},
discovered() {
return Observable.from(subject);
},
async scrapeMetrics(): Promise<string> {
return (await metricsRegistry?.metrics()) ?? "";
},
writeProfile: async (durationMs: number, dirpath: string) => {
return profileThread(ProfileThread.DISC5, durationMs, dirpath);
},
writeHeapSnapshot: async (prefix: string, dirpath: string) => {
return writeHeapSnapshot(prefix, dirpath);
},
async close() {
closeMetrics?.();
discv5.removeListener("discovered", onDiscovered);
subject.complete();
await discv5.stop();
},
};
// Handle messages from main thread
parentPort.on("message", async (message: Discv5WorkerMessage) => {
try {
let response: Discv5WorkerResponse;
expose(module);
switch (message.type) {
case "enr": {
response = {
type: "enr",
id: message.id,
enr: discv5.enr.toObject(),
};
break;
}
case "setEnrValue": {
discv5.enr.set(message.key, message.value);
response = {
type: "setEnrValue",
id: message.id,
};
break;
}
case "kadValues": {
response = {
type: "kadValues",
id: message.id,
enrs: discv5.kadValues().map((enr: ENR) => enr.toObject()),
};
break;
}
case "discoverKadValues": {
discv5.kadValues().map(onDiscovered);
response = {
type: "discoverKadValues",
id: message.id,
};
break;
}
case "findRandomNode": {
const enrs = await discv5.findRandomNode();
response = {
type: "findRandomNode",
id: message.id,
enrs: enrs.map((enr: ENR) => enr.toObject()),
};
break;
}
case "scrapeMetrics": {
response = {
type: "scrapeMetrics",
id: message.id,
metrics: (await metricsRegistry?.metrics()) ?? "",
};
break;
}
case "writeProfile": {
const path = await profileThread(ProfileThread.DISC5, message.durationMs, message.dirpath);
response = {
type: "writeProfile",
id: message.id,
path,
};
break;
}
case "writeHeapSnapshot": {
const path = await writeHeapSnapshot(message.prefix, message.dirpath);
response = {
type: "writeHeapSnapshot",
id: message.id,
path,
};
break;
}
case "close": {
closeMetrics?.();
discv5.removeListener("discovered", onDiscovered);
abortController.abort();
await discv5.stop();
response = {
type: "close",
id: message.id,
};
break;
}
default:
throw new Error(`Unknown message type: ${(message as Discv5WorkerMessage).type}`);
}
parentPort.postMessage(response);
} catch (e) {
const response: Discv5WorkerResponse = {
type: "error",
id: (message as Discv5WorkerMessage).id,
error: {message: (e as Error).message, stack: (e as Error).stack},
};
parentPort.postMessage(response);
}
});
const logData: Record<string, string> = {
peerId: peerId.toString(),

View File

@@ -1,5 +1,4 @@
import {MessagePort, Worker} from "node:worker_threads";
import {Thread} from "@chainsafe/threads";
import {Logger} from "@lodestar/logger";
import {sleep} from "@lodestar/utils";
import {Metrics} from "../metrics/metrics.js";
@@ -116,22 +115,17 @@ export async function terminateWorkerThread({
retryCount,
logger,
}: {
worker: Thread;
worker: Worker;
retryMs: number;
retryCount: number;
logger?: Logger;
}): Promise<void> {
const terminated = new Promise((resolve) => {
Thread.events(worker).subscribe((event) => {
if (event.type === "termination") {
resolve(true);
}
});
});
for (let i = 0; i < retryCount; i++) {
await Thread.terminate(worker);
const result = await Promise.race([terminated, sleep(retryMs).then(() => false)]);
const terminatePromise = worker.terminate();
const result = await Promise.race([
terminatePromise.then(() => true),
sleep(retryMs).then(() => false),
]);
if (result) return;

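Note: each terminate attempt above races worker.terminate() against a sleep and retries on timeout. The same race in isolation (the settledWithin name is hypothetical):

const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms));

// True if `p` resolves within `ms`, false otherwise; mirrors the Promise.race above.
// Assumes `p` resolves rather than rejects, as worker.terminate() does.
async function settledWithin(p: Promise<unknown>, ms: number): Promise<boolean> {
  return Promise.race([p.then(() => true), sleep(ms).then(() => false)]);
}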
View File

@@ -0,0 +1,136 @@
import {MessagePort, Worker} from "node:worker_threads";
/**
* RPC message format for worker communication
*/
export type RpcMessage = {
type: "rpc";
id: number;
method: string;
args: unknown[];
};
/**
* RPC response format for worker communication
*/
export type RpcResponse = {
type: "rpc-response";
id: number;
result?: unknown;
error?: {message: string; stack?: string};
};
type PendingRequest = {
resolve: (value: unknown) => void;
reject: (error: Error) => void;
};
/**
* Creates a proxy that forwards method calls to a worker via RPC
*/
export function createWorkerRpcClient<T extends object>(
worker: Worker,
_filter?: (message: unknown) => boolean
): {api: T; close: () => void} {
const pendingRequests = new Map<number, PendingRequest>();
let nextId = 0;
const handleMessage = (message: unknown): void => {
if (typeof message !== "object" || message === null) return;
const response = message as RpcResponse;
if (response.type !== "rpc-response") return;
const pending = pendingRequests.get(response.id);
if (!pending) return;
pendingRequests.delete(response.id);
if (response.error) {
const error = new Error(response.error.message);
if (response.error.stack) error.stack = response.error.stack;
pending.reject(error);
} else {
pending.resolve(response.result);
}
};
worker.on("message", handleMessage);
const api = new Proxy({} as T, {
get(_target, prop) {
if (typeof prop !== "string") return undefined;
return (...args: unknown[]): Promise<unknown> => {
return new Promise((resolve, reject) => {
const id = nextId++;
pendingRequests.set(id, {resolve, reject});
const message: RpcMessage = {
type: "rpc",
id,
method: prop,
args,
};
worker.postMessage(message);
});
};
},
});
const close = (): void => {
// Reject all pending requests
for (const pending of pendingRequests.values()) {
pending.reject(new Error("Worker RPC client closed"));
}
pendingRequests.clear();
};
return {api, close};
}
/**
* Handles RPC calls on the worker side
*/
export function handleWorkerRpc<T extends object>(
parentPort: MessagePort,
api: T,
filter?: (message: unknown) => boolean
): void {
parentPort.on("message", async (message: unknown) => {
if (typeof message !== "object" || message === null) return;
const rpcMessage = message as RpcMessage;
if (rpcMessage.type !== "rpc") return;
if (filter && !filter(message)) return;
const {id, method, args} = rpcMessage;
try {
const fn = (api as Record<string, unknown>)[method];
if (typeof fn !== "function") {
throw new Error(`Unknown method: ${method}`);
}
const result = await fn.apply(api, args);
const response: RpcResponse = {
type: "rpc-response",
id,
result,
};
parentPort.postMessage(response);
} catch (e) {
const response: RpcResponse = {
type: "rpc-response",
id,
error: {
message: (e as Error).message,
stack: (e as Error).stack,
},
};
parentPort.postMessage(response);
}
});
}

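Note: a minimal end-to-end sketch of wiring the two halves above together; the MathApi shape and file names are illustrative only:

// main.ts
import {Worker} from "node:worker_threads";
import {createWorkerRpcClient} from "./workerRpc.js";

type MathApi = {add(a: number, b: number): Promise<number>};

const worker = new Worker(new URL("./mathWorker.js", import.meta.url));
const {api, close} = createWorkerRpcClient<MathApi>(worker);
// the proxy posts {type: "rpc", id: 0, method: "add", args: [2, 3]}
console.log(await api.add(2, 3)); // 5
close();
await worker.terminate();

// mathWorker.ts
import {parentPort} from "node:worker_threads";
import {handleWorkerRpc} from "./workerRpc.js";

if (!parentPort) throw Error("parentPort must be defined");
handleWorkerRpc(parentPort, {
  async add(a: number, b: number): Promise<number> {
    return a + b;
  },
});

Note that close() only rejects requests still pending on the client side; terminating the worker itself is a separate step, as the call sites in this commit do via terminateWorkerThread.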
View File

@@ -50,8 +50,7 @@ describe("chain / bls / multithread queue", () => {
const pool = new BlsMultiThreadWorkerPool({}, {logger, metrics: null});
// await terminating all workers
afterEachCallbacks.push(() => pool.close());
// Wait until initialized
await pool["waitTillInitialized"]();
// Piscina handles worker initialization automatically
return pool;
}

View File

@@ -134,8 +134,9 @@ describe("data serialization through worker boundary", () => {
scrapeMetrics: [],
writeProfile: [0, ""],
writeDiscv5Profile: [0, ""],
writeHeapSnapshot: ["", ""],
writeDiscv5HeapSnapshot: ["", ""],
setTargetGroupCount: [4],
setAdvertisedGroupCount: [4],
};
const lodestarPeer: routes.lodestar.LodestarNodePeer = {
@@ -205,7 +206,8 @@ describe("data serialization through worker boundary", () => {
scrapeMetrics: "test-metrics",
writeProfile: "",
writeDiscv5Profile: "",
setAdvertisedGroupCount: null,
writeHeapSnapshot: "",
writeDiscv5HeapSnapshot: "",
setTargetGroupCount: null,
};

View File

@@ -1,13 +1,9 @@
import worker from "node:worker_threads";
import {expose} from "@chainsafe/threads/worker";
const parentPort = worker.parentPort;
if (!parentPort) throw Error("parentPort must be defined");
parentPort.on("message", (data) => {
parentPort.postMessage(data);
});
expose(() => {
//
parentPort.on("message", (msg) => {
// Echo back the data with the same id
parentPort.postMessage({id: msg.id, data: msg.data});
});

View File

@@ -1,5 +1,6 @@
import workerThreads from "node:worker_threads";
import {Worker, spawn} from "@chainsafe/threads";
import path from "node:path";
import {Worker} from "node:worker_threads";
import {fileURLToPath} from "node:url";
export type EchoWorker = {
send<T>(data: T): Promise<T>;
@@ -7,28 +8,56 @@ export type EchoWorker = {
};
export async function getEchoWorker(): Promise<EchoWorker> {
const workerThreadjs = new Worker("./workerEcho.js");
const worker = workerThreadjs as unknown as workerThreads.Worker;
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const worker = new Worker(path.join(__dirname, "workerEcho.js"));
await spawn<any>(workerThreadjs, {
// A Lodestar Node may do very expensive tasks at start, blocking the event loop and causing
// the initialization to time out. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
// TODO: types are broken on spawn, which claims that `NetworkWorkerApi` does not satisfy its constraints
// Wait for worker to be online
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Echo worker initialization timeout"));
}, 5 * 60 * 1000);
worker.once("online", () => {
clearTimeout(timeout);
resolve();
});
worker.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
});
// Track pending requests
let requestId = 0;
const pending = new Map<number, {resolve: (data: unknown) => void; reject: (err: Error) => void}>();
worker.on("message", (msg: {id: number; data: unknown}) => {
const handler = pending.get(msg.id);
if (handler) {
pending.delete(msg.id);
handler.resolve(msg.data);
}
});
worker.on("error", (err) => {
for (const handler of pending.values()) {
handler.reject(err);
}
pending.clear();
});
return {
send<T>(data: T): Promise<T> {
return new Promise((resolve, reject) => {
worker.once("message", (data) => resolve(data));
worker.once("messageerror", reject);
worker.once("error", reject);
worker.postMessage(data);
const id = requestId++;
pending.set(id, {resolve: resolve as (data: unknown) => void, reject});
worker.postMessage({id, data});
});
},
async close() {
await workerThreadjs.terminate();
await worker.terminate();
},
};
}

View File

@@ -65,7 +65,7 @@
"@chainsafe/enr": "^5.0.1",
"@chainsafe/persistent-merkle-tree": "^1.2.1",
"@chainsafe/ssz": "^1.2.2",
"@chainsafe/threads": "^1.11.3",
"piscina": "^5.0.0",
"@libp2p/crypto": "^5.0.15",
"@libp2p/interface": "^2.7.0",
"@libp2p/peer-id": "^5.1.0",

View File

@@ -1,41 +1,47 @@
import path from "node:path";
import {ModuleThread, Pool, QueuedTask, Worker, spawn} from "@chainsafe/threads";
import {fileURLToPath} from "node:url";
import {Piscina} from "piscina";
import {maxPoolSize} from "./poolSize.js";
import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI} from "./types.js";
import {DecryptKeystoreArgs} from "./types.js";
// Worker constructor considers the path relative to the current working directory
const workerDir =
process.env.NODE_ENV === "test" ? "../../../../../lib/cmds/validator/keymanager/decryptKeystores" : "./";
// Resolve worker path relative to this file
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const workerPath =
process.env.NODE_ENV === "test"
? path.join(__dirname, "../../../../../lib/cmds/validator/keymanager/decryptKeystores/worker.js")
: path.join(__dirname, "worker.js");
type PendingTask = {
controller: AbortController;
promise: Promise<Uint8Array>;
};
/**
* Thread pool to decrypt keystores
*/
export class DecryptKeystoresThreadPool {
private pool: Pool<ModuleThread<DecryptKeystoreWorkerAPI>>;
private tasks: QueuedTask<ModuleThread<DecryptKeystoreWorkerAPI>, Uint8Array>[] = [];
private pool: Piscina;
private tasks: PendingTask[] = [];
private terminatePoolHandler: () => void;
constructor(
keystoreCount: number,
private readonly signal: AbortSignal
) {
this.pool = Pool(
() =>
spawn<DecryptKeystoreWorkerAPI>(new Worker(path.join(workerDir, "worker.js")), {
// The number below is big enough to almost disable the timeout
// which helps during test runs on unpredictably slow hosts
timeout: 5 * 60 * 1000,
}),
{
// Adjust worker pool size based on keystore count
size: Math.min(keystoreCount, maxPoolSize),
// Decrypt keystores in sequence, increasing concurrency does not improve performance
concurrency: 1,
}
);
this.pool = new Piscina({
filename: workerPath,
// Adjust worker pool size based on keystore count
minThreads: Math.min(keystoreCount, maxPoolSize),
maxThreads: Math.min(keystoreCount, maxPoolSize),
// Decrypt keystores in sequence per worker; increasing concurrency does not improve performance
concurrentTasksPerWorker: 1,
// Enable timing statistics
recordTiming: true,
});
// Terminate worker threads when process receives exit signal
this.terminatePoolHandler = () => {
void this.pool.terminate(true);
void this.pool.destroy();
};
signal.addEventListener("abort", this.terminatePoolHandler, {once: true});
}
@@ -48,9 +54,11 @@ export class DecryptKeystoresThreadPool {
onDecrypted: (secretKeyBytes: Uint8Array) => void,
onError: (e: Error) => void
): void {
const task = this.pool.queue((thread) => thread.decryptKeystore(args));
this.tasks.push(task);
task.then(onDecrypted).catch(onError);
const controller = new AbortController();
const promise = this.pool.run(args, {signal: controller.signal});
this.tasks.push({controller, promise});
promise.then(onDecrypted).catch(onError);
}
/**
@@ -58,8 +66,9 @@ export class DecryptKeystoresThreadPool {
* Errors during executing can be captured in `onError` handler for each task.
*/
async completed(): Promise<void> {
await this.pool.settled(true);
await this.pool.terminate();
// Wait for all tasks to settle (resolve or reject)
await Promise.allSettled(this.tasks.map((t) => t.promise));
await this.pool.close();
this.signal.removeEventListener("abort", this.terminatePoolHandler);
}
@@ -68,7 +77,7 @@ export class DecryptKeystoresThreadPool {
*/
cancel(): void {
for (const task of this.tasks) {
task.cancel();
task.controller.abort();
}
this.tasks = [];
}

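Note: cancellation above leans on piscina's per-task signal option: pool.run(task, {signal}) rejects the returned promise once the signal aborts. A standalone sketch of that pattern (the worker filename is illustrative):

import {Piscina} from "piscina";

const pool = new Piscina({filename: new URL("./slowTask.js", import.meta.url).href});
const controller = new AbortController();

const task = pool.run({n: 42}, {signal: controller.signal});
controller.abort(); // piscina rejects `task` with an abort error
await task.catch((e) => console.log("cancelled:", (e as Error).message));
await pool.destroy();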
View File

@@ -1,10 +1,6 @@
import {KeystoreStr} from "@lodestar/api/keymanager";
import {LocalKeystoreDefinition} from "../interface.js";
export type DecryptKeystoreWorkerAPI = {
decryptKeystore(args: DecryptKeystoreArgs): Promise<Uint8Array>;
};
export type DecryptKeystoreArgs = LocalKeystoreDefinition | {keystoreStr: KeystoreStr; password: string};
export function isLocalKeystoreDefinition(args: DecryptKeystoreArgs): args is LocalKeystoreDefinition {

View File

@@ -1,24 +1,21 @@
import fs from "node:fs";
import {Keystore} from "@chainsafe/bls-keystore";
import {Transfer, TransferDescriptor} from "@chainsafe/threads";
import {expose} from "@chainsafe/threads/worker";
import {DecryptKeystoreArgs, DecryptKeystoreWorkerAPI, isLocalKeystoreDefinition} from "./types.js";
import {DecryptKeystoreArgs, isLocalKeystoreDefinition} from "./types.js";
/**
* Decrypt a single keystore, returning the secret key as a Uint8Array
*
* NOTE: This is a memory- (and CPU-) intensive process, since decrypting the keystore involves running a key derivation function (either pbkdf2 or scrypt)
*/
export async function decryptKeystore(args: DecryptKeystoreArgs): Promise<TransferDescriptor<Uint8Array>> {
export default async function decryptKeystore(args: DecryptKeystoreArgs): Promise<Uint8Array> {
const keystore = Keystore.parse(
isLocalKeystoreDefinition(args) ? fs.readFileSync(args.keystorePath, "utf8") : args.keystoreStr
);
// Memory-hogging function
const secret = await keystore.decrypt(args.password);
// Transfer the underlying ArrayBuffer back to the main thread: https://threads.js.org/usage-advanced#transferable-objects
// This small performance gain may help in cases where this is run for many keystores
return Transfer(secret, [secret.buffer]);
// Return the secret directly - piscina handles transferring via the transferList option
// in the main thread if needed. For small keys like BLS secret keys (32 bytes),
// the structured clone overhead is negligible.
return secret;
}
expose({decryptKeystore} as unknown as DecryptKeystoreWorkerAPI);

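Note: the removed threads.js code transferred the secret's ArrayBuffer back to the main thread; the piscina equivalent would be marking the return value with Piscina.move, which this commit deliberately skips since a 32-byte secret is cheap to clone. A hedged sketch of the transfer variant, assuming Piscina.move accepts typed arrays as documented (hypothetical, not what this commit does):

import fs from "node:fs";
import {Keystore} from "@chainsafe/bls-keystore";
import {Piscina} from "piscina";
import {DecryptKeystoreArgs, isLocalKeystoreDefinition} from "./types.js";

// Hypothetical variant: same decryption, but the returned bytes are moved
// (transferred) to the main thread instead of structured-cloned.
export default async function decryptKeystoreMove(args: DecryptKeystoreArgs): Promise<Uint8Array> {
  const keystore = Keystore.parse(
    isLocalKeystoreDefinition(args) ? fs.readFileSync(args.keystorePath, "utf8") : args.keystoreStr
  );
  const secret = await keystore.decrypt(args.password);
  // Piscina.move marks `secret` so its underlying buffer is transferred, not copied
  return Piscina.move(secret) as unknown as Uint8Array;
}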
View File

@@ -68,7 +68,6 @@
"winston-transport": "^4.5.0"
},
"devDependencies": {
"@chainsafe/threads": "^1.11.3",
"@lodestar/test-utils": "^1.37.0",
"@types/triple-beam": "^1.3.2",
"triple-beam": "^1.3.0"

View File

@@ -1,6 +1,5 @@
import fs from "node:fs";
import worker from "node:worker_threads";
import {expose} from "@chainsafe/threads/worker";
const parentPort = worker.parentPort;
const workerData = worker.workerData;
@@ -12,7 +11,3 @@ parentPort.on("message", (data) => {
console.log(data);
file.write(data);
});
expose(() => {
//
});

View File

@@ -1,5 +1,6 @@
import workerThreads from "node:worker_threads";
import {Worker, spawn} from "@chainsafe/threads";
import path from "node:path";
import {fileURLToPath} from "node:url";
import {Worker} from "node:worker_threads";
export type LoggerWorker = {
log(data: string): void;
@@ -9,16 +10,26 @@ export type LoggerWorker = {
type WorkerData = {logFilepath: string};
export async function getLoggerWorker(opts: WorkerData): Promise<LoggerWorker> {
const workerThreadjs = new Worker("./workerLogger.js", {
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const worker = new Worker(path.join(__dirname, "workerLogger.js"), {
workerData: opts,
});
const worker = workerThreadjs as unknown as workerThreads.Worker;
await spawn<any>(workerThreadjs, {
// A Lodestar Node may do very expensive task at start blocking the event loop and causing
// the initialization to timeout. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
// TODO: types are broken on spawn, which claims that `NetworkWorkerApi` does not satisfy its constraints
// Wait for worker to be online
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Logger worker initialization timeout"));
}, 5 * 60 * 1000);
worker.once("online", () => {
clearTimeout(timeout);
resolve();
});
worker.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
});
return {
@@ -27,7 +38,7 @@ export async function getLoggerWorker(opts: WorkerData): Promise<LoggerWorker> {
},
async close() {
await workerThreadjs.terminate();
await worker.terminate();
},
};
}
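
The online-wait promise above is now duplicated between the logger worker and the historical state worker; it could be factored into a small shared helper. A sketch only; `waitForWorkerOnline` is a hypothetical name, not part of this commit:

```ts
import {Worker} from "node:worker_threads";

// Resolve once the worker reports "online"; reject on the first "error"
// or after timeoutMs. Mirrors the inline promises used in this commit.
export function waitForWorkerOnline(worker: Worker, timeoutMs: number): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const timeout = setTimeout(() => reject(new Error("Worker initialization timeout")), timeoutMs);
    worker.once("online", () => {
      clearTimeout(timeout);
      resolve();
    });
    worker.once("error", (error) => {
      clearTimeout(timeout);
      reject(error);
    });
  });
}
```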

yarn.lock (153 changes)
View File

@@ -775,18 +775,6 @@
"@chainsafe/swap-or-not-shuffle-win32-arm64-msvc" "1.2.1"
"@chainsafe/swap-or-not-shuffle-win32-x64-msvc" "1.2.1"
"@chainsafe/threads@^1.11.3":
version "1.11.3"
resolved "https://registry.yarnpkg.com/@chainsafe/threads/-/threads-1.11.3.tgz#7dba606277bfb0e9c8a54325da0372494ec02042"
integrity sha512-wTIHTOOJ3MMRFtnJJT6KJCuauyv8pgs79m5ipspyPjHdKM9HJnkeZcoo06G3qArx2xMvd6MqNj1TLfnh5iFvaQ==
dependencies:
callsites "^3.1.0"
debug "^4.2.0"
is-observable "^2.1.0"
observable-fns "^0.6.1"
optionalDependencies:
tiny-worker ">= 2"
"@colors/colors@1.5.0":
version "1.5.0"
resolved "https://registry.yarnpkg.com/@colors/colors/-/colors-1.5.0.tgz#bb504579c1cae923e6576a4f5da43d25f97bdbd9"
@@ -2267,6 +2255,114 @@
uint8-varint "^2.0.1"
uint8arrays "^5.0.0"
"@napi-rs/nice-android-arm-eabi@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-android-arm-eabi/-/nice-android-arm-eabi-1.1.1.tgz#4ebd966821cd6c2cc7cc020eb468de397bb9b40f"
integrity sha512-kjirL3N6TnRPv5iuHw36wnucNqXAO46dzK9oPb0wj076R5Xm8PfUVA9nAFB5ZNMmfJQJVKACAPd/Z2KYMppthw==
"@napi-rs/nice-android-arm64@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-android-arm64/-/nice-android-arm64-1.1.1.tgz#e183ba874512bc005852daab8b78c63e0a4288a8"
integrity sha512-blG0i7dXgbInN5urONoUCNf+DUEAavRffrO7fZSeoRMJc5qD+BJeNcpr54msPF6qfDD6kzs9AQJogZvT2KD5nw==
"@napi-rs/nice-darwin-arm64@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-darwin-arm64/-/nice-darwin-arm64-1.1.1.tgz#64b1585809774cbb8bf95cea3d4c8827c9897394"
integrity sha512-s/E7w45NaLqTGuOjC2p96pct4jRfo61xb9bU1unM/MJ/RFkKlJyJDx7OJI/O0ll/hrfpqKopuAFDV8yo0hfT7A==
"@napi-rs/nice-darwin-x64@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-darwin-x64/-/nice-darwin-x64-1.1.1.tgz#99c0c7f62cb1e23ca76881bb29cc6000aeccc6f0"
integrity sha512-dGoEBnVpsdcC+oHHmW1LRK5eiyzLwdgNQq3BmZIav+9/5WTZwBYX7r5ZkQC07Nxd3KHOCkgbHSh4wPkH1N1LiQ==
"@napi-rs/nice-freebsd-x64@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-freebsd-x64/-/nice-freebsd-x64-1.1.1.tgz#9a5ca0e3ced86207887c98a5a560de8cde5a909e"
integrity sha512-kHv4kEHAylMYmlNwcQcDtXjklYp4FCf0b05E+0h6nDHsZ+F0bDe04U/tXNOqrx5CmIAth4vwfkjjUmp4c4JktQ==
"@napi-rs/nice-linux-arm-gnueabihf@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-linux-arm-gnueabihf/-/nice-linux-arm-gnueabihf-1.1.1.tgz#b8a6a1bc88d0de3e99ac3fdea69980dc6e20b502"
integrity sha512-E1t7K0efyKXZDoZg1LzCOLxgolxV58HCkaEkEvIYQx12ht2pa8hoBo+4OB3qh7e+QiBlp1SRf+voWUZFxyhyqg==
"@napi-rs/nice-linux-arm64-gnu@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-linux-arm64-gnu/-/nice-linux-arm64-gnu-1.1.1.tgz#226f1ef30fcb80fa40370e843b75cc86e39e1183"
integrity sha512-CIKLA12DTIZlmTaaKhQP88R3Xao+gyJxNWEn04wZwC2wmRapNnxCUZkVwggInMJvtVElA+D4ZzOU5sX4jV+SmQ==
"@napi-rs/nice-linux-arm64-musl@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-linux-arm64-musl/-/nice-linux-arm64-musl-1.1.1.tgz#01345c3db79210ba5406c8729e8db75ed11c5f14"
integrity sha512-+2Rzdb3nTIYZ0YJF43qf2twhqOCkiSrHx2Pg6DJaCPYhhaxbLcdlV8hCRMHghQ+EtZQWGNcS2xF4KxBhSGeutg==
"@napi-rs/nice-linux-ppc64-gnu@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-linux-ppc64-gnu/-/nice-linux-ppc64-gnu-1.1.1.tgz#ce7a1025227daab491ded40784b561394d688fcb"
integrity sha512-4FS8oc0GeHpwvv4tKciKkw3Y4jKsL7FRhaOeiPei0X9T4Jd619wHNe4xCLmN2EMgZoeGg+Q7GY7BsvwKpL22Tg==
"@napi-rs/nice-linux-riscv64-gnu@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-linux-riscv64-gnu/-/nice-linux-riscv64-gnu-1.1.1.tgz#9bef5dc89a0425d03163853b4968dbb686d98fd5"
integrity sha512-HU0nw9uD4FO/oGCCk409tCi5IzIZpH2agE6nN4fqpwVlCn5BOq0MS1dXGjXaG17JaAvrlpV5ZeyZwSon10XOXw==
"@napi-rs/nice-linux-s390x-gnu@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-linux-s390x-gnu/-/nice-linux-s390x-gnu-1.1.1.tgz#247c8c7c45876877bdb337cfeb290ff4fd82de62"
integrity sha512-2YqKJWWl24EwrX0DzCQgPLKQBxYDdBxOHot1KWEq7aY2uYeX+Uvtv4I8xFVVygJDgf6/92h9N3Y43WPx8+PAgQ==
"@napi-rs/nice-linux-x64-gnu@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-linux-x64-gnu/-/nice-linux-x64-gnu-1.1.1.tgz#7fd1f5e037cb44ab4f5f95a3b3225a99e3248f12"
integrity sha512-/gaNz3R92t+dcrfCw/96pDopcmec7oCcAQ3l/M+Zxr82KT4DljD37CpgrnXV+pJC263JkW572pdbP3hP+KjcIg==
"@napi-rs/nice-linux-x64-musl@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-linux-x64-musl/-/nice-linux-x64-musl-1.1.1.tgz#d447cd7157ae5da5c0b15fc618bf61f0c344ff6f"
integrity sha512-xScCGnyj/oppsNPMnevsBe3pvNaoK7FGvMjT35riz9YdhB2WtTG47ZlbxtOLpjeO9SqqQ2J2igCmz6IJOD5JYw==
"@napi-rs/nice-openharmony-arm64@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-openharmony-arm64/-/nice-openharmony-arm64-1.1.1.tgz#1120e457d2cc6b2bc86ef0a697faefe2e194dfce"
integrity sha512-6uJPRVwVCLDeoOaNyeiW0gp2kFIM4r7PL2MczdZQHkFi9gVlgm+Vn+V6nTWRcu856mJ2WjYJiumEajfSm7arPQ==
"@napi-rs/nice-win32-arm64-msvc@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-win32-arm64-msvc/-/nice-win32-arm64-msvc-1.1.1.tgz#91e4cfecf339b43fa7934f0c8b19d04f4cdd9bc0"
integrity sha512-uoTb4eAvM5B2aj/z8j+Nv8OttPf2m+HVx3UjA5jcFxASvNhQriyCQF1OB1lHL43ZhW+VwZlgvjmP5qF3+59atA==
"@napi-rs/nice-win32-ia32-msvc@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-win32-ia32-msvc/-/nice-win32-ia32-msvc-1.1.1.tgz#ed9300bba074d3e3b0a077d6b157f2b4ff70af0e"
integrity sha512-CNQqlQT9MwuCsg1Vd/oKXiuH+TcsSPJmlAFc5frFyX/KkOh0UpBLEj7aoY656d5UKZQMQFP7vJNa1DNUNORvug==
"@napi-rs/nice-win32-x64-msvc@1.1.1":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice-win32-x64-msvc/-/nice-win32-x64-msvc-1.1.1.tgz#8292b82fb46458618ccff5b8130f78974349541e"
integrity sha512-vB+4G/jBQCAh0jelMTY3+kgFy00Hlx2f2/1zjMoH821IbplbWZOkLiTYXQkygNTzQJTq5cvwBDgn2ppHD+bglQ==
"@napi-rs/nice@^1.0.4":
version "1.1.1"
resolved "https://registry.yarnpkg.com/@napi-rs/nice/-/nice-1.1.1.tgz#c1aacd631ecd4c500c959e3e7cfedd5c73bffe2a"
integrity sha512-xJIPs+bYuc9ASBl+cvGsKbGrJmS6fAKaSZCnT0lhahT5rhA2VVy9/EcIgd2JhtEuFOJNx7UHNn/qiTPTY4nrQw==
optionalDependencies:
"@napi-rs/nice-android-arm-eabi" "1.1.1"
"@napi-rs/nice-android-arm64" "1.1.1"
"@napi-rs/nice-darwin-arm64" "1.1.1"
"@napi-rs/nice-darwin-x64" "1.1.1"
"@napi-rs/nice-freebsd-x64" "1.1.1"
"@napi-rs/nice-linux-arm-gnueabihf" "1.1.1"
"@napi-rs/nice-linux-arm64-gnu" "1.1.1"
"@napi-rs/nice-linux-arm64-musl" "1.1.1"
"@napi-rs/nice-linux-ppc64-gnu" "1.1.1"
"@napi-rs/nice-linux-riscv64-gnu" "1.1.1"
"@napi-rs/nice-linux-s390x-gnu" "1.1.1"
"@napi-rs/nice-linux-x64-gnu" "1.1.1"
"@napi-rs/nice-linux-x64-musl" "1.1.1"
"@napi-rs/nice-openharmony-arm64" "1.1.1"
"@napi-rs/nice-win32-arm64-msvc" "1.1.1"
"@napi-rs/nice-win32-ia32-msvc" "1.1.1"
"@napi-rs/nice-win32-x64-msvc" "1.1.1"
"@napi-rs/snappy-android-arm-eabi@7.2.2":
version "7.2.2"
resolved "https://registry.yarnpkg.com/@napi-rs/snappy-android-arm-eabi/-/snappy-android-arm-eabi-7.2.2.tgz#85fee3ba198dad4b444b5f12bceebcf72db0d65e"
@@ -4678,7 +4774,7 @@ call-bind@^1.0.0, call-bind@^1.0.2, call-bind@^1.0.4, call-bind@^1.0.5:
get-intrinsic "^1.2.1"
set-function-length "^1.1.1"
callsites@^3.0.0, callsites@^3.1.0:
callsites@^3.0.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/callsites/-/callsites-3.1.0.tgz#b3630abd8943432f54b3f0519238e33cd7df2f73"
integrity sha512-P8BjAsXvZS+VIDUI11hHCQEv74YT67YUi5JJFNWIqL235sBmjX4+qx9Muvls5ivyNENctx46xQLQ3aTuE7ssaQ==
@@ -5316,7 +5412,7 @@ de-indent@^1.0.2:
resolved "https://registry.yarnpkg.com/de-indent/-/de-indent-1.0.2.tgz#b2038e846dc33baa5796128d0804b455b8c1e21d"
integrity sha512-e/1zu3xH5MQryN2zdVaF0OrdNLUbvWxzMbi+iNA6Bky7l1RoP8a2fIbRocyHclXt/arDrrR6lL3TqFD9pMQTsg==
debug@4, debug@^4.1.0, debug@^4.1.1, debug@^4.2.0, debug@^4.3.1, debug@^4.3.3, debug@^4.3.4:
debug@4, debug@^4.1.0, debug@^4.1.1, debug@^4.3.1, debug@^4.3.3, debug@^4.3.4:
version "4.3.4"
resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.4.tgz#1319f6579357f2338d3337d2cdd4914bb5dcc865"
integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==
@@ -5826,11 +5922,6 @@ escape-string-regexp@^5.0.0:
resolved "https://registry.yarnpkg.com/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz#4683126b500b61762f2dbebace1806e8be31b1c8"
integrity sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw==
esm@^3.2.25:
version "3.2.25"
resolved "https://registry.npmjs.org/esm/-/esm-3.2.25.tgz"
integrity sha512-U1suiZ2oDVWv4zPO56S0NcR5QriEahGtdN2OR6FiOG4WJvcjBVFB0qI4+eKoWFH483PKGuLuu6V8Z4T5g63UVA==
esprima@^4.0.0:
version "4.0.1"
resolved "https://registry.npmjs.org/esprima/-/esprima-4.0.1.tgz"
@@ -7291,11 +7382,6 @@ is-obj@^2.0.0:
resolved "https://registry.yarnpkg.com/is-obj/-/is-obj-2.0.0.tgz#473fb05d973705e3fd9620545018ca8e22ef4982"
integrity sha512-drqDG3cbczxxEJRoOXcOjtdp1J/lyp1mNn0xaznRs8+muBhgQcrnbspox5X5fOw0HnMnbfDzvnEMEtqDEJEo8w==
is-observable@^2.1.0:
version "2.1.0"
resolved "https://registry.npmjs.org/is-observable/-/is-observable-2.1.0.tgz"
integrity sha512-DailKdLb0WU+xX8K5w7VsJhapwHLZ9jjmazqCJq4X12CTgqq73TKnbRcnSLuXYPOoLQgV5IrD7ePiX/h1vnkBw==
is-plain-obj@^1.0.0, is-plain-obj@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/is-plain-obj/-/is-plain-obj-1.1.0.tgz#71a50c8429dfca773c92a390a4a03b39fcd51d3e"
@@ -9263,11 +9349,6 @@ obliterator@^2.0.1:
resolved "https://registry.yarnpkg.com/obliterator/-/obliterator-2.0.4.tgz#fa650e019b2d075d745e44f1effeb13a2adbe816"
integrity sha512-lgHwxlxV1qIg1Eap7LgIeoBWIMFibOjbrYPIPJZcI1mmGAI2m3lNYpK12Y+GBdPQ0U1hRwSord7GIaawz962qQ==
observable-fns@^0.6.1:
version "0.6.1"
resolved "https://registry.npmjs.org/observable-fns/-/observable-fns-0.6.1.tgz"
integrity sha512-9gRK4+sRWzeN6AOewNBTLXir7Zl/i3GB6Yl26gK4flxz8BXVpD3kt8amREmWNb0mxYOGDotvE5a4N+PtGGKdkg==
on-exit-leak-free@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/on-exit-leak-free/-/on-exit-leak-free-2.1.0.tgz#5c703c968f7e7f851885f6459bf8a8a57edc9cc4"
@@ -9793,6 +9874,13 @@ pino@^9.0.0:
sonic-boom "^3.7.0"
thread-stream "^2.6.0"
piscina@^5.0.0:
version "5.1.4"
resolved "https://registry.yarnpkg.com/piscina/-/piscina-5.1.4.tgz#86ca2b8e42bcbfc258dc7b09d918ee04b2327a67"
integrity sha512-7uU4ZnKeQq22t9AsmHGD2w4OYQGonwFnTypDypaWi7Qr2EvQIFVtG8J5D/3bE7W123Wdc9+v4CZDu5hJXVCtBg==
optionalDependencies:
"@napi-rs/nice" "^1.0.4"
pixelmatch@7.1.0:
version "7.1.0"
resolved "https://registry.yarnpkg.com/pixelmatch/-/pixelmatch-7.1.0.tgz#9d59bddc8c779340e791106c0f245ac33ae4d113"
@@ -11315,13 +11403,6 @@ timers-browserify@^2.0.4:
dependencies:
setimmediate "^1.0.4"
"tiny-worker@>= 2":
version "2.3.0"
resolved "https://registry.npmjs.org/tiny-worker/-/tiny-worker-2.3.0.tgz"
integrity sha512-pJ70wq5EAqTAEl9IkGzA+fN0836rycEuz2Cn6yeZ6FRzlVS5IDOkFHpIoEsksPRQV34GDqXm65+OlnZqUSyK2g==
dependencies:
esm "^3.2.25"
tinybench@^2.9.0:
version "2.9.0"
resolved "https://registry.yarnpkg.com/tinybench/-/tinybench-2.9.0.tgz#103c9f8ba6d7237a47ab6dd1dcff77251863426b"