Refactor job queue (#2275)

* Refactor job queue

* Update metrics http server
This commit is contained in:
Lion - dapplion
2021-03-26 15:48:44 +01:00
committed by GitHub
parent 06f35b5e8c
commit ce48eafd92
22 changed files with 362 additions and 272 deletions

View File

@@ -2631,7 +2631,7 @@
"steppedLine": false,
"targets": [
{
"expr": "rate(lodestar_block_processor_total_async_time[$__rate_interval])",
"expr": "rate(lodestar_block_processor_queue_job_time_seconds[$__rate_interval])",
"instant": false,
"interval": "",
"legendFormat": "",

View File

@@ -85,7 +85,7 @@
"libp2p-tcp": "^0.15.3",
"multiaddr": "^8.1.2",
"peer-id": "^0.14.3",
"prom-client": "^11.5.3",
"prom-client": "^13.1.0",
"prometheus-gc-stats": "^0.6.3",
"snappyjs": "^0.6.0",
"stream-to-it": "^0.2.0",

View File

@@ -16,6 +16,8 @@ import {IBeaconMetrics} from "../../metrics";
import {processBlock, processChainSegment} from "./process";
import {validateBlock} from "./validate";
const metricsPrefix = "lodestar_block_processor_queue";
type BlockProcessorModules = {
config: IBeaconConfig;
forkChoice: IForkChoice;
@@ -35,27 +37,23 @@ export class BlockProcessor {
constructor({
signal,
queueSize = 256,
maxLength = 256,
...modules
}: BlockProcessorModules & {
signal: AbortSignal;
queueSize?: number;
maxLength?: number;
}) {
this.modules = modules;
this.jobQueue = new JobQueue({queueSize, signal, onJobDone: this.onJobDone});
this.jobQueue = new JobQueue({maxLength, signal}, {metrics: modules.metrics, prefix: metricsPrefix});
}
async processBlockJob(job: IBlockJob): Promise<void> {
return await this.jobQueue.enqueueJob(async () => await processBlockJob(this.modules, job));
await this.jobQueue.push(async () => await processBlockJob(this.modules, job));
}
async processChainSegment(job: IChainSegmentJob): Promise<void> {
return await this.jobQueue.enqueueJob(async () => await processChainSegmentJob(this.modules, job));
await this.jobQueue.push(async () => await processChainSegmentJob(this.modules, job));
}
private onJobDone = ({ms}: {ms: number}): void => {
this.modules.metrics?.blockProcessorTotalAsyncTime.inc(ms / 1000);
};
}
/**

View File

@@ -28,7 +28,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
checkpointStateCache,
db,
signal,
queueSize = 256,
maxLength = 256,
}: {
config: IBeaconConfig;
emitter: ChainEventEmitter;
@@ -37,25 +37,25 @@ export class QueuedStateRegenerator implements IStateRegenerator {
checkpointStateCache: CheckpointStateCache;
db: IBeaconDb;
signal: AbortSignal;
queueSize?: number;
maxLength?: number;
}) {
this.regen = new StateRegenerator({config, emitter, forkChoice, stateCache, checkpointStateCache, db});
this.jobQueue = new JobQueue({queueSize, signal});
this.jobQueue = new JobQueue({maxLength, signal});
}
async getPreState(block: phase0.BeaconBlock): Promise<CachedBeaconState<phase0.BeaconState>> {
return await this.jobQueue.enqueueJob(async () => await this.regen.getPreState(block));
return await this.jobQueue.push(async () => await this.regen.getPreState(block));
}
async getCheckpointState(cp: phase0.Checkpoint): Promise<CachedBeaconState<phase0.BeaconState>> {
return await this.jobQueue.enqueueJob(async () => await this.regen.getCheckpointState(cp));
return await this.jobQueue.push(async () => await this.regen.getCheckpointState(cp));
}
async getBlockSlotState(blockRoot: Root, slot: Slot): Promise<CachedBeaconState<phase0.BeaconState>> {
return await this.jobQueue.enqueueJob(async () => await this.regen.getBlockSlotState(blockRoot, slot));
return await this.jobQueue.push(async () => await this.regen.getBlockSlotState(blockRoot, slot));
}
async getState(stateRoot: Root): Promise<CachedBeaconState<phase0.BeaconState>> {
return await this.jobQueue.enqueueJob(async () => await this.regen.getState(stateRoot));
return await this.jobQueue.push(async () => await this.regen.getState(stateRoot));
}
}

View File

@@ -10,44 +10,43 @@ import {readLodestarGitData} from "./gitData";
import {ILogger} from "@chainsafe/lodestar-utils";
export class BeaconMetrics extends Metrics implements IBeaconMetrics {
peers: Gauge;
slot: Gauge;
headSlot: Gauge;
headRoot: Gauge;
finalizedEpoch: Gauge;
finalizedRoot: Gauge;
currentJustifiedEpoch: Gauge;
currentJustifiedRoot: Gauge;
previousJustifiedEpoch: Gauge;
previousJustifiedRoot: Gauge;
currentValidators: Gauge;
previousValidators: Gauge;
currentLiveValidators: Gauge;
previousLiveValidators: Gauge;
pendingDeposits: Gauge;
processedDepositsTotal: Gauge;
pendingExits: Gauge;
previousEpochOrphanedBlocks: Gauge;
reorgEventsTotal: Counter;
currentEpochActiveGwei: Gauge;
currentEpochSourceGwei: Gauge;
currentEpochTargetGwei: Gauge;
previousEpochActiveGwei: Gauge;
previousEpochSourceGwei: Gauge;
previousEpochTargetGwei: Gauge;
observedEpochAttesters: Gauge;
observedEpochAggregators: Gauge;
blockProcessorTotalAsyncTime: Gauge;
peersByDirection: Gauge;
peerConnectedEvent: Gauge;
peerDisconnectedEvent: Gauge;
peerGoodbyeReceived: Gauge;
peerGoodbyeSent: Gauge;
peersTotalUniqueConnected: Gauge;
gossipMeshPeersByType: Gauge;
gossipMeshPeersByBeaconAttestationSubnet: Gauge;
peers: Gauge<string>;
slot: Gauge<string>;
headSlot: Gauge<string>;
headRoot: Gauge<string>;
finalizedEpoch: Gauge<string>;
finalizedRoot: Gauge<string>;
currentJustifiedEpoch: Gauge<string>;
currentJustifiedRoot: Gauge<string>;
previousJustifiedEpoch: Gauge<string>;
previousJustifiedRoot: Gauge<string>;
currentValidators: Gauge<string>;
previousValidators: Gauge<string>;
currentLiveValidators: Gauge<string>;
previousLiveValidators: Gauge<string>;
pendingDeposits: Gauge<string>;
processedDepositsTotal: Gauge<string>;
pendingExits: Gauge<string>;
previousEpochOrphanedBlocks: Gauge<string>;
reorgEventsTotal: Counter<string>;
currentEpochActiveGwei: Gauge<string>;
currentEpochSourceGwei: Gauge<string>;
currentEpochTargetGwei: Gauge<string>;
previousEpochActiveGwei: Gauge<string>;
previousEpochSourceGwei: Gauge<string>;
previousEpochTargetGwei: Gauge<string>;
observedEpochAttesters: Gauge<string>;
observedEpochAggregators: Gauge<string>;
peersByDirection: Gauge<string>;
peerConnectedEvent: Gauge<string>;
peerDisconnectedEvent: Gauge<string>;
peerGoodbyeReceived: Gauge<string>;
peerGoodbyeSent: Gauge<string>;
peersTotalUniqueConnected: Gauge<string>;
gossipMeshPeersByType: Gauge<string>;
gossipMeshPeersByBeaconAttestationSubnet: Gauge<string>;
private lodestarVersion: Gauge;
private lodestarVersion: Gauge<string>;
private logger: ILogger;
constructor(opts: IMetricsOptions, {logger}: {logger: ILogger}) {
@@ -194,12 +193,6 @@ export class BeaconMetrics extends Metrics implements IBeaconMetrics {
// Extra Lodestar custom metrics
this.blockProcessorTotalAsyncTime = new Gauge({
name: "lodestar_block_processor_total_async_time",
help: "Total number of seconds spent completing block processor async jobs",
registers,
});
this.peersByDirection = new Gauge({
name: "lodestar_peers_by_direction",
help: "number of peers, labeled by direction",
@@ -264,8 +257,4 @@ export class BeaconMetrics extends Metrics implements IBeaconMetrics {
});
this.lodestarVersion.set(readLodestarGitData(), 1);
}
close(): void {
super.close();
}
}

View File

@@ -16,137 +16,130 @@ export interface IBeaconMetrics extends IMetrics {
/**
* Tracks the number of libp2p peers
*/
peers: Gauge;
peers: Gauge<string>;
/**
* Latest slot of the beacon chain state
*/
slot: Gauge;
slot: Gauge<string>;
/**
* Slot of the head block of the beacon chain
*/
headSlot: Gauge;
headSlot: Gauge<string>;
/**
* Root of the head block of the beacon chain
*/
headRoot: Gauge;
headRoot: Gauge<string>;
/**
* Current finalized epoch
*/
finalizedEpoch: Gauge;
finalizedEpoch: Gauge<string>;
/**
* Current finalized root
*/
finalizedRoot: Gauge;
finalizedRoot: Gauge<string>;
/**
* Current justified epoch
*/
currentJustifiedEpoch: Gauge;
currentJustifiedEpoch: Gauge<string>;
/**
* Current justified root
*/
currentJustifiedRoot: Gauge;
currentJustifiedRoot: Gauge<string>;
/**
* Current previously justified epoch
*/
previousJustifiedEpoch: Gauge;
previousJustifiedEpoch: Gauge<string>;
/**
* Current previously justified root
*/
previousJustifiedRoot: Gauge;
previousJustifiedRoot: Gauge<string>;
/**
* Number of `status="pending|active|exited|withdrawable" validators in current epoch
*/
currentValidators: Gauge;
currentValidators: Gauge<string>;
/**
* Number of `status="pending|active|exited|withdrawable" validators in current epoch
*/
previousValidators: Gauge;
previousValidators: Gauge<string>;
/**
* Number of active validators that successfully included attestation on chain for current epoch
*/
currentLiveValidators: Gauge;
currentLiveValidators: Gauge<string>;
/**
* Number of active validators that successfully included attestation on chain for previous epoch
*/
previousLiveValidators: Gauge;
previousLiveValidators: Gauge<string>;
/**
* Number of pending deposits (`state.eth1Data.depositCount - state.eth1DepositIndex`)
*/
pendingDeposits: Gauge;
pendingDeposits: Gauge<string>;
/**
* Number of total deposits included on chain
*/
processedDepositsTotal: Gauge;
processedDepositsTotal: Gauge<string>;
/**
* Number of pending voluntary exits in local operation pool
*/
pendingExits: Gauge;
pendingExits: Gauge<string>;
/**
* Number of blocks orphaned in the previous epoch
*/
previousEpochOrphanedBlocks: Gauge;
previousEpochOrphanedBlocks: Gauge<string>;
/**
* Total occurances of reorganizations of the chain
*/
reorgEventsTotal: Counter;
reorgEventsTotal: Counter<string>;
/**
* Track current epoch active balances
*/
currentEpochActiveGwei: Gauge;
currentEpochActiveGwei: Gauge<string>;
/**
* Track current epoch active balances
*/
currentEpochSourceGwei: Gauge;
currentEpochSourceGwei: Gauge<string>;
/**
* Track current epoch active balances
*/
currentEpochTargetGwei: Gauge;
currentEpochTargetGwei: Gauge<string>;
/**
* Track previous epoch active balances
*/
previousEpochActiveGwei: Gauge;
previousEpochActiveGwei: Gauge<string>;
/**
* Track previous epoch active balances
*/
previousEpochSourceGwei: Gauge;
previousEpochSourceGwei: Gauge<string>;
/**
* Track previous epoch active balances
*/
previousEpochTargetGwei: Gauge;
previousEpochTargetGwei: Gauge<string>;
/**
* Track number of attesters for which we have seen an attestation.
* That attestation is not necessarily included on chain.
*/
observedEpochAttesters: Gauge;
observedEpochAttesters: Gauge<string>;
/**
* Track number of aggregators for which we have seen an attestation.
* That attestation is not necessarily included on chain.
*/
observedEpochAggregators: Gauge;
/**
* Total number of seconds spent completing block processor async jobs
* Useful to compute the utilitzation ratio of the blockProcessor with:
* `rate(lodestar_block_processor_total_async_time[1m])`
*/
blockProcessorTotalAsyncTime: Gauge;
observedEpochAggregators: Gauge<string>;
/** Peers labeled by direction */
peersByDirection: Gauge;
peersByDirection: Gauge<string>;
/** Number of peer:connected event, labeled by direction */
peerConnectedEvent: Gauge;
peerConnectedEvent: Gauge<string>;
/** Number of peer:disconnected event, labeled by direction */
peerDisconnectedEvent: Gauge;
peerDisconnectedEvent: Gauge<string>;
/** Number of goodbye received, labeled by reason */
peerGoodbyeReceived: Gauge;
peerGoodbyeReceived: Gauge<string>;
/** Number of goodbye sent, labeled by reason */
peerGoodbyeSent: Gauge;
peerGoodbyeSent: Gauge<string>;
/** Total number of unique peers that have had a connection with */
peersTotalUniqueConnected: Gauge;
peersTotalUniqueConnected: Gauge<string>;
/** Gossip mesh peer count by GossipType */
gossipMeshPeersByType: Gauge;
gossipMeshPeersByType: Gauge<string>;
/** Gossip mesh peer count by beacon attestation subnet */
gossipMeshPeersByBeaconAttestationSubnet: Gauge;
close(): void;
gossipMeshPeersByBeaconAttestationSubnet: Gauge<string>;
}
// eslint-disable-next-line @typescript-eslint/no-empty-interface

View File

@@ -10,26 +10,21 @@ import {IMetricsOptions} from "./options";
export class Metrics implements IMetrics {
registry: Registry;
private defaultInterval: NodeJS.Timeout | null = null;
private opts: IMetricsOptions;
constructor(opts: IMetricsOptions) {
this.opts = opts;
this.registry = new Registry();
this.defaultInterval = collectDefaultMetrics({
register: this.registry,
timeout: this.opts.timeout,
}) as NodeJS.Timeout;
if (opts.enabled) {
collectDefaultMetrics({
register: this.registry,
// eventLoopMonitoringPrecision with sampling rate in milliseconds
eventLoopMonitoringPrecision: 10,
});
// Collects GC metrics using a native binding module
// - nodejs_gc_runs_total: Counts the number of time GC is invoked
// - nodejs_gc_pause_seconds_total: Time spent in GC in seconds
// - nodejs_gc_reclaimed_bytes_total: The number of bytes GC has freed
gcStats(this.registry)();
}
close(): void {
clearInterval(this.defaultInterval as NodeJS.Timeout);
// Collects GC metrics using a native binding module
// - nodejs_gc_runs_total: Counts the number of time GC is invoked
// - nodejs_gc_pause_seconds_total: Time spent in GC in seconds
// - nodejs_gc_reclaimed_bytes_total: The number of bytes GC has freed
gcStats(this.registry)();
}
}
}

View File

@@ -6,6 +6,7 @@ import {createHttpTerminator, HttpTerminator} from "http-terminator";
import {ILogger} from "@chainsafe/lodestar-utils";
import {IMetrics, IMetricsServer} from "../interface";
import {IMetricsOptions} from "../options";
import {wrapError} from "../../util/wrapError";
export class HttpMetricsServer implements IMetricsServer {
http: http.Server;
@@ -42,13 +43,16 @@ export class HttpMetricsServer implements IMetricsServer {
}
}
private onRequest(req: http.IncomingMessage, res: http.ServerResponse): void {
private async onRequest(req: http.IncomingMessage, res: http.ServerResponse): Promise<void> {
if (req.method === "GET" && req.url && req.url.includes("/metrics")) {
res.writeHead(200, {"content-type": this.metrics.registry.contentType});
res.end(this.metrics.registry.metrics());
const metricsRes = await wrapError(this.metrics.registry.metrics());
if (metricsRes.err) {
res.writeHead(500, {"content-type": "text/plain"}).end(metricsRes.err.stack);
} else {
res.writeHead(200, {"content-type": this.metrics.registry.contentType}).end(metricsRes.result);
}
} else {
res.writeHead(404);
res.end();
res.writeHead(404).end();
}
}
}

View File

@@ -228,7 +228,6 @@ export class BeaconNode {
if (this.restApi) await this.restApi.close();
this.chain.close();
this.metrics?.close();
await this.db.stop();
if (this.controller) this.controller.abort();
this.status = BeaconNodeStatus.closed;

View File

@@ -5,6 +5,7 @@ import {ErrorAborted, ILogger} from "@chainsafe/lodestar-utils";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {ChainSegmentError} from "../../chain/errors";
import {ItTrigger} from "../../util/itTrigger";
import {wrapError} from "../../util/wrapError";
import {Batch, BatchOpts, BatchMetadata, BatchStatus} from "./batch";
import {
validateBatchesStatus,
@@ -12,7 +13,6 @@ import {
toBeProcessedStartEpoch,
toBeDownloadedStartEpoch,
toArr,
wrapError,
ChainPeersBalancer,
} from "./utils";

View File

@@ -1,3 +1,2 @@
export * from "./batches";
export * from "./peerBalancer";
export * from "./wrapError";

View File

@@ -1,81 +0,0 @@
import {AbortSignal} from "abort-controller";
import pushable from "it-pushable";
import pipe from "it-pipe";
import {LodestarError} from "@chainsafe/lodestar-utils";
export enum QueueErrorCode {
QUEUE_ABORTED = "QUEUE_ERROR_QUEUE_ABORTED",
QUEUE_THROTTLED = "QUEUE_ERROR_QUEUE_THROTTLED",
}
export type QueueErrorCodeType = {code: QueueErrorCode.QUEUE_ABORTED} | {code: QueueErrorCode.QUEUE_THROTTLED};
export class QueueError extends LodestarError<QueueErrorCodeType> {
constructor(type: QueueErrorCodeType) {
super(type);
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Job = (...args: any) => any;
type JobQueueItem<T extends Job = Job> = {
job: T;
resolve: (value?: ReturnType<T> | PromiseLike<ReturnType<T>> | undefined) => void;
reject: (reason?: unknown) => void;
};
type JobQueueOpts = {
queueSize: number;
signal: AbortSignal;
/**
* Called when a job resolves or rejects.
* Returns the total miliseconds ellapsed from job start to done
*/
onJobDone?: (data: {ms: number}) => void;
};
export class JobQueue {
private currentSize = 0;
private queue = pushable<JobQueueItem>();
private opts: JobQueueOpts;
constructor(opts: JobQueueOpts) {
this.opts = opts;
void pipe(this.queue, async (source) => {
for await (const job of source) {
await this.processJob(job);
}
});
}
async processJob({job, resolve, reject}: JobQueueItem): Promise<void> {
if (this.opts.signal.aborted) {
reject(new QueueError({code: QueueErrorCode.QUEUE_ABORTED}));
} else {
const start = Date.now();
try {
const result = (await job()) as Job;
resolve(result);
} catch (e) {
reject(e);
} finally {
this.opts.onJobDone?.({ms: Date.now() - start});
}
}
this.currentSize--;
}
enqueueJob<T extends Job>(job: T): Promise<ReturnType<T>> {
if (this.opts.signal.aborted) {
throw new QueueError({code: QueueErrorCode.QUEUE_ABORTED});
}
if (this.currentSize + 1 > this.opts.queueSize) {
throw new QueueError({code: QueueErrorCode.QUEUE_THROTTLED});
}
return new Promise((resolve, reject) => {
this.queue.push({job, resolve, reject});
this.currentSize++;
});
}
}

View File

@@ -0,0 +1,14 @@
import {LodestarError} from "@chainsafe/lodestar-utils";
export enum QueueErrorCode {
QUEUE_ABORTED = "QUEUE_ERROR_QUEUE_ABORTED",
QUEUE_MAX_LENGTH = "QUEUE_ERROR_QUEUE_MAX_LENGTH",
}
export type QueueErrorCodeType = {code: QueueErrorCode.QUEUE_ABORTED} | {code: QueueErrorCode.QUEUE_MAX_LENGTH};
export class QueueError extends LodestarError<QueueErrorCodeType> {
constructor(type: QueueErrorCodeType) {
super(type);
}
}

View File

@@ -0,0 +1,100 @@
import {AbortSignal} from "abort-controller";
import {sleep} from "@chainsafe/lodestar-utils";
import {wrapError} from "../wrapError";
import {QueueError, QueueErrorCode} from "./errors";
import {QueueMetricsOpts, IQueueMetrics, createQueueMetrics} from "./metrics";
export {QueueError, QueueErrorCode, QueueMetricsOpts};
export type JobQueueOpts = {
maxLength: number;
signal: AbortSignal;
/** Defaults to FIFO */
type?: QueueType;
};
export enum QueueType {
FIFO = "FIFO",
LIFO = "LIFO",
}
enum QueueState {
Idle,
Running,
Yielding,
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Job<R> = (...args: any) => Promise<R>;
type JobQueueItem<R, Fn extends Job<R>> = {
job: Fn;
resolve: (result: R | PromiseLike<R>) => void;
reject: (error?: Error) => void;
};
export class JobQueue {
private state = QueueState.Idle;
private readonly opts: JobQueueOpts;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private readonly jobs: JobQueueItem<any, Job<any>>[] = [];
private readonly metrics?: IQueueMetrics;
constructor(opts: JobQueueOpts, metricsOpts?: QueueMetricsOpts) {
this.opts = opts;
this.opts.signal.addEventListener("abort", this.abortAllJobs, {once: true});
this.metrics = metricsOpts && createQueueMetrics(metricsOpts, {getQueueLength: () => this.jobs.length});
}
async push<R, Fn extends Job<R> = Job<R>>(job: Fn): Promise<R> {
if (this.opts.signal.aborted) {
throw new QueueError({code: QueueErrorCode.QUEUE_ABORTED});
}
if (this.jobs.length + 1 > this.opts.maxLength) {
this.metrics?.droppedJobs.inc();
throw new QueueError({code: QueueErrorCode.QUEUE_MAX_LENGTH});
}
return await new Promise<R>((resolve, reject) => {
this.jobs.push({job, resolve, reject});
setTimeout(this.runJob, 0);
});
}
private runJob = async (): Promise<void> => {
if (this.opts.signal.aborted || this.state !== QueueState.Idle) {
return;
}
// Default to FIFO. LIFO -> pop() remove last item, FIFO -> shift() remove first item
const job = this.opts.type === QueueType.LIFO ? this.jobs.pop() : this.jobs.shift();
if (!job) {
return;
}
this.state = QueueState.Running;
const timer = this.metrics?.jobTime.startTimer();
const res = await wrapError<unknown>(job.job());
if (res.err) job.reject(res.err);
else job.resolve(res.result);
if (timer) timer();
// Yield to the macro queue
this.state = QueueState.Yielding;
await sleep(0);
this.state = QueueState.Idle;
// Potentially run a new job
void this.runJob();
};
private abortAllJobs = (): void => {
while (this.jobs.length > 0) {
const job = this.jobs.pop();
if (job) job.reject(new QueueError({code: QueueErrorCode.QUEUE_ABORTED}));
}
};
}

View File

@@ -0,0 +1,49 @@
import {CollectFunction, Gauge, Histogram} from "prom-client";
import {IBeaconMetrics} from "../../metrics";
export type QueueMetricsOpts = {
metrics: IBeaconMetrics | undefined;
prefix: string;
};
export interface IQueueMetrics {
length: Gauge<string>;
droppedJobs: Gauge<string>;
/**
* Total number of seconds spent completing queue jobs
* Useful to compute the utilitzation ratio of this queue with:
* `rate(metrics_name[1m])`
*/
jobTime: Histogram<string>;
}
export function createQueueMetrics(
metricsOpts: QueueMetricsOpts,
hooks: {getQueueLength: () => number}
): IQueueMetrics | undefined {
const {metrics, prefix} = metricsOpts;
if (!metrics) return;
return {
length: new Gauge({
name: `${prefix}_length`,
help: `Count of total queue length of ${prefix}`,
registers: [metrics.registry],
collect: ((lengthGauge: Gauge<string>) => {
lengthGauge.set(hooks.getQueueLength());
}) as CollectFunction<Gauge<string>>,
}),
droppedJobs: new Gauge({
name: `${prefix}_dropped_jobs_total`,
help: `Count of total dropped jobs of ${prefix}`,
registers: [metrics.registry],
}),
jobTime: new Histogram({
name: `${prefix}_job_time_seconds`,
help: `Time to process queue job of ${prefix} in seconds`,
registers: [metrics.registry],
}),
};
}

View File

@@ -4,17 +4,20 @@ import {testLogger} from "../../utils/logger";
describe("BeaconMetrics", () => {
const logger = testLogger();
it("updated metrics should be reflected in the registry", () => {
it("updated metrics should be reflected in the registry", async () => {
const m = new BeaconMetrics({enabled: true, timeout: 5000, pushGateway: false, serverPort: 0}, {logger});
const metricsAsArray = await m.registry.getMetricsAsArray();
const metricsAsText = await m.registry.metrics();
// basic assumptions
expect(m.registry.getMetricsAsArray().length).to.be.gt(0);
expect(m.registry.metrics()).to.not.equal("");
expect(metricsAsArray.length).to.be.gt(0);
expect(metricsAsText).to.not.equal("");
// check updating beacon-specific metrics
expect(m.registry.getSingleMetricAsString("libp2p_peers").match(/libp2p_peers 0/)).to.not.be.null;
expect((await m.registry.getSingleMetricAsString("libp2p_peers")).includes("libp2p_peers 0"));
m.peers.set(1);
expect(m.registry.getSingleMetricAsString("libp2p_peers").match(/libp2p_peers 1/)).to.not.be.null;
expect((await m.registry.getSingleMetricAsString("libp2p_peers")).includes("libp2p_peers 1"));
m.peers.set(20);
expect(m.registry.getSingleMetricAsString("libp2p_peers").match(/libp2p_peers 20/)).to.not.be.null;
m.close();
expect((await m.registry.getSingleMetricAsString("libp2p_peers")).includes("libp2p_peers 20"));
});
});

View File

@@ -2,10 +2,11 @@ import {expect} from "chai";
import {Metrics} from "../../../src/metrics";
describe("Metrics", () => {
it("should get default metrics from registry", () => {
it("should get default metrics from registry", async () => {
const m = new Metrics({enabled: true, timeout: 5000, serverPort: 0, pushGateway: false});
expect(m.registry.getMetricsAsArray().length).to.be.gt(0);
expect(m.registry.metrics()).to.not.equal("");
m.close();
const metricsAsArray = await m.registry.getMetricsAsArray();
const metricsAsText = await m.registry.metrics();
expect(metricsAsArray.length).to.be.gt(0);
expect(metricsAsText).to.not.equal("");
});
});

View File

@@ -4,13 +4,19 @@ import {testLogger} from "../../../utils/logger";
describe("HttpMetricsServer", () => {
const logger = testLogger();
let server: HttpMetricsServer;
it("should serve metrics on /metrics", async () => {
const options = {enabled: true, timeout: 5000, serverPort: 0, pushGateway: false};
const metrics = new Metrics(options);
const server = new HttpMetricsServer(options, {metrics, logger});
server = new HttpMetricsServer(options, {metrics, logger});
await server.start();
await request(server.http).get("/metrics").expect(200);
});
after(async () => {
await server.stop();
metrics.close();
});
});

View File

@@ -1,85 +1,108 @@
import {sleep} from "@chainsafe/lodestar-utils";
import {AbortController} from "abort-controller";
import {expect} from "chai";
import {JobQueue, QueueError, QueueErrorCode} from "../../../src/util/queue";
import {JobQueue, QueueError, QueueErrorCode, QueueType} from "../../../src/util/queue";
import {expectLodestarError, expectRejectedWithLodestarError} from "../../utils/errors";
describe("Job queue", () => {
const queueSize = 3;
const maxLength = 3;
const jobDuration = 20;
it("should only allow a single job at a time to run", async () => {
const controller = new AbortController();
const signal = controller.signal;
const jobQueue = new JobQueue({queueSize, signal});
const jobQueue = new JobQueue({maxLength, signal: controller.signal});
let activeJobs = 0;
const job = async (): Promise<void> => {
async function job(): Promise<void> {
activeJobs++;
await new Promise((resolve) => setTimeout(resolve, jobDuration));
await sleep(jobDuration);
if (activeJobs > 1) {
throw new Error();
}
activeJobs--;
};
}
// Start all jobs at the same time
// expect none of the jobs to be running simultaneously
await Promise.all(Array.from({length: queueSize}, () => jobQueue.enqueueJob(job)));
await Promise.all(Array.from({length: maxLength}, () => jobQueue.push(job)));
});
it("should throw after the queue is full", async () => {
const controller = new AbortController();
const signal = controller.signal;
const jobQueue = new JobQueue({queueSize, signal});
const jobQueue = new JobQueue({maxLength, signal: controller.signal});
const job = async (): Promise<void> => {
await new Promise((resolve) => setTimeout(resolve, jobDuration));
};
// Start `queueSize` # of jobs at the same time
// the queue is now full
const jobs = Promise.all(Array.from({length: queueSize}, () => jobQueue.enqueueJob(job)));
try {
// the next enqueued job should go over the limit
await jobQueue.enqueueJob(job);
} catch (e) {
assertQueueErrorCode(e, QueueErrorCode.QUEUE_THROTTLED);
async function job(): Promise<void> {
await sleep(jobDuration);
}
// Start `maxLength` # of jobs at the same time
// the queue is now full
const jobs = Promise.all(Array.from({length: maxLength}, () => jobQueue.push(job)));
// the next enqueued job should go over the limit
await expectRejectedWithLodestarError(jobQueue.push(job), new QueueError({code: QueueErrorCode.QUEUE_MAX_LENGTH}));
await jobs;
});
it("should throw after the queue is aborted", async () => {
const controller = new AbortController();
const signal = controller.signal;
const jobQueue = new JobQueue({queueSize, signal});
const jobQueue = new JobQueue({maxLength, signal: controller.signal});
const job = async (): Promise<void> => {
await new Promise((resolve) => setTimeout(resolve, jobDuration));
};
const jobs = Promise.allSettled(Array.from({length: queueSize}, () => jobQueue.enqueueJob(job)));
async function job(): Promise<void> {
await sleep(jobDuration);
}
const jobs = Promise.allSettled(Array.from({length: maxLength}, () => jobQueue.push(job)));
controller.abort();
const results = await jobs;
// all jobs should be rejected with ERR_QUEUE_ABORTED
for (const e of results) {
if (e.status === "rejected") {
assertQueueErrorCode(e.reason, QueueErrorCode.QUEUE_ABORTED);
expectLodestarError(e.reason, new QueueError({code: QueueErrorCode.QUEUE_ABORTED}));
} else {
expect.fail();
}
}
// any subsequently enqueued job should also be rejected
try {
await jobQueue.enqueueJob(job);
} catch (e) {
assertQueueErrorCode(e, QueueErrorCode.QUEUE_ABORTED);
await expectRejectedWithLodestarError(jobQueue.push(job), new QueueError({code: QueueErrorCode.QUEUE_ABORTED}));
});
describe("Queue types", () => {
const jobCount = 3;
const testCases: {type: QueueType; expectedResults: number[]}[] = [
// FIFO should pick the first jobs first
{type: QueueType.FIFO, expectedResults: [0, 1, 2]},
// LIFO should pick the last jobs first
{type: QueueType.LIFO, expectedResults: [2, 1, 0]},
];
for (const {type, expectedResults} of testCases) {
it(type, async () => {
const controller = new AbortController();
const jobQueue = new JobQueue({maxLength, type, signal: controller.signal});
const results: number[] = [];
const jobPromises: Promise<void>[] = [];
for (let i = 0; i < jobCount; i++) {
jobPromises.push(
jobQueue.push(async () => {
await sleep(0);
results.push(i);
})
);
}
const jobResults = await Promise.allSettled(jobPromises);
for (const [i, jobResult] of jobResults.entries()) {
expect(jobResult.status).to.equal("fulfilled", `Job ${i} rejected`);
}
expect(results).to.deep.equal(expectedResults, "Wrong results");
});
}
});
});
function assertQueueErrorCode(e: QueueError, code: QueueErrorCode): void {
if (e instanceof QueueError) {
expect(e.type.code).to.be.equal(code, "Wrong QueueErrorCode");
} else {
throw Error(`Expected e ${QueueError} to be instaceof QueueError`);
}
}

View File

@@ -1,7 +1,6 @@
import {minimalConfig} from "@chainsafe/lodestar-config/minimal";
import {SinonSandbox, SinonStubbedInstance} from "sinon";
import {BeaconChain, ForkChoice} from "../../../src/chain";
import {BeaconMetrics} from "../../../src/metrics";
import {LocalClock} from "../../../src/chain/clock";
import {StateRegenerator} from "../../../src/chain/regen";
import {CheckpointStateCache, StateContextCache} from "../../../src/chain/stateCache";
@@ -24,7 +23,6 @@ export class StubbedBeaconChain extends BeaconChain {
opts: {},
config,
logger: testLogger(),
metrics: sinon.createStubInstance(BeaconMetrics),
db: new StubbedBeaconDb(sinon, config),
anchorState: config.types.phase0.BeaconState.tree.createValue({
...config.types.phase0.BeaconState.defaultValue(),

View File

@@ -10546,10 +10546,10 @@ progress@^2.0.0, progress@^2.0.3:
resolved "https://registry.yarnpkg.com/progress/-/progress-2.0.3.tgz#7e8cf8d8f5b8f239c1bc68beb4eb78567d572ef8"
integrity sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==
prom-client@^11.5.3:
version "11.5.3"
resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-11.5.3.tgz#5fedfce1083bac6c2b223738e966d0e1643756f8"
integrity sha512-iz22FmTbtkyL2vt0MdDFY+kWof+S9UB/NACxSn2aJcewtw+EERsen0urSkZ2WrHseNdydsvcxCTAnPcSMZZv4Q==
prom-client@^13.1.0:
version "13.1.0"
resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-13.1.0.tgz#1185caffd8691e28d32e373972e662964e3dba45"
integrity sha512-jT9VccZCWrJWXdyEtQddCDszYsiuWj5T0ekrPszi/WEegj3IZy6Mm09iOOVM86A4IKMWq8hZkT2dD9MaSe+sng==
dependencies:
tdigest "^0.1.1"