Add Gossip Block metrics (#3214)

* Add Gossip Block metrics

* Fix lint

* Fix check types

* Capture seenTimestamp before gossip queues

* Fix merge issue

* Calculate elappsedTimeTillProcessed in gossip handler

Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
tuyennhv
2021-09-21 15:48:36 +07:00
committed by GitHub
parent 456b50ee62
commit f86a4381bd
10 changed files with 1482 additions and 1107 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -291,6 +291,20 @@ export function createLodestarMetrics(
}),
},
// Gossip block
gossipBlock: {
elappsedTimeTillReceived: register.histogram({
name: "gossip_block_elappsed_time_till_received",
help: "Time elappsed between block slot time and the time block received via gossip",
buckets: [0.1, 1, 10],
}),
elappsedTimeTillProcessed: register.histogram({
name: "gossip_block_elappsed_time_till_processed",
help: "Time elappsed between block slot time and the time block processed",
buckets: [0.1, 1, 10],
}),
},
// Validator monitoring
validatorMonitor: {

View File

@@ -213,9 +213,10 @@ export function createValidatorMonitor(
registerBeaconBlock(src, seenTimestampSec, block) {
const index = block.proposerIndex;
const validator = validators.get(index);
// Returns the delay between the start of `block.slot` and `seenTimestamp`.
const delaySec = seenTimestampSec - (genesisTime + block.slot * config.SECONDS_PER_SLOT);
metrics.gossipBlock.elappsedTimeTillReceived.observe(delaySec);
if (validator) {
// Returns the delay between the start of `block.slot` and `seenTimestamp`.
const delaySec = seenTimestampSec - (genesisTime + block.slot * config.SECONDS_PER_SLOT);
metrics.validatorMonitor.beaconBlockTotal.inc({src, index});
metrics.validatorMonitor.beaconBlockDelaySeconds.observe({src, index}, delaySec);
}

View File

@@ -11,7 +11,15 @@ import {ILogger} from "@chainsafe/lodestar-utils";
import {computeStartSlotAtEpoch} from "@chainsafe/lodestar-beacon-state-transition";
import {IMetrics} from "../../metrics";
import {GossipJobQueues, GossipTopic, GossipTopicMap, GossipType, GossipTypeMap, ValidatorFnsByType} from "./interface";
import {
GossipJobQueues,
GossipTopic,
GossipTopicMap,
GossipType,
GossipTypeMap,
ValidatorFnsByType,
GossipHandlers,
} from "./interface";
import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic} from "./topic";
import {computeMsgId, encodeMessageData, UncompressCache} from "./encoding";
import {DEFAULT_ENCODING} from "./constants";
@@ -19,7 +27,6 @@ import {GossipValidationError} from "./errors";
import {IForkDigestContext} from "../../util/forkDigestContext";
import {GOSSIP_MAX_SIZE} from "../../constants";
import {createValidatorFnsByType} from "./validation";
import {GossipHandlers} from "./handlers";
import {Map2d, Map2dArr} from "../../util/map";
import pipe from "it-pipe";
import PeerStreams from "libp2p-interfaces/src/pubsub/peer-streams";
@@ -207,6 +214,16 @@ export class Eth2Gossipsub extends Gossipsub {
return true;
}
// // Snippet of _processRpcMessage from https://github.com/libp2p/js-libp2p-interfaces/blob/92245d66b0073f0a72fed9f7abcf4b533102f1fd/packages/interfaces/src/pubsub/index.js#L442
// async _processRpcMessage(msg: InMessage): Promise<void> {
// try {
// await this.validate(msg);
// } catch (err) {
// this.log("Message is invalid, dropping it. %O", err);
// return;
// }
// }
/**
* @override https://github.com/ChainSafe/js-libp2p-gossipsub/blob/3c3c46595f65823fcd7900ed716f43f76c6b355c/ts/index.ts#L436
* @override https://github.com/libp2p/js-libp2p-interfaces/blob/ff3bd10704a4c166ce63135747e3736915b0be8d/src/pubsub/index.js#L513
@@ -237,8 +254,11 @@ export class Eth2Gossipsub extends Gossipsub {
// Also validates that the topicStr is known
const topic = this.gossipTopicCache.getTopic(topicStr);
// Get seenTimestamp before adding the message to the queue or add async delays
const seenTimestampSec = Date.now() / 1000;
// No error here means that the incoming object is valid
await this.validatorFnsByType[topic.type](topic, message);
await this.validatorFnsByType[topic.type](topic, message, seenTimestampSec);
} catch (e) {
// JobQueue may throw non-typed errors
const code = e instanceof GossipValidationError ? e.code : ERR_TOPIC_VALIDATOR_IGNORE;

View File

@@ -1,4 +1,5 @@
import {toHexString} from "@chainsafe/ssz";
import PeerId from "peer-id";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {phase0, ssz, ValidatorIndex} from "@chainsafe/lodestar-types";
import {ILogger, prettyBytes} from "@chainsafe/lodestar-utils";
@@ -13,7 +14,7 @@ import {
GossipAction,
SyncCommitteeError,
} from "../../../chain/errors";
import {GossipTopicMap, GossipType, GossipTypeMap} from "../interface";
import {GossipHandlers, GossipType} from "../interface";
import {
validateGossipAggregateAndProof,
validateGossipAttestation,
@@ -26,17 +27,7 @@ import {
} from "../../../chain/validation";
import {INetwork} from "../../interface";
import {NetworkEvent} from "../../events";
import PeerId from "peer-id";
import {PeerAction} from "../..";
export type GossipHandlerFn = (
object: GossipTypeMap[GossipType],
topic: GossipTopicMap[GossipType],
peerIdStr: string
) => Promise<void>;
export type GossipHandlers = {
[K in GossipType]: (object: GossipTypeMap[K], topic: GossipTopicMap[K], peerIdStr: string) => Promise<void>;
};
import {PeerAction} from "../../peers";
type ValidatorFnsModules = {
chain: IBeaconChain;
@@ -64,8 +55,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules): GossipHandlers
const {chain, config, metrics, network, logger} = modules;
return {
[GossipType.beacon_block]: async (signedBlock, _topic, peerIdStr) => {
const seenTimestampSec = Date.now() / 1000;
[GossipType.beacon_block]: async (signedBlock, _topic, peerIdStr, seenTimestampSec) => {
const slot = signedBlock.message.slot;
const blockHex = prettyBytes(config.getForkTypes(slot).BeaconBlock.hashTreeRoot(signedBlock.message));
logger.verbose("Received gossip block", {
@@ -104,8 +94,12 @@ export function getGossipHandlers(modules: ValidatorFnsModules): GossipHandlers
}
// Handler
chain.processBlock(signedBlock).catch((e) => {
try {
await chain.processBlock(signedBlock);
// Returns the delay between the start of `block.slot` and `current time`
const delaySec = Date.now() / 1000 - (chain.genesisTime + slot * config.SECONDS_PER_SLOT);
metrics?.gossipBlock.elappsedTimeTillProcessed.observe(delaySec);
} catch (e) {
if (e instanceof BlockError) {
switch (e.type.code) {
case BlockErrorCode.ALREADY_KNOWN:
@@ -120,14 +114,11 @@ export function getGossipHandlers(modules: ValidatorFnsModules): GossipHandlers
);
}
}
logger.error("Error receiving block", {slot: signedBlock.message.slot, peer: peerIdStr}, e as Error);
});
logger.error("Error receiving block", {slot, peer: peerIdStr}, e as Error);
}
},
[GossipType.beacon_aggregate_and_proof]: async (signedAggregateAndProof) => {
const seenTimestampSec = Date.now() / 1000;
[GossipType.beacon_aggregate_and_proof]: async (signedAggregateAndProof, _topic, _peer, seenTimestampSec) => {
try {
const {indexedAttestation, committeeIndices} = await validateGossipAggregateAndProof(
chain,
@@ -173,8 +164,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules): GossipHandlers
}
},
[GossipType.beacon_attestation]: async (attestation, {subnet}) => {
const seenTimestampSec = Date.now() / 1000;
[GossipType.beacon_attestation]: async (attestation, {subnet}, _peer, seenTimestampSec) => {
let indexedAttestation: phase0.IndexedAttestation | undefined = undefined;
try {
indexedAttestation = (await validateGossipAttestation(chain, attestation, subnet)).indexedAttestation;

View File

@@ -1,3 +1,3 @@
export {Eth2Gossipsub} from "./gossipsub";
export {GossipHandlers, getGossipHandlers} from "./handlers";
export {getGossipHandlers} from "./handlers";
export * from "./interface";

View File

@@ -114,8 +114,27 @@ export interface IGossipModules {
*
* js-libp2p-gossipsub expects validation functions that look like this
*/
export type GossipValidatorFn = (topic: GossipTopic, message: InMessage) => Promise<void>;
export type GossipValidatorFn = (topic: GossipTopic, message: InMessage, seenTimestampSec: number) => Promise<void>;
export type ValidatorFnsByType = {[K in GossipType]: GossipValidatorFn};
export type GossipJobQueues = {[K in GossipType]: JobItemQueue<[GossipTopic, InMessage], void>};
export type GossipJobQueues = {[K in GossipType]: JobItemQueue<[GossipTopic, InMessage, number], void>};
export type GossipHandlerFn = (
object: GossipTypeMap[GossipType],
topic: GossipTopicMap[GossipType],
peerIdStr: string,
seenTimestampSec: number
) => Promise<void>;
export type GossipHandlers = {
[K in GossipType]: (
object: GossipTypeMap[K],
topic: GossipTopicMap[K],
peerIdStr: string,
seenTimestampSec: number
) => Promise<void>;
};
export type InMessageTimestamp = InMessage & {
seenTimestampMs: number;
};

View File

@@ -5,8 +5,14 @@ import {Json} from "@chainsafe/ssz";
import {ILogger, mapValues} from "@chainsafe/lodestar-utils";
import {IMetrics} from "../../../metrics";
import {getGossipSSZType} from "../topic";
import {GossipHandlers, GossipHandlerFn} from "../handlers";
import {GossipJobQueues, GossipType, GossipValidatorFn, ValidatorFnsByType} from "../interface";
import {
GossipJobQueues,
GossipType,
GossipValidatorFn,
ValidatorFnsByType,
GossipHandlers,
GossipHandlerFn,
} from "../interface";
import {GossipValidationError} from "../errors";
import {GossipActionError, GossipAction} from "../../../chain/errors";
import {decodeMessageData, UncompressCache} from "../encoding";
@@ -39,8 +45,8 @@ export function createValidatorFnsByType(
const validatorFnsByType = mapValues(
jobQueues,
(jobQueue): GossipValidatorFn => {
return async function gossipValidatorFnWithQueue(topic, gossipMsg) {
await jobQueue.push(topic, gossipMsg);
return async function gossipValidatorFnWithQueue(topic, gossipMsg, seenTimestampsMs) {
await jobQueue.push(topic, gossipMsg, seenTimestampsMs);
};
}
);
@@ -70,10 +76,9 @@ function getGossipValidatorFn<K extends GossipType>(
const {config, logger, metrics, uncompressCache} = modules;
const getGossipObjectAcceptMetadata = getGossipAcceptMetadataByType[type] as GetGossipAcceptMetadataFn;
return async function gossipValidatorFn(topic, gossipMsg) {
return async function gossipValidatorFn(topic, gossipMsg, seenTimestampSec) {
// Define in scope above try {} to be used in catch {} if object was parsed
let gossipObject;
try {
const encoding = topic.encoding ?? DEFAULT_ENCODING;
@@ -91,7 +96,7 @@ function getGossipValidatorFn<K extends GossipType>(
throw new GossipActionError(GossipAction.REJECT, {code: (e as Error).message});
}
await (gossipHandler as GossipHandlerFn)(gossipObject, topic, gossipMsg.receivedFrom);
await (gossipHandler as GossipHandlerFn)(gossipObject, topic, gossipMsg.receivedFrom, seenTimestampSec);
const metadata = getGossipObjectAcceptMetadata(config, gossipObject, topic);
logger.debug(`gossip - ${type} - accept`, metadata);

View File

@@ -43,8 +43,8 @@ export function createValidationQueues(
): GossipJobQueues {
return mapValues(gossipQueueOpts, (opts, type) => {
const gossipValidatorFn = gossipValidatorFns[type];
return new JobItemQueue<[GossipTopic, InMessage], void>(
(topic, message) => gossipValidatorFn(topic, message),
return new JobItemQueue<[GossipTopic, InMessage, number], void>(
(topic, message, seenTimestampsMs) => gossipValidatorFn(topic, message, seenTimestampsMs),
{signal, ...opts},
metrics
? {

View File

@@ -8,8 +8,7 @@ import {sleep} from "@chainsafe/lodestar-utils";
import {getReqRespHandlers, Network} from "../../../src/network";
import {INetworkOptions} from "../../../src/network/options";
import {GossipHandlers} from "../../../src/network/gossip/handlers";
import {GossipType} from "../../../src/network/gossip";
import {GossipType, GossipHandlers} from "../../../src/network/gossip";
import {generateEmptySignedBlock} from "../../utils/block";
import {MockBeaconChain} from "../../utils/mocks/chain/chain";