Add validator client metrics (#3914)

* Add validator metrics in code

* Update test types

* Wire validator metrics

* Wire validator metrics in cli

* Add HTTP client metrics

* Rename metrics prefix

* Metrics for attester steps

* Metrics for sync committee steps

* Metrics for proposer steps

* Fix vc journey metrics times

* De-duplicate log from vc metrics server

* Add validator target to local metrics

* Fix metrics for vc http client

* PR comments

* Fix option types

* Fix attestationNoCommittee property merging
This commit is contained in:
Lion - dapplion
2022-04-13 17:49:59 +02:00
committed by GitHub
parent efa138b409
commit 06a40d36bf
40 changed files with 792 additions and 184 deletions

View File

@@ -23,7 +23,8 @@ import {
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/explicit-function-return-type
export function getFetchOptsSerializer<Fn extends (...args: any) => any, ReqType extends ReqGeneric>(
routeDef: RouteDef,
reqSerializer: ReqSerializer<Fn, ReqType>
reqSerializer: ReqSerializer<Fn, ReqType>,
routeId: string
) {
const urlFormater = compileRouteUrlFormater(routeDef.url);
@@ -35,6 +36,7 @@ export function getFetchOptsSerializer<Fn extends (...args: any) => any, ReqType
query: req.query,
body: req.body as unknown,
headers: req.headers,
routeId,
};
};
}
@@ -47,7 +49,9 @@ export function getFetchOptsSerializers<
Api extends Record<string, RouteGeneric>,
ReqTypes extends {[K in keyof Api]: ReqGeneric}
>(routesData: RoutesData<Api>, reqSerializers: ReqSerializers<Api, ReqTypes>) {
return mapValues(routesData, (routeDef, routeKey) => getFetchOptsSerializer(routeDef, reqSerializers[routeKey]));
return mapValues(routesData, (routeDef, routeId) =>
getFetchOptsSerializer(routeDef, reqSerializers[routeId], routeId as string)
);
}
/**
@@ -62,9 +66,9 @@ export function generateGenericJsonClient<
returnTypes: ReturnTypes<Api>,
fetchFn: IHttpClient
): Api {
return mapValues(routesData, (routeDef, routeKey) => {
const fetchOptsSerializer = getFetchOptsSerializer(routeDef, reqSerializers[routeKey]);
const returnType = returnTypes[routeKey as keyof ReturnTypes<Api>] as TypeJson<any> | null;
return mapValues(routesData, (routeDef, routeId) => {
const fetchOptsSerializer = getFetchOptsSerializer(routeDef, reqSerializers[routeId], routeId as string);
const returnType = returnTypes[routeId as keyof ReturnTypes<Api>] as TypeJson<any> | null;
return async function request(...args: Parameters<Api[keyof Api]>): Promise<any | void> {
const res = await fetchFn.json<unknown>(fetchOptsSerializer(...args));

View File

@@ -3,6 +3,7 @@ import {AbortSignal, AbortController} from "@chainsafe/abort-controller";
import {ErrorAborted, TimeoutError} from "@chainsafe/lodestar-utils";
import {ReqGeneric, RouteDef} from "../../utils";
import {stringifyQuery, urlJoin} from "./format";
import {Metrics} from "./metrics";
export class HttpError extends Error {
status: number;
@@ -21,6 +22,8 @@ export type FetchOpts = {
query?: ReqGeneric["query"];
body?: ReqGeneric["body"];
headers?: ReqGeneric["headers"];
/** Optional, for metrics */
routeId?: string;
};
export interface IHttpClient {
@@ -36,6 +39,8 @@ export type HttpClientOptions = {
getAbortSignal?: () => AbortSignal | undefined;
/** Override fetch function */
fetch?: typeof fetch;
/** Optional metrics */
metrics?: null | Metrics;
};
export class HttpClient implements IHttpClient {
@@ -43,6 +48,7 @@ export class HttpClient implements IHttpClient {
private readonly timeoutMs: number;
private readonly getAbortSignal?: () => AbortSignal | undefined;
private readonly fetch: typeof fetch;
private readonly metrics: null | Metrics;
/**
* timeoutMs = config.params.SECONDS_PER_SLOT * 1000
@@ -53,6 +59,7 @@ export class HttpClient implements IHttpClient {
this.timeoutMs = opts.timeoutMs ?? 60_000;
this.getAbortSignal = opts.getAbortSignal;
this.fetch = opts.fetch ?? fetch;
this.metrics = opts.metrics ?? null;
}
async json<T>(opts: FetchOpts): Promise<T> {
@@ -74,6 +81,9 @@ export class HttpClient implements IHttpClient {
signalGlobal.addEventListener("abort", () => controller.abort());
}
const routeId = opts.routeId; // TODO: Should default to "unknown"?
const timer = this.metrics?.requestTime.startTimer({routeId});
try {
const url = urlJoin(this.baseUrl, opts.url) + (opts.query ? "?" + stringifyQuery(opts.query) : "");
@@ -103,8 +113,13 @@ export class HttpClient implements IHttpClient {
throw Error("Unknown aborted error");
}
}
this.metrics?.errors.inc({routeId});
throw e;
} finally {
timer?.();
clearTimeout(timeout);
if (signalGlobal) {
signalGlobal.removeEventListener("abort", controller.abort);

View File

@@ -0,0 +1,43 @@
/**
* Optional metrics consumed by the HTTP client.
* Both metrics carry a single `routeId` label.
*/
export type Metrics = {
/** Request duration; the HTTP client starts a timer before the request and stops it in its `finally` block */
requestTime: IHistogram<"routeId">;
/** Incremented by the HTTP client right before it rethrows a caught request error */
errors: IGauge<"routeId">;
};
/** Label values keyed by label name `T`; every label is optional and may be a string or a number */
type LabelValues<T extends string> = Partial<Record<T, string | number>>;
/**
* Minimal gauge surface declared locally for the HTTP client metrics: only `inc` and `set`,
* each with a labeled and an unlabeled overload.
* NOTE(review): presumably shaped to be structurally compatible with a Prometheus client gauge - confirm.
*/
export interface IGauge<T extends string> {
/**
* Increment gauge for given labels
* @param labels Object with label keys and values
* @param value The value to increment with
*/
inc(labels: LabelValues<T>, value?: number): void;
/**
* Increment gauge
* @param value The value to increment with
*/
inc(value?: number): void;
/**
* Set gauge value for labels
* @param labels Object with label keys and values
* @param value The value to set
*/
set(labels: LabelValues<T>, value: number): void;
/**
* Set gauge value
* @param value The value to set
*/
set(value: number): void;
}
/**
* Minimal histogram surface declared locally for the HTTP client metrics: only `startTimer`.
*/
export interface IHistogram<T extends string> {
/**
* Start a timer where the value in seconds will be observed
* @param labels Object with label keys and values
* @return Function to invoke when timer should be stopped
*/
startTimer(labels?: LabelValues<T>): (labels?: LabelValues<T>) => number;
}

View File

@@ -46,19 +46,19 @@ export function getGenericJsonServer<
const reqSerializers = getReqSerializers(config);
const returnTypes = getReturnTypes(config);
return mapValues(routesData, (routeDef, routeKey) => {
const routeSerdes = reqSerializers[routeKey];
const returnType = returnTypes[routeKey as keyof ReturnTypes<Api>] as TypeJson<any> | null;
return mapValues(routesData, (routeDef, routeId) => {
const routeSerdes = reqSerializers[routeId];
const returnType = returnTypes[routeId as keyof ReturnTypes<Api>] as TypeJson<any> | null;
return {
url: routeDef.url,
method: routeDef.method,
id: routeKey as string,
id: routeId as string,
schema: routeSerdes.schema && getFastifySchema(routeSerdes.schema),
handler: async function handler(req: ReqGeneric): Promise<unknown | void> {
const args: any[] = routeSerdes.parseReq(req as ReqTypes[keyof Api]);
const data = (await api[routeKey](...args)) as Resolves<Api[keyof Api]>;
const data = (await api[routeId](...args)) as Resolves<Api[keyof Api]>;
if (returnType) {
return returnType.toJson(data);
} else {

View File

@@ -36,7 +36,7 @@ export function getTestServer(): {baseUrl: string; server: FastifyInstance} {
/** Type helper to get a Sinon mock object type with Api */
export function getMockApi<Api extends Record<string, any>>(
routeKeys: Record<string, any>
routeIds: Record<string, any>
): Sinon.SinonStubbedInstance<Api> & Api {
return mapValues(routeKeys, () => Sinon.stub()) as Sinon.SinonStubbedInstance<Api> & Api;
return mapValues(routeIds, () => Sinon.stub()) as Sinon.SinonStubbedInstance<Api> & Api;
}

View File

@@ -1,17 +1,18 @@
import {AbortController} from "@chainsafe/abort-controller";
import {getClient} from "@chainsafe/lodestar-api";
import {LevelDbController} from "@chainsafe/lodestar-db";
import {SignerType, Signer, SlashingProtection, Validator} from "@chainsafe/lodestar-validator";
import {getMetrics, MetricsRegister} from "@chainsafe/lodestar-validator";
import {KeymanagerServer, KeymanagerApi} from "@chainsafe/lodestar-keymanager-server";
import {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer} from "@chainsafe/lodestar";
import {getBeaconConfigFromArgs} from "../../config";
import {IGlobalArgs} from "../../options";
import {YargsError, getDefaultGraffiti, initBLS, mkdir, getCliLogger} from "../../util";
import {onGracefulShutdown} from "../../util";
import {getVersion, getVersionGitData} from "../../util/version";
import {getBeaconPaths} from "../beacon/paths";
import {getValidatorPaths} from "./paths";
import {IValidatorCliArgs} from "./options";
import {IValidatorCliArgs, validatorMetricsDefaultOptions} from "./options";
import {getLocalSecretKeys, getExternalSigners, groupExternalSignersByUrl} from "./keys";
import {getVersion} from "../../util/version";
/**
* Runs a validator client.
@@ -28,6 +29,7 @@ export async function validatorHandler(args: IValidatorCliArgs & IGlobalArgs): P
const logger = getCliLogger(args, beaconPaths, config);
const version = getVersion();
const gitData = getVersionGitData();
logger.info("Lodestar", {version: version, network: args.network});
const dbPath = validatorPaths.validatorsDbDir;
@@ -89,19 +91,50 @@ export async function validatorHandler(args: IValidatorCliArgs & IGlobalArgs): P
const controller = new AbortController();
onGracefulShutdownCbs.push(async () => controller.abort());
const api = getClient(config, {baseUrl: args.server});
const dbOps = {
config: config,
controller: new LevelDbController({name: dbPath}, {logger}),
};
const slashingProtection = new SlashingProtection(dbOps);
// Create metrics registry if metrics are enabled
// Send version and network data for static registries
const register = args["metrics.enabled"] ? new RegistryMetricCreator() : null;
const metrics =
register &&
getMetrics((register as unknown) as MetricsRegister, {
semver: gitData.semver ?? "-",
branch: gitData.branch ?? "-",
commit: gitData.commit ?? "-",
version,
network: args.network,
});
// Start metrics server if metrics are enabled.
// Collect NodeJS metrics defined in the Lodestar repo
if (metrics) {
collectNodeJSMetrics(register);
const port = args["metrics.port"] ?? validatorMetricsDefaultOptions.port;
const address = args["metrics.address"] ?? validatorMetricsDefaultOptions.address;
const metricsServer = new HttpMetricsServer({port, address}, {register, logger});
onGracefulShutdownCbs.push(() => metricsServer.stop());
await metricsServer.start();
}
// This promise resolves once genesis is available.
// It will wait for genesis, so this promise can be potentially very long
const validator = await Validator.initializeFromBeaconNode(
{dbOps, slashingProtection, api, logger, signers, graffiti},
controller.signal
{dbOps, slashingProtection, api: args.server, logger, signers, graffiti},
controller.signal,
metrics
);
onGracefulShutdownCbs.push(async () => await validator.stop());
onGracefulShutdownCbs.push(() => validator.stop());
await validator.start();
// Start keymanager API backend

View File

@@ -5,6 +5,12 @@ import {logOptions, beaconPathsOptions} from "../beacon/options";
import {IBeaconPaths} from "../beacon/paths";
import {KeymanagerArgs, keymanagerOptions} from "../../options/keymanagerOptions";
/**
* Default values for the validator client `metrics.*` CLI options.
* Used for the `defaultDescription` of each option, and as the fallback for
* `metrics.port` / `metrics.address` when the validator handler starts the metrics server.
*/
export const validatorMetricsDefaultOptions = {
enabled: false,
port: 5064,
address: "127.0.0.1",
};
export type IValidatorCliArgs = IAccountValidatorArgs &
ILogArgs & {
logFile: IBeaconPaths["logFile"];
@@ -20,6 +26,10 @@ export type IValidatorCliArgs = IAccountValidatorArgs &
interopIndexes?: string;
fromMnemonic?: string;
mnemonicIndexes?: string;
"metrics.enabled"?: boolean;
"metrics.port"?: number;
"metrics.address"?: string;
} & KeymanagerArgs;
export const validatorOptions: ICliCommandOptions<IValidatorCliArgs> = {
@@ -89,6 +99,29 @@ export const validatorOptions: ICliCommandOptions<IValidatorCliArgs> = {
group: "External signer",
},
// Metrics
"metrics.enabled": {
type: "boolean",
description: "Enable the Prometheus metrics HTTP server",
defaultDescription: String(validatorMetricsDefaultOptions.enabled),
group: "metrics",
},
"metrics.port": {
type: "number",
description: "Listen TCP port for the Prometheus metrics HTTP server",
defaultDescription: String(validatorMetricsDefaultOptions.port),
group: "metrics",
},
"metrics.address": {
type: "string",
description: "Listen address for the Prometheus metrics HTTP server",
defaultDescription: String(validatorMetricsDefaultOptions.address),
group: "metrics",
},
// For testing only
interopIndexes: {

View File

@@ -3,55 +3,39 @@ import {ICliCommandOptions} from "../../util";
export interface IMetricsArgs {
"metrics.enabled": boolean;
"metrics.gatewayUrl": string;
"metrics.serverPort": number;
"metrics.timeout": number;
"metrics.listenAddr": string;
"metrics.port": number;
"metrics.address": string;
}
export function parseArgs(args: IMetricsArgs): IBeaconNodeOptions["metrics"] {
return {
enabled: args["metrics.enabled"],
gatewayUrl: args["metrics.gatewayUrl"],
serverPort: args["metrics.serverPort"],
timeout: args["metrics.timeout"],
listenAddr: args["metrics.listenAddr"],
port: args["metrics.port"],
address: args["metrics.address"],
};
}
export const options: ICliCommandOptions<IMetricsArgs> = {
"metrics.enabled": {
type: "boolean",
description: "Enable metrics",
description: "Enable the Prometheus metrics HTTP server",
defaultDescription: String(defaultOptions.metrics.enabled),
group: "metrics",
},
"metrics.gatewayUrl": {
type: "string",
description: "Gateway URL for metrics",
defaultDescription: defaultOptions.metrics.gatewayUrl || "",
group: "metrics",
},
"metrics.serverPort": {
"metrics.port": {
type: "number",
description: "Server port for metrics",
defaultDescription: String(defaultOptions.metrics.serverPort),
alias: ["metrics.serverPort"], // For backwards compatibility
description: "Listen TCP port for the Prometheus metrics HTTP server",
defaultDescription: String(defaultOptions.metrics.port),
group: "metrics",
},
"metrics.timeout": {
type: "number",
description: "How often metrics should be probed",
defaultDescription: String(defaultOptions.metrics.timeout),
group: "metrics",
},
"metrics.listenAddr": {
"metrics.address": {
type: "string",
description: "The address for the metrics http server to listen on",
defaultDescription: String(defaultOptions.metrics.listenAddr),
alias: ["metrics.listenAddr"], // For backwards compatibility
description: "Listen address for the Prometheus metrics HTTP server",
defaultDescription: String(defaultOptions.metrics.address),
group: "metrics",
},
};

View File

@@ -35,10 +35,8 @@ describe("options / beaconNodeOptions", () => {
"logger.unknown.level": "debug",
"metrics.enabled": true,
"metrics.gatewayUrl": "http://localhost:8000",
"metrics.serverPort": 8765,
"metrics.timeout": 5000,
"metrics.listenAddr": "0.0.0.0",
"metrics.port": 8765,
"metrics.address": "0.0.0.0",
"network.discv5.enabled": true,
"network.discv5.bindAddr": "addr",
@@ -98,10 +96,8 @@ describe("options / beaconNodeOptions", () => {
},
metrics: {
enabled: true,
gatewayUrl: "http://localhost:8000",
serverPort: 8765,
timeout: 5000,
listenAddr: "0.0.0.0",
port: 8765,
address: "0.0.0.0",
},
network: {
discv5: {

View File

@@ -3,3 +3,6 @@ export {BeaconDb, IBeaconDb} from "./db";
export {Eth1Provider, IEth1Provider} from "./eth1";
export {createNodeJsLibp2p, NodeJsLibp2pOpts} from "./network";
export * from "./node";
// Export metrics utilities to de-duplicate validator metrics
export {RegistryMetricCreator, collectNodeJSMetrics, HttpMetricsServer} from "./metrics";

View File

@@ -4,3 +4,4 @@
export * from "./metrics";
export * from "./server";
export * from "./interface";
export {RegistryMetricCreator} from "./utils/registryMetricCreator";

View File

@@ -9,14 +9,14 @@ import gcStats from "prometheus-gc-stats";
import {DbMetricLabels, IDbMetrics} from "@chainsafe/lodestar-db";
import {createBeaconMetrics, IBeaconMetrics} from "./metrics/beacon";
import {createLodestarMetrics, ILodestarMetrics} from "./metrics/lodestar";
import {IMetricsOptions} from "./options";
import {MetricsOptions} from "./options";
import {RegistryMetricCreator} from "./utils/registryMetricCreator";
import {createValidatorMonitor, IValidatorMonitor} from "./validatorMonitor";
export type IMetrics = IBeaconMetrics & ILodestarMetrics & IValidatorMonitor & {register: RegistryMetricCreator};
export function createMetrics(
opts: IMetricsOptions,
opts: MetricsOptions,
config: IChainForkConfig,
anchorState: BeaconStateAllForks,
logger: ILogger,
@@ -37,20 +37,9 @@ export function createMetrics(
lodestar.unhandeledPromiseRejections.inc();
});
collectDefaultMetrics({
register,
// eventLoopMonitoringPrecision with sampling rate in milliseconds
eventLoopMonitoringPrecision: 10,
});
// Collects GC metrics using a native binding module
// - nodejs_gc_runs_total: Counts the number of time GC is invoked
// - nodejs_gc_pause_seconds_total: Time spent in GC in seconds
// - nodejs_gc_reclaimed_bytes_total: The number of bytes GC has freed
gcStats(register)();
collectNodeJSMetrics(register);
// Merge external registries
register;
for (const externalRegister of externalRegistries) {
// Wrong types, does not return a promise
const metrics = (externalRegister.getMetricsAsArray() as unknown) as Resolves<
@@ -69,6 +58,20 @@ export function createMetrics(
};
}
/**
 * Attach the NodeJS process collectors to `register`: the default metrics
 * collection and the GC stats collection.
 *
 * @param register Registry the collected metrics are registered on
 */
export function collectNodeJSMetrics(register: Registry): void {
  // eventLoopMonitoringPrecision with sampling rate in milliseconds
  const defaultMetricsOpts = {register, eventLoopMonitoringPrecision: 10};
  collectDefaultMetrics(defaultMetricsOpts);

  // GC metrics come from a native binding module:
  // - nodejs_gc_runs_total: Counts the number of time GC is invoked
  // - nodejs_gc_pause_seconds_total: Time spent in GC in seconds
  // - nodejs_gc_reclaimed_bytes_total: The number of bytes GC has freed
  const startGcStats = gcStats(register);
  startGcStats();
}
export function createDbMetrics(): {metrics: IDbMetrics; registry: Registry} {
const metrics = {
dbReads: new Counter<DbMetricLabels>({

View File

@@ -1,6 +1,6 @@
import {allForks} from "@chainsafe/lodestar-types";
import {RegistryMetricCreator} from "../utils/registryMetricCreator";
import {IMetricsOptions} from "../options";
import {LodestarMetadata} from "../options";
export type ILodestarMetrics = ReturnType<typeof createLodestarMetrics>;
@@ -10,11 +10,11 @@ export type ILodestarMetrics = ReturnType<typeof createLodestarMetrics>;
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/explicit-function-return-type
export function createLodestarMetrics(
register: RegistryMetricCreator,
metadata: IMetricsOptions["metadata"],
metadata?: LodestarMetadata,
anchorState?: Pick<allForks.BeaconState, "genesisTime">
) {
if (metadata) {
register.static<"semver" | "branch" | "commit" | "version" | "network">({
register.static<keyof LodestarMetadata>({
name: "lodestar_version",
help: "Lodestar version",
value: metadata,

View File

@@ -2,23 +2,9 @@
* @module metrics
*/
export interface IMetricsOptions {
enabled: boolean;
timeout?: number;
serverPort?: number;
gatewayUrl?: string;
listenAddr?: string;
/** Optional metadata to send to Prometheus */
metadata?: LodestarGitData & {network: string};
}
import {HttpMetricsServerOpts} from "./server";
export const defaultMetricsOptions: IMetricsOptions = {
enabled: false,
timeout: 5000,
serverPort: 8008,
};
export type LodestarGitData = {
export type LodestarMetadata = {
/** "0.16.0" */
semver: string;
/** "developer/feature-1" */
@@ -27,4 +13,17 @@ export type LodestarGitData = {
commit: string;
/** "0.16.0 developer/feature-1 ac99f2b5" */
version: string;
/** "prater" */
network: string;
};
/** Metrics options: the HTTP metrics server options plus an enable flag and optional static metadata */
export type MetricsOptions = HttpMetricsServerOpts & {
enabled: boolean;
/** Optional metadata to send to Prometheus */
metadata?: LodestarMetadata;
};
/** Default metrics options: disabled, port 8008, no address or metadata set */
export const defaultMetricsOptions: MetricsOptions = {
enabled: false,
port: 8008,
};

View File

@@ -4,22 +4,18 @@
import http from "node:http";
import {Registry} from "prom-client";
import {ILogger} from "@chainsafe/lodestar-utils";
import {IMetricsOptions} from "../options";
import {wrapError} from "../../util/wrapError";
import {HistogramExtra} from "../utils/histogram";
import {HttpActiveSocketsTracker} from "../../api/rest/activeSockets";
import {RegistryMetricCreator} from "../utils/registryMetricCreator";
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface IMetricsServer {}
type RegistryHolder = {
register: Registry;
export type HttpMetricsServerOpts = {
port: number;
address?: string;
};
export class HttpMetricsServer implements IMetricsServer {
export class HttpMetricsServer {
private readonly server: http.Server;
private readonly opts: IMetricsOptions;
private readonly register: Registry;
private readonly logger: ILogger;
private readonly activeSockets: HttpActiveSocketsTracker;
@@ -27,14 +23,14 @@ export class HttpMetricsServer implements IMetricsServer {
private readonly httpServerRegister: RegistryMetricCreator;
private readonly scrapeTimeMetric: HistogramExtra<"status">;
constructor(opts: IMetricsOptions, {metrics, logger}: {metrics: RegistryHolder; logger: ILogger}) {
this.opts = opts;
constructor(private readonly opts: HttpMetricsServerOpts, {register, logger}: {register: Registry; logger: ILogger}) {
this.logger = logger;
this.register = metrics.register;
this.register = register;
this.server = http.createServer(this.onRequest.bind(this));
// New registry to metric the metrics. Using the same registry would deadlock the .metrics promise
this.httpServerRegister = new RegistryMetricCreator();
this.scrapeTimeMetric = this.httpServerRegister.histogram<"status">({
name: "lodestar_metrics_scrape_seconds",
help: "Lodestar metrics server async time to scrape metrics",
@@ -61,11 +57,11 @@ export class HttpMetricsServer implements IMetricsServer {
}
async start(): Promise<void> {
const {serverPort, listenAddr} = this.opts;
this.logger.info("Starting metrics HTTP server", {port: serverPort ?? null});
const {port, address} = this.opts;
this.logger.info("Starting metrics HTTP server", {port, address: address ?? "127.0.0.1"});
const listen = this.server.listen.bind(this.server);
return new Promise((resolve, reject) => {
listen(serverPort, listenAddr).once("listening", resolve).once("error", reject);
listen(port, address).once("listening", resolve).once("error", reject);
});
}

View File

@@ -197,7 +197,7 @@ export class BeaconNode {
});
const metricsServer = metrics
? new HttpMetricsServer(opts.metrics, {metrics, logger: logger.child(opts.logger.metrics)})
? new HttpMetricsServer(opts.metrics, {register: metrics.register, logger: logger.child(opts.logger.metrics)})
: undefined;
if (metricsServer) {
await metricsServer.start();

View File

@@ -7,7 +7,7 @@ import {defaultChainOptions, IChainOptions} from "../chain/options";
import {defaultDbOptions, IDatabaseOptions} from "../db/options";
import {defaultEth1Options, Eth1Options} from "../eth1/options";
import {defaultLoggerOptions, IBeaconLoggerOptions} from "./loggerOptions";
import {defaultMetricsOptions, IMetricsOptions} from "../metrics/options";
import {defaultMetricsOptions, MetricsOptions} from "../metrics/options";
import {defaultNetworkOptions, INetworkOptions} from "../network/options";
import {defaultSyncOptions, SyncOptions} from "../sync/options";
import {defaultExecutionEngineOpts, ExecutionEngineOpts} from "../executionEngine";
@@ -21,7 +21,7 @@ export interface IBeaconNodeOptions {
eth1: Eth1Options;
executionEngine: ExecutionEngineOpts;
logger: IBeaconLoggerOptions;
metrics: IMetricsOptions;
metrics: MetricsOptions;
network: INetworkOptions;
sync: SyncOptions;
}

View File

@@ -10,7 +10,7 @@ describe("HttpMetricsServer", () => {
it("should serve metrics on /metrics", async () => {
const metrics = createMetricsTest();
server = new HttpMetricsServer({enabled: true, timeout: 5000, serverPort: 0}, {metrics, logger});
server = new HttpMetricsServer({port: 0}, {register: metrics.register, logger});
await server.start();
await request(server["server"]).get("/metrics").expect(200);

View File

@@ -6,5 +6,5 @@ import {createMetrics, IMetrics} from "../../../src/metrics";
export function createMetricsTest(): IMetrics {
const state = ssz.phase0.BeaconState.defaultViewDU();
const logger = new WinstonLogger();
return createMetrics({enabled: true, timeout: 12000}, config, state, logger);
return createMetrics({enabled: true, port: 0}, config, state, logger);
}

View File

@@ -5,6 +5,7 @@
export {Validator, ValidatorOptions} from "./validator";
export {ValidatorStore, SignerType, Signer, SignerLocal, SignerRemote} from "./services/validatorStore";
export {waitForGenesis} from "./genesis";
export {getMetrics, Metrics, MetricsRegister} from "./metrics";
// Remote signer client
export {externalSignerGetKeys, externalSignerPostSignature, externalSignerUpCheck} from "./util/externalSignerClient";

View File

@@ -0,0 +1,325 @@
/**
* Source of a message: forwarded or published.
* NOTE(review): not referenced in the visible part of this file - confirm whether it is still needed here.
*/
export enum MessageSource {
forward = "forward",
publish = "publish",
}
/** Generic label set: label name to an optional string value */
type LabelsGeneric = Record<string, string | undefined>;
/** Callback accepted by `Gauge.addCollect`; it receives the gauge it was registered on */
type CollectFn<Labels extends LabelsGeneric> = (metric: Gauge<Labels>) => void;
/**
* Gauge surface used by the validator metrics: `inc` and `set`, each callable with or
* without labels, plus `addCollect` to register a collect callback.
*/
interface Gauge<Labels extends LabelsGeneric = never> {
// Sorry for this mess, `prom-client` API choices are not great
// If the function signature was `inc(value: number, labels?: Labels)`, this would be simpler
inc(value?: number): void;
inc(labels: Labels, value?: number): void;
inc(arg1?: Labels | number, arg2?: number): void;
set(value: number): void;
set(labels: Labels, value: number): void;
set(arg1?: Labels | number, arg2?: number): void;
addCollect(collectFn: CollectFn<Labels>): void;
}
/**
* Histogram surface used by the validator metrics: an unlabeled `startTimer`,
* `observe` with or without labels, and `reset`.
*/
interface Histogram<Labels extends LabelsGeneric = never> {
// Returns a function that stops the timer; that function returns a number
startTimer(): () => number;
observe(value: number): void;
observe(labels: Labels, values: number): void;
observe(arg1: Labels | number, arg2?: number): void;
reset(): void;
}
/**
* Metric that is set from an array of numbers, with or without labels.
* NOTE(review): presumably exposes the average / min / max of the supplied values;
* the implementation is not visible here - confirm.
*/
interface AvgMinMax<Labels extends LabelsGeneric = never> {
set(values: number[]): void;
set(labels: Labels, values: number[]): void;
set(arg1?: Labels | number[], arg2?: number[]): void;
}
/** Configuration passed to `MetricsRegister.gauge` */
type GaugeConfig<Labels extends LabelsGeneric> = {
name: string;
help: string;
// Array of label keys when the keys of Labels are strings, otherwise only `undefined` is allowed
labelNames?: keyof Labels extends string ? (keyof Labels)[] : undefined;
};
/** Configuration passed to `MetricsRegister.histogram` */
type HistogramConfig<Labels extends LabelsGeneric> = {
name: string;
help: string;
labelNames?: (keyof Labels)[];
// Optional histogram bucket boundaries
buckets?: number[];
};
/** Configuration passed to `MetricsRegister.avgMinMax`; same shape as `GaugeConfig` */
type AvgMinMaxConfig<Labels extends LabelsGeneric> = GaugeConfig<Labels>;
/**
* Metric factory that `getMetrics` receives to create its metrics.
* Declares one factory per metric kind: gauge, histogram and avgMinMax.
*/
export interface MetricsRegister {
gauge<T extends LabelsGeneric>(config: GaugeConfig<T>): Gauge<T>;
histogram<T extends LabelsGeneric>(config: HistogramConfig<T>): Histogram<T>;
avgMinMax<T extends LabelsGeneric>(config: AvgMinMaxConfig<T>): AvgMinMax<T>;
}
/** Object of metrics returned by `getMetrics`; the type is derived from that function's return value */
export type Metrics = ReturnType<typeof getMetrics>;
/**
* Version and network metadata. `getMetrics` uses the keys of this object as the label names
* of the `lodestar_version` gauge and sets that gauge to 1 with these values as labels.
*/
export type LodestarGitData = {
/** "0.16.0" */
semver: string;
/** "developer/feature-1" */
branch: string;
/** "4f816b16dfde718e2d74f95f2c8292596138c248" */
commit: string;
/** "0.16.0 developer/feature-1 ac99f2b5" */
version: string;
/** "prater" */
network: string;
};
/**
* A collection of metrics used throughout the validator client.
*/
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/explicit-function-return-type
export function getMetrics(register: MetricsRegister, gitData: LodestarGitData) {
// Using function style instead of class to prevent having to re-declare all MetricsPrometheus types.
// Track version, same as https://github.com/ChainSafe/lodestar/blob/6df28de64f12ea90b341b219229a47c8a25c9343/packages/lodestar/src/metrics/metrics/lodestar.ts#L17
register
.gauge<LodestarGitData>({
name: "lodestar_version",
help: "Lodestar version",
labelNames: Object.keys(gitData) as (keyof LodestarGitData)[],
})
.set(gitData, 1);
return {
// Attestation journey:
// - Wait for block or 1/3, call prepare attestation
// - Get attestation, sign, call publish
// - Wait 2/3, call prepare aggregate
// - Get aggregate, sign, call publish
attesterStepCallProduceAttestation: register.histogram({
name: "vc_attester_step_call_produce_attestation_seconds",
help: "Time between 1/3 of slot and call produce attestation",
// Attester flow can start early if block is imported before 1/3 of the slot
// This measure is critical so we need very good resolution around the 0 second mark
buckets: [-3, -1, 0, 1, 2, 3, 6, 12],
}),
attesterStepCallPublishAttestation: register.histogram({
name: "vc_attester_step_call_publish_attestation_seconds",
help: "Time between 1/3 of slot and call publish attestation",
buckets: [-3, -1, 0, 1, 2, 3, 6, 12],
}),
attesterStepCallProduceAggregate: register.histogram({
name: "vc_attester_step_call_produce_aggregate_seconds",
help: "Time between 2/3 of slot and call produce aggregate",
// Aggregate production starts at 2/3 the earliest.
// Track values close to 0 (expected) in greater resolution, values over 12 overflow into the next slot.
buckets: [0.5, 1, 2, 3, 6, 12],
}),
attesterStepCallPublishAggregate: register.histogram({
name: "vc_attester_step_call_publish_aggregate_seconds",
help: "Time between 2/3 of slot and call publish aggregate",
buckets: [0.5, 1, 2, 3, 6, 12],
}),
syncCommitteeStepCallProduceMessage: register.histogram({
name: "vc_sync_committee_step_call_produce_message_seconds",
help: "Time between 1/3 of slot and call produce message",
// Max wait time is 1 / 3 of slot
buckets: [0.5, 1, 2, 3, 6, 12],
}),
syncCommitteeStepCallPublishMessage: register.histogram({
name: "vc_sync_committee_step_call_publish_message_seconds",
help: "Time between 1/3 of slot and call publish message",
buckets: [0.5, 1, 2, 3, 6, 12],
}),
syncCommitteeStepCallProduceAggregate: register.histogram({
name: "vc_sync_committee_step_call_produce_aggregate_seconds",
help: "Time between 2/3 of slot and call produce aggregate",
// Min wait time is 2 / 3 of slot
buckets: [0.5, 1, 2, 3, 6, 12],
}),
syncCommitteeStepCallPublishAggregate: register.histogram({
name: "vc_sync_committee_step_call_publish_aggregate_seconds",
help: "Time between 2/3 of slot and call publish aggregate",
buckets: [0.5, 1, 2, 3, 6, 12],
}),
// Min wait time is 0. CallProduceBlock step is a bit redundant since it must be 0, but let's check
proposerStepCallProduceBlock: register.histogram({
name: "vc_proposer_step_call_produce_block_seconds",
help: "Time between start of slot and call produce block",
buckets: [0.5, 1, 2, 3, 6, 8],
}),
proposerStepCallPublishBlock: register.histogram({
name: "vc_proposer_step_call_publish_block_seconds",
help: "Time between start of slot and call publish block",
buckets: [0.5, 1, 2, 3, 6, 8],
}),
// AttestationService
publishedAttestations: register.gauge({
name: "vc_published_attestations_total",
help: "Total published attestations",
}),
publishedAggregates: register.gauge({
name: "vc_published_aggregates_total",
help: "Total published aggregates",
}),
attestaterError: register.gauge<{error: "produce" | "sign" | "publish"}>({
name: "vc_attestation_service_errors",
help: "Total errors in AttestationService",
labelNames: ["error"],
}),
// AttestationDutiesService
attesterDutiesCount: register.gauge({
name: "vc_attester_duties_count",
help: "Current count of duties in AttestationDutiesService",
}),
attesterDutiesEpochCount: register.gauge({
name: "vc_attester_duties_epoch_count",
help: "Current count of epoch duties in AttestationDutiesService",
}),
attesterDutiesReorg: register.gauge({
name: "vc_attestation_duties_reorg_total",
help: "Total count of instances the attester duties dependant root changed",
}),
// BlockProposingService
blocksProduced: register.gauge({
name: "vc_block_produced_total",
help: "Total count of blocks produced",
}),
blocksPublished: register.gauge({
name: "vc_block_published_total",
help: "Total count of blocks published",
}),
blockProposingErrors: register.gauge<{error: "produce" | "publish"}>({
name: "vc_block_proposing_errors_total",
help: "Total count of errors producing or publishing a block",
labelNames: ["error"],
}),
// BlockDutiesService
proposerDutiesEpochCount: register.gauge({
name: "vc_proposer_duties_epoch_count",
help: "Current count of epoch duties in BlockDutiesService",
}),
proposerDutiesReorg: register.gauge({
name: "vc_proposer_duties_reorg_total",
help: "Total count of instances the proposer duties dependant root changed",
}),
// IndicesService
indices: register.gauge({
name: "vc_indices_count",
help: "Current count of indices in IndicesService",
}),
discoveredIndices: register.gauge({
name: "vc_discovered_indices_total",
help: "Total count of validator indices discovered",
}),
// SyncCommitteeService
publishedSyncCommitteeMessage: register.gauge({
name: "vc_published_sync_committee_message_total",
help: "Total published SyncCommitteeMessage",
}),
publishedSyncCommitteeContribution: register.gauge({
name: "vc_published_sync_committee_contribution_total",
help: "Total published SyncCommitteeContribution",
}),
// SyncCommitteeDutiesService
syncCommitteeDutiesCount: register.gauge({
name: "vc_sync_committee_duties_count",
help: "Current count of duties in SyncCommitteeDutiesService",
}),
syncCommitteeDutiesEpochCount: register.gauge({
name: "vc_sync_committee_duties_epoch_count",
help: "Current count of epoch duties in SyncCommitteeDutiesService",
}),
syncCommitteeDutiesReorg: register.gauge({
name: "vc_sync_committee_duties_reorg_total",
help: "Total count of instances the sync committee duties dependant root changed",
}),
// ValidatorStore
// Current number of signers tracked by ValidatorStore; the value is set at collect time
signers: register.gauge({
  name: "vc_signers_count",
  help: "Current count of signers in ValidatorStore",
}),
localSignTime: register.histogram({
name: "vc_local_sign_time_seconds",
help: "Histogram of sign time for any signature with local signer",
// When using a local keystores, signing time is ~ 1ms
buckets: [0.0001, 0.001, 0.01, 0.1],
}),
remoteSignTime: register.histogram({
name: "vc_remote_sign_time_seconds",
help: "Histogram of sign time for any signature with remote signer",
// When using a remote signer sign time can be ~ 50-500ms
buckets: [0.01, 0.1, 1, 5],
}),
remoteSignErrors: register.gauge({
name: "vc_remote_sign_errors_total",
help: "Total count of errors calling a remote signer",
}),
signError: register.gauge({
name: "vc_sign_errors_total",
help: "Total count of errors calling a signer",
}),
slashingProtectionBlockError: register.gauge({
name: "vc_slashing_protection_block_errors_total",
help: "Total count of errors on slashingProtection.checkAndInsertBlockProposal",
}),
slashingProtectionAttestationError: register.gauge({
name: "vc_slashing_protection_attestation_errors_total",
help: "Total count of errors on slashingProtection.checkAndInsertAttestation",
}),
// REST API client
restApiClient: {
requestTime: register.histogram<{routeId: string}>({
name: "vc_rest_api_client_request_time_seconds",
help: "Histogram of REST API client request time by routeId",
labelNames: ["routeId"],
// Expected times are ~ 50-500ms, but in an overload NodeJS they can be greater
buckets: [0.01, 0.1, 1, 5],
}),
errors: register.gauge<{routeId: string}>({
name: "vc_rest_api_client_errors_total",
help: "Total count of errors calling the REST API client by routeId",
labelNames: ["routeId"],
}),
},
};
}

View File

@@ -12,6 +12,7 @@ import {toHexString} from "@chainsafe/ssz";
import {ChainHeaderTracker, HeadEventData} from "./chainHeaderTracker";
import {ValidatorEvent, ValidatorEventEmitter} from "./emitter";
import {PubkeyHex} from "../types";
import {Metrics} from "../metrics";
/**
* Service that sets up and handles validator attester duties.
@@ -26,7 +27,8 @@ export class AttestationService {
private readonly validatorStore: ValidatorStore,
private readonly emitter: ValidatorEventEmitter,
indicesService: IndicesService,
chainHeadTracker: ChainHeaderTracker
chainHeadTracker: ChainHeaderTracker,
private readonly metrics: Metrics | null
) {
this.dutiesService = new AttestationDutiesService(
logger,
@@ -34,7 +36,8 @@ export class AttestationService {
clock,
validatorStore,
indicesService,
chainHeadTracker
chainHeadTracker,
metrics
);
// At most every slot, check existing duties from AttestationDutiesService and run tasks
@@ -55,7 +58,8 @@ export class AttestationService {
// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot) -- whichever comes first.
await Promise.race([sleep(this.clock.msToSlotFraction(slot, 1 / 3), signal), this.waitForBlockSlot(slot)]);
await Promise.race([sleep(this.clock.msToSlot(slot + 1 / 3), signal), this.waitForBlockSlot(slot)]);
this.metrics?.attesterStepCallProduceAttestation.observe(this.clock.secFromSlot(slot + 1 / 3));
// Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
// Produce a single attestation for all committees, and clone mutate before signing
@@ -105,7 +109,8 @@ export class AttestationService {
// Step 2. If an attestation was produced, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way through the slot)
await sleep(this.clock.msToSlotFraction(slot, 2 / 3), signal);
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));
// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
@@ -122,6 +127,7 @@ export class AttestationService {
private async produceAttestation(slot: Slot): Promise<phase0.AttestationData> {
// Produce one attestation data per slot and committeeIndex
const attestationRes = await this.api.validator.produceAttestationData(0, slot).catch((e: Error) => {
this.metrics?.attestaterError.inc({error: "produce"});
throw extendError(e, "Error producing attestation");
});
return attestationRes.data;
@@ -159,15 +165,20 @@ export class AttestationService {
signedAttestations.push(await this.validatorStore.signAttestation(duty, attestation, currentEpoch));
this.logger.debug("Signed attestation", logCtxValidator);
} catch (e) {
this.metrics?.attestaterError.inc({error: "sign"});
this.logger.error("Error signing attestation", logCtxValidator, e as Error);
}
}
this.metrics?.attesterStepCallPublishAttestation.observe(this.clock.secFromSlot(attestation.slot + 1 / 3));
if (signedAttestations.length > 0) {
try {
await this.api.beacon.submitPoolAttestations(signedAttestations);
this.logger.info("Published attestations", {...logCtx, count: signedAttestations.length});
this.metrics?.publishedAttestations.inc(signedAttestations.length);
} catch (e) {
this.metrics?.attestaterError.inc({error: "publish"});
this.logger.error("Error publishing attestations", logCtx, e as Error);
}
}
@@ -220,10 +231,13 @@ export class AttestationService {
}
}
this.metrics?.attesterStepCallPublishAggregate.observe(this.clock.secFromSlot(attestation.slot + 2 / 3));
if (signedAggregateAndProofs.length > 0) {
try {
await this.api.validator.publishAggregateAndProofs(signedAggregateAndProofs);
this.logger.info("Published aggregateAndProofs", {...logCtx, count: signedAggregateAndProofs.length});
this.metrics?.publishedAggregates.inc(signedAggregateAndProofs.length);
} catch (e) {
this.logger.error("Error publishing aggregateAndProofs", logCtx, e as Error);
}

View File

@@ -9,6 +9,7 @@ import {IClock, extendError, ILoggerVc} from "../util";
import {ValidatorStore} from "./validatorStore";
import {ChainHeaderTracker, HeadEventData} from "./chainHeaderTracker";
import {PubkeyHex} from "../types";
import {Metrics} from "../metrics";
/** Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. */
const HISTORICAL_DUTIES_EPOCHS = 2;
@@ -38,13 +39,25 @@ export class AttestationDutiesService {
private clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly indicesService: IndicesService,
chainHeadTracker: ChainHeaderTracker
chainHeadTracker: ChainHeaderTracker,
private readonly metrics: Metrics | null
) {
// Running this task every epoch is safe since a re-org of two epochs is very unlikely
// TODO: If the re-org event is reliable consider re-running then
clock.runEveryEpoch(this.runDutiesTasks);
clock.runEverySlot(this.prepareForNextEpoch);
chainHeadTracker.runOnNewHead(this.onNewHead);
if (metrics) {
metrics.attesterDutiesCount.addCollect(() => {
let duties = 0;
for (const attDutiesAtEpoch of this.dutiesByIndexByEpoch.values()) {
duties += attDutiesAtEpoch.dutiesByIndex.size;
}
metrics.attesterDutiesCount.set(duties);
metrics.attesterDutiesEpochCount.set(this.dutiesByIndexByEpoch.size);
});
}
}
removeDutiesForKey(pubkey: PubkeyHex): void {
@@ -89,7 +102,7 @@ export class AttestationDutiesService {
}
// during the 1 / 3 of epoch, last block of epoch may come
await sleep(this.clock.msToSlotFraction(slot, 1 / 3));
await sleep(this.clock.msToSlot(slot + 1 / 3));
const nextEpoch = computeEpochAtSlot(slot) + 1;
const dependentRoot = this.dutiesByIndexByEpoch.get(nextEpoch)?.dependentRoot;
@@ -216,6 +229,7 @@ export class AttestationDutiesService {
}
if (priorDependentRoot && dependentRootChanged) {
this.metrics?.attesterDutiesReorg.inc();
this.logger.warn("Attester duties re-org. This may happen from time to time", {
priorDependentRoot: priorDependentRoot,
dependentRoot: dependentRoot,
@@ -292,13 +306,8 @@ export class AttestationDutiesService {
oldDependentRoot: RootHex,
newDependentRoot: RootHex
): Promise<void> {
const logContext = {
dutyEpoch,
slot,
oldDependentRoot,
newDependentRoot,
};
this.metrics?.attesterDutiesReorg.inc();
const logContext = {dutyEpoch, slot, oldDependentRoot, newDependentRoot};
this.logger.debug("Redownload attester duties", logContext);
await this.pollBeaconAttestersForEpoch(dutyEpoch, this.indicesService.getAllLocalIndices())

View File

@@ -8,6 +8,7 @@ import {IClock, extendError, ILoggerVc} from "../util";
import {ValidatorStore} from "./validatorStore";
import {BlockDutiesService, GENESIS_SLOT} from "./blockDuties";
import {PubkeyHex} from "../types";
import {Metrics} from "../metrics";
/**
* Service that sets up and handles validator block proposal duties.
@@ -19,11 +20,19 @@ export class BlockProposingService {
private readonly config: IChainForkConfig,
private readonly logger: ILoggerVc,
private readonly api: Api,
clock: IClock,
private readonly clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly metrics: Metrics | null,
private readonly graffiti?: string
) {
this.dutiesService = new BlockDutiesService(logger, api, clock, validatorStore, this.notifyBlockProductionFn);
this.dutiesService = new BlockDutiesService(
logger,
api,
clock,
validatorStore,
metrics,
this.notifyBlockProductionFn
);
}
removeDutiesForKey(pubkey: PubkeyHex): void {
@@ -61,16 +70,25 @@ export class BlockProposingService {
const debugLogCtx = {...logCtx, validator: pubkeyHex};
this.logger.debug("Producing block", debugLogCtx);
this.metrics?.proposerStepCallProduceBlock.observe(this.clock.secFromSlot(slot));
const block = await this.produceBlock(slot, randaoReveal, graffiti).catch((e: Error) => {
this.metrics?.blockProposingErrors.inc({error: "produce"});
throw extendError(e, "Failed to produce block");
});
this.logger.debug("Produced block", debugLogCtx);
this.metrics?.blocksProduced.inc();
const signedBlock = await this.validatorStore.signBlock(pubkey, block.data, slot);
this.metrics?.proposerStepCallPublishBlock.observe(this.clock.secFromSlot(slot));
await this.api.beacon.publishBlock(signedBlock).catch((e: Error) => {
this.metrics?.blockProposingErrors.inc({error: "publish"});
throw extendError(e, "Failed to publish block");
});
this.logger.info("Published block", {...logCtx, graffiti});
this.metrics?.blocksPublished.inc();
} catch (e) {
this.logger.error("Error proposing block", logCtx, e as Error);
}

View File

@@ -5,6 +5,7 @@ import {Api, routes} from "@chainsafe/lodestar-api";
import {IClock, extendError, differenceHex, ILoggerVc} from "../util";
import {ValidatorStore} from "./validatorStore";
import {PubkeyHex} from "../types";
import {Metrics} from "../metrics";
/** Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch */
const HISTORICAL_DUTIES_EPOCHS = 2;
@@ -27,6 +28,7 @@ export class BlockDutiesService {
private readonly api: Api,
clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly metrics: Metrics | null,
notifyBlockProductionFn: NotifyBlockProductionFn
) {
this.notifyBlockProductionFn = notifyBlockProductionFn;
@@ -35,6 +37,12 @@ export class BlockDutiesService {
// only then re-fetch the block duties. Make sure most clients (including Lodestar)
// properly emit the re-org event
clock.runEverySlot(this.runBlockDutiesTask);
if (metrics) {
metrics.proposerDutiesEpochCount.addCollect(() => {
metrics.proposerDutiesEpochCount.set(this.proposers.size);
});
}
}
/**
@@ -133,8 +141,7 @@ export class BlockDutiesService {
if (additionalBlockProducers.length > 0) {
this.notifyBlockProductionFn(currentSlot, additionalBlockProducers);
this.logger.debug("Detected new block proposer", {currentSlot});
// TODO: Add Metrics
// this.metrics.proposalChanged.inc();
this.metrics?.proposerDutiesReorg.inc();
}
}
@@ -162,6 +169,7 @@ export class BlockDutiesService {
this.proposers.set(epoch, {dependentRoot, data: relevantDuties});
if (prior && !ssz.Root.equals(prior.dependentRoot, dependentRoot)) {
this.metrics?.proposerDutiesReorg.inc();
this.logger.warn("Proposer duties re-org. This may happen from time to time", {
priorDependentRoot: toHexString(prior.dependentRoot),
dependentRoot: toHexString(dependentRoot),

View File

@@ -4,6 +4,7 @@ import {toHexString} from "@chainsafe/ssz";
import {Api} from "@chainsafe/lodestar-api";
import {ValidatorStore} from "./validatorStore";
import {batchItems} from "../util/batch";
import {Metrics} from "../metrics";
/**
* URLs have a limitation on size, adding an unbounded num of pubkeys will break the request.
@@ -24,8 +25,13 @@ export class IndicesService {
constructor(
private readonly logger: ILogger,
private readonly api: Api,
private readonly validatorStore: ValidatorStore
) {}
private readonly validatorStore: ValidatorStore,
private readonly metrics: Metrics | null
) {
if (metrics) {
metrics.indices.addCollect(() => metrics.indices.set(this.index2pubkey.size));
}
}
/** Return all known indices from the validatorStore pubkeys */
getAllLocalIndices(): ValidatorIndex[] {
@@ -81,7 +87,10 @@ export class IndicesService {
const validatorIndicesArr = await this.fetchValidatorIndices(pubkeysHexBatch);
newIndices.push(...validatorIndicesArr);
}
this.logger.info("Discovered new validators", {count: newIndices.length});
this.metrics?.discoveredIndices.inc(newIndices.length);
return newIndices;
}

View File

@@ -11,6 +11,7 @@ import {groupSyncDutiesBySubcommitteeIndex, SubcommitteeDuty} from "./utils";
import {IndicesService} from "./indices";
import {ChainHeaderTracker} from "./chainHeaderTracker";
import {PubkeyHex} from "../types";
import {Metrics} from "../metrics";
/**
* Service that sets up and handles validator sync duties.
@@ -25,9 +26,18 @@ export class SyncCommitteeService {
private readonly clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly chainHeaderTracker: ChainHeaderTracker,
indicesService: IndicesService
indicesService: IndicesService,
private readonly metrics: Metrics | null
) {
this.dutiesService = new SyncCommitteeDutiesService(config, logger, api, clock, validatorStore, indicesService);
this.dutiesService = new SyncCommitteeDutiesService(
config,
logger,
api,
clock,
validatorStore,
indicesService,
metrics
);
// At most every slot, check existing duties from SyncCommitteeDutiesService and run tasks
clock.runEverySlot(this.runSyncCommitteeTasks);
@@ -51,7 +61,8 @@ export class SyncCommitteeService {
}
// Lighthouse recommends to always wait to 1/3 of the slot, even if the block comes early
await sleep(this.clock.msToSlotFraction(slot, 1 / 3), signal);
await sleep(this.clock.msToSlot(slot + 1 / 3), signal);
this.metrics?.syncCommitteeStepCallProduceMessage.observe(this.clock.secFromSlot(slot + 1 / 3));
// Step 1. Download, sign and publish an `SyncCommitteeMessage` for each validator.
// Differs from AttestationService, `SyncCommitteeMessage` are equal for all
@@ -59,7 +70,8 @@ export class SyncCommitteeService {
// Step 2. If an attestation was produced, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way through the slot)
await sleep(this.clock.msToSlotFraction(slot, 2 / 3), signal);
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.syncCommitteeStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));
// await for all so if the Beacon node is overloaded it auto-throttles
// TODO: This approach is conservative to reduce the node's load, review
@@ -119,10 +131,13 @@ export class SyncCommitteeService {
}
}
this.metrics?.syncCommitteeStepCallPublishMessage.observe(this.clock.secFromSlot(slot + 1 / 3));
if (signatures.length > 0) {
try {
await this.api.beacon.submitPoolSyncCommitteeSignatures(signatures);
this.logger.info("Published SyncCommitteeMessage", {...logCtx, count: signatures.length});
this.metrics?.publishedSyncCommitteeMessage.inc(signatures.length);
} catch (e) {
this.logger.error("Error publishing SyncCommitteeMessage", logCtx, e as Error);
}
@@ -178,10 +193,13 @@ export class SyncCommitteeService {
}
}
this.metrics?.syncCommitteeStepCallPublishAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));
if (signedContributions.length > 0) {
try {
await this.api.validator.publishContributionAndProofs(signedContributions);
this.logger.info("Published SyncCommitteeContribution", {...logCtx, count: signedContributions.length});
this.metrics?.publishedSyncCommitteeContribution.inc(signedContributions.length);
} catch (e) {
this.logger.error("Error publishing SyncCommitteeContribution", logCtx, e as Error);
}

View File

@@ -12,6 +12,7 @@ import {IndicesService} from "./indices";
import {IClock, extendError, ILoggerVc} from "../util";
import {ValidatorStore} from "./validatorStore";
import {PubkeyHex} from "../types";
import {Metrics} from "../metrics";
/** Only retain `HISTORICAL_DUTIES_PERIODS` duties prior to the current periods. */
const HISTORICAL_DUTIES_PERIODS = 2;
@@ -59,11 +60,23 @@ export class SyncCommitteeDutiesService {
private readonly api: Api,
clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly indicesService: IndicesService
private readonly indicesService: IndicesService,
private readonly metrics: Metrics | null
) {
// Running this task every epoch is safe since a re-org of many epochs is very unlikely
// TODO: If the re-org event is reliable consider re-running then
clock.runEveryEpoch(this.runDutiesTasks);
if (metrics) {
metrics.syncCommitteeDutiesCount.addCollect(() => {
let duties = 0;
for (const dutiesByIndex of this.dutiesByIndexByPeriod.values()) {
duties += dutiesByIndex.size;
}
metrics.syncCommitteeDutiesCount.set(duties);
metrics.syncCommitteeDutiesEpochCount.set(this.dutiesByIndexByPeriod.size);
});
}
}
/**
@@ -224,6 +237,8 @@ export class SyncCommitteeDutiesService {
//
// - There were no known duties for this period.
// - The dependent root has changed, signalling a re-org.
//
// if (reorg) this.metrics?.syncCommitteeDutiesReorg.inc()
// Using `alreadyWarnedReorg` avoids excessive logs.
dutiesByIndex.set(validatorIndex, {dependentRoot, duty});

View File

@@ -33,6 +33,7 @@ import {routes} from "@chainsafe/lodestar-api";
import {Interchange, InterchangeFormatVersion, ISlashingProtection} from "../slashingProtection";
import {PubkeyHex} from "../types";
import {externalSignerPostSignature} from "../util/externalSignerClient";
import {Metrics} from "../metrics";
export enum SignerType {
Local,
@@ -67,6 +68,7 @@ export class ValidatorStore {
constructor(
private readonly config: IBeaconConfig,
private readonly slashingProtection: ISlashingProtection,
private readonly metrics: Metrics | null,
signers: Signer[],
genesis: phase0.Genesis
) {
@@ -74,8 +76,11 @@ export class ValidatorStore {
this.addSigner(signer);
}
this.slashingProtection = slashingProtection;
this.genesisValidatorsRoot = genesis.genesisValidatorsRoot;
if (metrics) {
metrics.signers.addCollect(() => metrics.signers.set(this.validators.size));
}
}
addSigner(signer: Signer): void {
@@ -125,7 +130,12 @@ export class ValidatorStore {
const blockType = this.config.getForkTypes(block.slot).BeaconBlock;
const signingRoot = computeSigningRoot(blockType, block, proposerDomain);
await this.slashingProtection.checkAndInsertBlockProposal(pubkey, {slot: block.slot, signingRoot});
try {
await this.slashingProtection.checkAndInsertBlockProposal(pubkey, {slot: block.slot, signingRoot});
} catch (e) {
this.metrics?.slashingProtectionBlockError.inc();
throw e;
}
return {
message: block,
@@ -158,11 +168,16 @@ export class ValidatorStore {
const domain = this.config.getDomain(DOMAIN_BEACON_ATTESTER, slot);
const signingRoot = computeSigningRoot(ssz.phase0.AttestationData, attestationData, domain);
await this.slashingProtection.checkAndInsertAttestation(duty.pubkey, {
sourceEpoch: attestationData.source.epoch,
targetEpoch: attestationData.target.epoch,
signingRoot,
});
try {
await this.slashingProtection.checkAndInsertAttestation(duty.pubkey, {
sourceEpoch: attestationData.source.epoch,
targetEpoch: attestationData.target.epoch,
signingRoot,
});
} catch (e) {
this.metrics?.slashingProtectionAttestationError.inc();
throw e;
}
return {
aggregationBits: BitArray.fromSingleBit(duty.committeeLength, duty.validatorCommitteeIndex),
@@ -279,16 +294,28 @@ export class ValidatorStore {
}
switch (signer.type) {
case SignerType.Local:
return signer.secretKey.sign(signingRoot).toBytes();
case SignerType.Local: {
const timer = this.metrics?.localSignTime.startTimer();
const signature = signer.secretKey.sign(signingRoot).toBytes();
timer?.();
return signature;
}
case SignerType.Remote: {
const signatureHex = await externalSignerPostSignature(
signer.externalSignerUrl,
pubkeyHex,
toHexString(signingRoot)
);
return fromHexString(signatureHex);
const timer = this.metrics?.remoteSignTime.startTimer();
try {
const signatureHex = await externalSignerPostSignature(
signer.externalSignerUrl,
pubkeyHex,
toHexString(signingRoot)
);
return fromHexString(signatureHex);
} catch (e) {
this.metrics?.remoteSignErrors.inc();
throw e;
} finally {
timer?.();
}
}
}
}

View File

@@ -12,7 +12,8 @@ export interface IClock {
start(signal: AbortSignal): void;
runEverySlot(fn: (slot: Slot, signal: AbortSignal) => Promise<void>): void;
runEveryEpoch(fn: (epoch: Epoch, signal: AbortSignal) => Promise<void>): void;
msToSlotFraction(slot: Slot, fraction: number): number;
msToSlot(slot: Slot): number;
secFromSlot(slot: Slot): number;
}
export enum TimeItem {
@@ -50,13 +51,15 @@ export class Clock implements IClock {
this.fns.push({timeItem: TimeItem.Epoch, fn});
}
/**
* Miliseconds from now to a specific slot fraction.
* If it's negative, return 0.
**/
msToSlotFraction(slot: Slot, fraction: number): number {
const timeAt = this.genesisTime + this.config.SECONDS_PER_SLOT * (slot + fraction);
return Math.max(timeAt * 1000 - Date.now(), 0);
/**
 * Milliseconds from now until the start of `slot`.
 * `slot` may be fractional (e.g. `slot + 1 / 3`). The result is not clamped,
 * so it is negative once that instant has already passed.
 */
msToSlot(slot: Slot): number {
  const slotStartMs = (this.genesisTime + this.config.SECONDS_PER_SLOT * slot) * 1000;
  return slotStartMs - Date.now();
}
/**
 * Seconds elapsed between the start of `slot` and now.
 * Negative while the start of `slot` is still in the future.
 */
secFromSlot(slot: Slot): number {
  const nowSec = Date.now() / 1000;
  const slotStartSec = this.genesisTime + this.config.SECONDS_PER_SLOT * slot;
  return nowSec - slotStartSec;
}
/**

View File

@@ -20,6 +20,7 @@ import {ValidatorEventEmitter} from "./services/emitter";
import {ValidatorStore, Signer} from "./services/validatorStore";
import {computeEpochAtSlot, getCurrentSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {PubkeyHex} from "./types";
import {Metrics} from "./metrics";
export type ValidatorOptions = {
slashingProtection: ISlashingProtection;
@@ -58,7 +59,7 @@ export class Validator {
private readonly logger: ILogger;
private state: State = {status: Status.stopped};
constructor(opts: ValidatorOptions, readonly genesis: Genesis) {
constructor(opts: ValidatorOptions, readonly genesis: Genesis, metrics: Metrics | null = null) {
const {dbOps, logger, slashingProtection, signers, graffiti} = opts;
const config = createIBeaconConfig(dbOps.config, genesis.genesisValidatorsRoot);
@@ -69,17 +70,26 @@ export class Validator {
// Validator would need the beacon to respond within the slot
timeoutMs: config.SECONDS_PER_SLOT * 1000,
getAbortSignal: this.getAbortSignal,
metrics: metrics?.restApiClient,
})
: opts.api;
const clock = new Clock(config, logger, {genesisTime: Number(genesis.genesisTime)});
const validatorStore = new ValidatorStore(config, slashingProtection, signers, genesis);
this.indicesService = new IndicesService(logger, api, validatorStore);
const validatorStore = new ValidatorStore(config, slashingProtection, metrics, signers, genesis);
this.indicesService = new IndicesService(logger, api, validatorStore, metrics);
this.emitter = new ValidatorEventEmitter();
this.chainHeaderTracker = new ChainHeaderTracker(logger, api, this.emitter);
const loggerVc = getLoggerVc(logger, clock);
this.blockProposingService = new BlockProposingService(config, loggerVc, api, clock, validatorStore, graffiti);
this.blockProposingService = new BlockProposingService(
config,
loggerVc,
api,
clock,
validatorStore,
metrics,
graffiti
);
this.attestationService = new AttestationService(
loggerVc,
@@ -88,7 +98,8 @@ export class Validator {
validatorStore,
this.emitter,
this.indicesService,
this.chainHeaderTracker
this.chainHeaderTracker,
metrics
);
this.syncCommitteeService = new SyncCommitteeService(
@@ -98,7 +109,8 @@ export class Validator {
clock,
validatorStore,
this.chainHeaderTracker,
this.indicesService
this.indicesService,
metrics
);
this.config = config;
@@ -109,7 +121,11 @@ export class Validator {
}
/** Waits for genesis and genesis time */
static async initializeFromBeaconNode(opts: ValidatorOptions, signal?: AbortSignal): Promise<Validator> {
static async initializeFromBeaconNode(
opts: ValidatorOptions,
signal?: AbortSignal,
metrics?: Metrics | null
): Promise<Validator> {
const {config} = opts.dbOps;
const api =
typeof opts.api === "string"
@@ -128,7 +144,7 @@ export class Validator {
await assertEqualGenesis(opts, genesis);
opts.logger.info("Verified node and validator have same genesisValidatorRoot");
return new Validator(opts, genesis);
return new Validator(opts, genesis, metrics);
}
removeDutiesForKey(pubkey: PubkeyHex): void {

View File

@@ -46,7 +46,7 @@ describe("AttestationService", function () {
it("Should produce, sign, and publish an attestation + aggregate", async () => {
const clock = new ClockMock();
const indicesService = new IndicesService(logger, api, validatorStore);
const indicesService = new IndicesService(logger, api, validatorStore, null);
const attestationService = new AttestationService(
loggerVc,
api,
@@ -54,7 +54,8 @@ describe("AttestationService", function () {
validatorStore,
emitter,
indicesService,
chainHeadTracker
chainHeadTracker,
null
);
const attestation = generateEmptyAttestation();

View File

@@ -77,14 +77,15 @@ describe("AttestationDutiesService", function () {
// Clock will call runAttesterDutiesTasks() immediatelly
const clock = new ClockMock();
const indicesService = new IndicesService(logger, api, validatorStore);
const indicesService = new IndicesService(logger, api, validatorStore, null);
const dutiesService = new AttestationDutiesService(
loggerVc,
api,
clock,
validatorStore,
indicesService,
chainHeadTracker
chainHeadTracker,
null
);
// Trigger clock onSlot for slot 0
@@ -154,14 +155,15 @@ describe("AttestationDutiesService", function () {
// Clock will call runAttesterDutiesTasks() immediatelly
const clock = new ClockMock();
const indicesService = new IndicesService(logger, api, validatorStore);
const indicesService = new IndicesService(logger, api, validatorStore, null);
const dutiesService = new AttestationDutiesService(
loggerVc,
api,
clock,
validatorStore,
indicesService,
chainHeadTracker
chainHeadTracker,
null
);
// Trigger clock onSlot for slot 0

View File

@@ -48,7 +48,7 @@ describe("BlockDutiesService", function () {
api.validator.getProposerDuties.resolves(duties);
const clock = new ClockMock();
const blockService = new BlockProposingService(config, loggerVc, api, clock, validatorStore);
const blockService = new BlockProposingService(config, loggerVc, api, clock, validatorStore, null);
const signedBlock = generateEmptySignedBlock();
validatorStore.signRandao.resolves(signedBlock.message.body.randaoReveal);

View File

@@ -46,7 +46,7 @@ describe("BlockDutiesService", function () {
const notifyBlockProductionFn = sinon.stub(); // Returns void
const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
// Trigger clock onSlot for slot 0
await clock.tickSlotFns(0, controller.signal);
@@ -81,7 +81,7 @@ describe("BlockDutiesService", function () {
// Clock will call runAttesterDutiesTasks() immediatelly
const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
// Trigger clock onSlot for slot 0
api.validator.getProposerDuties.resolves(dutiesBeforeReorg);
@@ -136,7 +136,7 @@ describe("BlockDutiesService", function () {
const notifyBlockProductionFn = sinon.stub(); // Returns void
const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
// Trigger clock onSlot for slot 0
await clock.tickSlotFns(0, controller.signal);

View File

@@ -26,7 +26,7 @@ describe("IndicesService", function () {
});
it("Should remove pubkey", async function () {
const indicesService = new IndicesService(logger, api, validatorStore);
const indicesService = new IndicesService(logger, api, validatorStore, null);
const firstValidatorIndex = 0;
const secondValidatorIndex = 1;

View File

@@ -81,8 +81,16 @@ describe("SyncCommitteeDutiesService", function () {
// Clock will call runAttesterDutiesTasks() immediatelly
const clock = new ClockMock();
const indicesService = new IndicesService(logger, api, validatorStore);
const dutiesService = new SyncCommitteeDutiesService(config, loggerVc, api, clock, validatorStore, indicesService);
const indicesService = new IndicesService(logger, api, validatorStore, null);
const dutiesService = new SyncCommitteeDutiesService(
config,
loggerVc,
api,
clock,
validatorStore,
indicesService,
null
);
// Trigger clock onSlot for slot 0
await clock.tickEpochFns(0, controller.signal);
@@ -149,8 +157,16 @@ describe("SyncCommitteeDutiesService", function () {
// Clock will call runAttesterDutiesTasks() immediatelly
const clock = new ClockMock();
const indicesService = new IndicesService(logger, api, validatorStore);
const dutiesService = new SyncCommitteeDutiesService(config, loggerVc, api, clock, validatorStore, indicesService);
const indicesService = new IndicesService(logger, api, validatorStore, null);
const dutiesService = new SyncCommitteeDutiesService(
config,
loggerVc,
api,
clock,
validatorStore,
indicesService,
null
);
// Trigger clock onSlot for slot 0
await clock.tickEpochFns(0, controller.signal);
@@ -208,8 +224,16 @@ describe("SyncCommitteeDutiesService", function () {
// Clock will call runAttesterDutiesTasks() immediatelly
const clock = new ClockMock();
const indicesService = new IndicesService(logger, api, validatorStore);
const dutiesService = new SyncCommitteeDutiesService(config, loggerVc, api, clock, validatorStore, indicesService);
const indicesService = new IndicesService(logger, api, validatorStore, null);
const dutiesService = new SyncCommitteeDutiesService(
config,
loggerVc,
api,
clock,
validatorStore,
indicesService,
null
);
// Trigger clock onSlot for slot 0
await clock.tickEpochFns(0, controller.signal);

View File

@@ -50,7 +50,7 @@ describe("SyncCommitteeService", function () {
it("Should produce, sign, and publish a sync committee + contribution", async () => {
const clock = new ClockMock();
const indicesService = new IndicesService(logger, api, validatorStore);
const indicesService = new IndicesService(logger, api, validatorStore, null);
const syncCommitteeService = new SyncCommitteeService(
config,
loggerVc,
@@ -58,7 +58,8 @@ describe("SyncCommitteeService", function () {
clock,
validatorStore,
chainHeaderTracker,
indicesService
indicesService,
null
);
const beaconBlockRoot = Buffer.alloc(32, 0x4d);

View File

@@ -14,7 +14,8 @@ export class ClockMock implements IClock {
start = (): void => {};
runEverySlot = (fn: RunEveryFn): number => this.everySlot.push(fn);
runEveryEpoch = (fn: RunEveryFn): number => this.everyEpoch.push(fn);
msToSlotFraction = (): number => 0;
msToSlot = (): number => 0;
secFromSlot = (): number => 0;
async tickSlotFns(slot: Slot, signal: AbortSignal): Promise<void> {
for (const fn of this.everySlot) await fn(slot, signal);

View File

@@ -3,4 +3,10 @@ scrape_configs:
scrape_interval: 5s
metrics_path: /metrics
static_configs:
- targets: ["127.0.0.1:8008"]
- targets: ["localhost:8008"]
- job_name: validator
scrape_interval: 20s
scrape_timeout: 20s
metrics_path: /metrics
static_configs:
- targets: ["localhost:5064"]