Refactor discovery consumer (#3403)

* Integrate discv5 into discovery consumer

* Start discovery

* Update test types

* Add metrics for find node queries

* Add cachedENRsSize metric

* Add dashboard

* Track dropped ENRs

* Track peersToConnect metric

* Improve metrics

* Set exemplar to false

* More charts

* Fix e2e tests

* Tune charts

* WIP test

* Uncomment retry

* Track count of sync peers

* Review libp2p options

* Disable libp2p latency monitor

* Improve PeerManager peer data

* Overshoot when connecting to peers

* Skip discv5 e2e test
This commit is contained in:
Lion - dapplion
2021-11-04 18:03:21 +01:00
committed by GitHub
parent 70a534deca
commit 08dbb21538
30 changed files with 3784 additions and 1969 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,7 @@ import {ErrorAborted} from "@chainsafe/lodestar-utils";
import {LevelDbController} from "@chainsafe/lodestar-db";
import {BeaconNode, BeaconDb, createNodeJsLibp2p} from "@chainsafe/lodestar";
// eslint-disable-next-line no-restricted-imports
import {createDbMetrics, createDiscv5Metrics} from "@chainsafe/lodestar/lib/metrics";
import {createDbMetrics} from "@chainsafe/lodestar/lib/metrics";
import {createIBeaconConfig} from "@chainsafe/lodestar-config";
import {ACTIVE_PRESET, PresetName} from "@chainsafe/lodestar-params";
import {IGlobalArgs} from "../../options";
@@ -56,15 +56,11 @@ export async function beaconHandler(args: IBeaconArgs & IGlobalArgs): Promise<vo
if (ACTIVE_PRESET === PresetName.minimal) logger.info("ACTIVE_PRESET == minimal preset");
let dbMetrics: null | ReturnType<typeof createDbMetrics> = null;
let discv5Metrics: null | ReturnType<typeof createDiscv5Metrics> = null;
// additional metrics registries
const metricsRegistries = [];
if (options.metrics.enabled) {
dbMetrics = createDbMetrics();
discv5Metrics = createDiscv5Metrics();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
options.network.discv5!.metrics = discv5Metrics.metrics;
metricsRegistries.push(dbMetrics.registry, discv5Metrics.registry);
metricsRegistries.push(dbMetrics.registry);
}
const db = new BeaconDb({
config,

View File

@@ -46,6 +46,15 @@ export function createICachedGenesis(chainForkConfig: IChainForkConfig, genesisV
return forkName;
},
forkDigest2ForkNameOption(forkDigest: ForkDigest | ForkDigestHex): ForkName | null {
const forkDigestHex = toHexStringNoPrefix(forkDigest);
const forkName = forkNameByForkDigest.get(forkDigestHex);
if (!forkName) {
return null;
}
return forkName;
},
forkName2ForkDigest(forkName: ForkName): ForkDigest {
const forkDigest = forkDigestByForkName.get(forkName);
if (!forkDigest) {

View File

@@ -5,6 +5,7 @@ export type ForkDigestHex = string;
export interface IForkDigestContext {
forkDigest2ForkName(forkDigest: ForkDigest | ForkDigestHex): ForkName;
forkDigest2ForkNameOption(forkDigest: ForkDigest | ForkDigestHex): ForkName | null;
forkName2ForkDigest(forkName: ForkName): ForkDigest;
forkName2ForkDigestHex(forkName: ForkName): ForkDigestHex;
}

View File

@@ -4,7 +4,7 @@
import {getCurrentSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {allForks} from "@chainsafe/lodestar-types";
import {collectDefaultMetrics, Counter, Gauge, Registry} from "prom-client";
import {collectDefaultMetrics, Counter, Registry} from "prom-client";
import gcStats from "prometheus-gc-stats";
import {DbMetricLabels, IDbMetrics} from "@chainsafe/lodestar-db";
import {createBeaconMetrics, IBeaconMetrics} from "./metrics/beacon";
@@ -12,7 +12,6 @@ import {createLodestarMetrics, ILodestarMetrics} from "./metrics/lodestar";
import {IMetricsOptions} from "./options";
import {RegistryMetricCreator} from "./utils/registryMetricCreator";
import {createValidatorMonitor, IValidatorMonitor} from "./validatorMonitor";
import {IDiscv5Metrics} from "@chainsafe/discv5";
export type IMetrics = IBeaconMetrics & ILodestarMetrics & IValidatorMonitor & {register: Registry};
@@ -70,33 +69,3 @@ export function createDbMetrics(): {metrics: IDbMetrics; registry: Registry} {
registry.registerMetric(metrics.dbWrites);
return {metrics, registry};
}
export function createDiscv5Metrics(): {metrics: IDiscv5Metrics; registry: Registry} {
const metrics = {
kadTableSize: new Gauge({
name: "lodestar_discv5_kad_table_size",
help: "Total size of the discv5 kad table",
}) as Gauge<string> & {collect(): void},
activeSessionCount: new Gauge({
name: "lodestar_discv5_active_session_count",
help: "Count of the discv5 active sessions",
}) as Gauge<string> & {collect(): void},
connectedPeerCount: new Gauge({
name: "lodestar_discv5_connected_peer_count",
help: "Count of the discv5 connected peers",
}) as Gauge<string> & {collect(): void},
sentMessageCount: new Gauge<"type">({
name: "lodestar_discv5_sent_message_count",
help: "Count of the discv5 messages sent by message type",
labelNames: ["type"],
}) as Gauge<"type"> & {collect(): void},
rcvdMessageCount: new Gauge<"type">({
name: "lodestar_discv5_rcvd_message_count",
help: "Count of the discv5 messages received by message type",
labelNames: ["type"],
}) as Gauge<"type"> & {collect(): void},
};
const registry = new Registry();
Object.keys(metrics).forEach((metricName) => registry.registerMetric(metrics[metricName as keyof typeof metrics]));
return {metrics, registry};
}

View File

@@ -44,6 +44,10 @@ export function createLodestarMetrics(
help: "number of peers, labeled by direction",
labelNames: ["direction"],
}),
peersSync: register.gauge({
name: "lodestar_peers_sync_count",
help: "Current count of peers useful for sync",
}),
peerConnectedEvent: register.gauge<"direction">({
name: "lodestar_peer_connected_total",
help: "Total number of peer:connected event, labeled by direction",
@@ -68,6 +72,89 @@ export function createLodestarMetrics(
name: "lodestar_peers_total_unique_connected",
help: "Total number of unique peers that have had a connection with",
}),
peersRequestedToConnect: register.gauge({
name: "lodestar_peers_requested_total_to_connect",
help: "Priorization results total peers count requested to connect",
}),
peersRequestedToDisconnect: register.gauge({
name: "lodestar_peers_requested_total_to_disconnect",
help: "Priorization results total peers count requested to disconnect",
}),
peersRequestedSubnetsToQuery: register.gauge<"type">({
name: "lodestar_peers_requested_total_subnets_to_query",
help: "Priorization results total subnets to query and discover peers in",
labelNames: ["type"],
}),
peersRequestedSubnetsPeerCount: register.gauge<"type">({
name: "lodestar_peers_requested_total_subnets_peers_count",
help: "Priorization results total peers in subnets to query and discover peers in",
labelNames: ["type"],
}),
discovery: {
peersToConnect: register.gauge({
name: "lodestar_discovery_peers_to_connect",
help: "Current peers to connect count from discoverPeers requests",
}),
cachedENRsSize: register.gauge({
name: "lodestar_discovery_cached_enrs_size",
help: "Current size of the cachedENRs Set",
}),
findNodeQueryRequests: register.gauge<"action">({
name: "lodestar_discovery_find_node_query_requests_total",
help: "Total count of find node queries started",
labelNames: ["action"],
}),
findNodeQueryTime: register.histogram({
name: "lodestar_discovery_find_node_query_time_seconds",
help: "Time to complete a find node query in seconds in seconds",
buckets: [5, 60],
}),
findNodeQueryEnrCount: register.gauge({
name: "lodestar_discovery_find_node_query_enrs_total",
help: "Total count of found ENRs in queries",
}),
discoveredStatus: register.gauge<"status">({
name: "lodestar_discovery_discovered_status_total_count",
help: "Total count of status results of PeerDiscovery.onDiscovered() function",
labelNames: ["status"],
}),
dialAttempts: register.gauge({
name: "lodestar_discovery_total_dial_attempts",
help: "Total dial attempts by peer discovery",
}),
dialTime: register.histogram<"status">({
name: "lodestar_discovery_dial_time_seconds",
help: "Time to dial peers in seconds",
labelNames: ["status"],
buckets: [0.1, 5, 60],
}),
},
discv5: {
kadTableSize: register.gauge({
name: "lodestar_discv5_kad_table_size",
help: "Total size of the discv5 kad table",
}),
activeSessionCount: register.gauge({
name: "lodestar_discv5_active_session_count",
help: "Count of the discv5 active sessions",
}),
connectedPeerCount: register.gauge({
name: "lodestar_discv5_connected_peer_count",
help: "Count of the discv5 connected peers",
}),
sentMessageCount: register.gauge<"type">({
name: "lodestar_discv5_sent_message_count",
help: "Count of the discv5 messages sent by message type",
labelNames: ["type"],
}),
rcvdMessageCount: register.gauge<"type">({
name: "lodestar_discv5_rcvd_message_count",
help: "Count of the discv5 messages received by message type",
labelNames: ["type"],
}),
},
gossipMesh: {
peersByType: register.gauge<"type" | "fork">({

View File

@@ -4,7 +4,7 @@
export interface IMetricsOptions {
enabled: boolean;
timeout: number;
timeout?: number;
serverPort?: number;
gatewayUrl?: string;
listenAddr?: string;

View File

@@ -8,6 +8,17 @@ import {IBeaconChain} from "../chain";
import {FAR_FUTURE_EPOCH} from "../constants";
import {getCurrentAndNextFork} from "./forks";
export enum ENRKey {
tcp = "tcp",
eth2 = "eth2",
attnets = "attnets",
syncnets = "syncnets",
}
export enum SubnetType {
attnets = "attnets",
syncnets = "syncnets",
}
export interface IMetadataOpts {
metadata?: altair.Metadata;
}
@@ -43,13 +54,13 @@ export class MetadataController {
// updateEth2Field() MUST be called with clock epoch
this.updateEth2Field(this.chain.clock.currentEpoch);
this.enr.set("attnets", ssz.phase0.AttestationSubnets.serialize(this._metadata.attnets));
this.enr.set(ENRKey.attnets, ssz.phase0.AttestationSubnets.serialize(this._metadata.attnets));
// Any fork after altair included
if (currentFork !== ForkName.phase0) {
// Only persist syncnets if altair fork is already activated. If currentFork is altair but head is phase0
// adding syncnets to the ENR is not a problem, we will just have a useless field for a few hours.
this.enr.set("syncnets", ssz.phase0.AttestationSubnets.serialize(this._metadata.syncnets));
this.enr.set(ENRKey.syncnets, ssz.phase0.AttestationSubnets.serialize(this._metadata.syncnets));
}
}
}
@@ -64,7 +75,7 @@ export class MetadataController {
set syncnets(syncnets: BitVector) {
if (this.enr) {
this.enr.set("syncnets", ssz.altair.SyncSubnets.serialize(syncnets));
this.enr.set(ENRKey.syncnets, ssz.altair.SyncSubnets.serialize(syncnets));
}
this._metadata.syncnets = syncnets;
}
@@ -75,7 +86,7 @@ export class MetadataController {
set attnets(attnets: BitVector) {
if (this.enr) {
this.enr.set("attnets", ssz.phase0.AttestationSubnets.serialize(attnets));
this.enr.set(ENRKey.attnets, ssz.phase0.AttestationSubnets.serialize(attnets));
}
this._metadata.seqNumber++;
this._metadata.attnets = attnets;
@@ -101,7 +112,7 @@ export class MetadataController {
if (this.enr) {
const enrForkId = ssz.phase0.ENRForkID.serialize(getENRForkID(this.config, epoch));
this.logger.verbose(`Updated ENR.eth2: ${toHexString(enrForkId)}`);
this.enr.set("eth2", enrForkId);
this.enr.set(ENRKey.eth2, enrForkId);
}
}
}

View File

@@ -9,7 +9,7 @@ import {AbortSignal} from "@chainsafe/abort-controller";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {ILogger} from "@chainsafe/lodestar-utils";
import {ATTESTATION_SUBNET_COUNT, ForkName, SYNC_COMMITTEE_SUBNET_COUNT} from "@chainsafe/lodestar-params";
import {Discv5, Discv5Discovery, ENR} from "@chainsafe/discv5";
import {Discv5, ENR} from "@chainsafe/discv5";
import {computeEpochAtSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {Epoch} from "@chainsafe/lodestar-types";
import {IMetrics} from "../metrics";
@@ -132,9 +132,12 @@ export class Network implements INetwork {
async start(): Promise<void> {
await this.libp2p.start();
// Stop latency monitor since we handle disconnects here and don't want additional load on the event loop
this.libp2p.connectionManager._latencyMonitor.stop();
this.reqResp.start();
this.metadata.start(this.getEnr(), this.config.getForkName(this.clock.currentSlot));
this.peerManager.start();
await this.peerManager.start();
this.gossip.start();
this.attnetsService.start();
this.syncnetsService.start();
@@ -145,7 +148,7 @@ export class Network implements INetwork {
async stop(): Promise<void> {
// Must goodbye and disconnect before stopping libp2p
await this.peerManager.goodbyeAndDisconnectAllPeers();
this.peerManager.stop();
await this.peerManager.stop();
this.gossip.stop();
this.reqResp.stop();
this.attnetsService.stop();
@@ -154,8 +157,8 @@ export class Network implements INetwork {
await this.libp2p.stop();
}
get discv5(): Discv5 {
return (this.libp2p._discovery.get("discv5") as Discv5Discovery)?.discv5;
get discv5(): Discv5 | undefined {
return this.peerManager["discovery"]?.discv5;
}
get localMultiaddrs(): Multiaddr[] {
@@ -167,8 +170,7 @@ export class Network implements INetwork {
}
getEnr(): ENR | undefined {
const discv5Discovery = this.libp2p._discovery.get("discv5") as Discv5Discovery;
return discv5Discovery?.discv5?.enr ?? undefined;
return this.peerManager["discovery"]?.discv5.enr;
}
getConnectionsByPeer(): Map<string, Connection[]> {

View File

@@ -9,7 +9,6 @@ import {NOISE} from "@chainsafe/libp2p-noise";
import Bootstrap from "libp2p-bootstrap";
import MDNS from "libp2p-mdns";
import PeerId from "peer-id";
import {ENRInput, Discv5Discovery, IDiscv5Metrics} from "@chainsafe/discv5";
import {Datastore} from "interface-datastore";
export interface ILibp2pOptions {
@@ -17,16 +16,9 @@ export interface ILibp2pOptions {
addresses: {
listen: string[];
announce?: string[];
noAnnounce?: string[];
};
datastore?: Datastore;
discv5: {
bindAddr: string;
enr: ENRInput;
bootEnrs?: ENRInput[];
metrics?: IDiscv5Metrics;
};
peerDiscovery?: (typeof Bootstrap | typeof MDNS | typeof Discv5Discovery)[];
peerDiscovery?: (typeof Bootstrap | typeof MDNS)[];
bootMultiaddrs?: string[];
maxConnections?: number;
minConnections?: number;
@@ -34,21 +26,34 @@ export interface ILibp2pOptions {
export class NodejsNode extends LibP2p {
constructor(options: ILibp2pOptions) {
const defaults = {
super({
peerId: options.peerId,
addresses: {
listen: options.addresses.listen,
announce: options.addresses.announce || [],
noAnnounce: options.addresses.noAnnounce || [],
},
modules: {
connEncryption: [NOISE],
transport: [TCP],
streamMuxer: [Mplex],
peerDiscovery: options.peerDiscovery || [Bootstrap, MDNS, Discv5Discovery],
peerDiscovery: options.peerDiscovery || [Bootstrap, MDNS],
},
dialer: {
maxParallelDials: 100,
maxAddrsToDial: 4,
maxDialsPerPeer: 2,
dialTimeout: 30_000,
},
connectionManager: {
autoDial: false,
// DOCS: the maximum number of connections libp2p is willing to have before it starts disconnecting.
// If ConnectionManager.size > maxConnections calls _maybeDisconnectOne() which will sort peers disconnect
// the one with the least `_peerValues`. That's a custom peer generalized score that's not used, so it always
// has the same value in current Lodestar usage.
maxConnections: options.maxConnections,
// DOCS: the minimum number of connections below which libp2p not activate preemptive disconnections.
// If ConnectionManager.size < minConnections, it won't prune peers in _maybeDisconnectOne(). If autoDial is
// off it doesn't have any effect in behaviour.
minConnections: options.minConnections,
},
datastore: options.datastore,
@@ -83,17 +88,8 @@ export class NodejsNode extends LibP2p {
interval: 2000,
list: (options.bootMultiaddrs || []) as string[],
},
discv5: {
enr: options.discv5.enr,
bindAddr: options.discv5.bindAddr,
bootEnrs: options.discv5.bootEnrs || [],
// TODO: Disable and query on demand https://github.com/ChainSafe/lodestar/pull/3104
searchInterval: 30_000,
metrics: options.discv5.metrics,
},
},
},
};
super(defaults);
});
}
}

View File

@@ -67,7 +67,6 @@ export async function createNodeJsLibp2p(
addresses: {listen: localMultiaddrs},
datastore,
bootMultiaddrs: bootMultiaddrs,
discv5: networkOpts.discv5 || defaultDiscv5Options,
maxConnections: networkOpts.maxPeers,
minConnections: networkOpts.targetPeers,
// If peer discovery is enabled let the default in NodejsNode

View File

@@ -4,7 +4,6 @@ import {PeerManagerOpts} from "./peers";
export interface INetworkOptions extends PeerManagerOpts {
localMultiaddrs: string[];
bootMultiaddrs?: string[];
discv5?: IDiscv5DiscoveryInputOptions;
subscribeAllSubnets?: boolean;
connectToDiscv5Bootnodes?: boolean;
}

View File

@@ -1,192 +1,363 @@
import LibP2p from "libp2p";
import PeerId from "peer-id";
import {Multiaddr} from "multiaddr";
import crypto from "crypto";
import {ssz} from "@chainsafe/lodestar-types";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {ILogger} from "@chainsafe/lodestar-utils";
import {Discv5, Discv5Discovery} from "@chainsafe/discv5";
import {shuffle} from "../../util/shuffle";
import {getConnectedPeerIds} from "./utils";
import {Discv5, ENR, IDiscv5Metrics, IDiscv5DiscoveryInputOptions} from "@chainsafe/discv5";
import {IMetrics} from "../../metrics";
import {ENRKey, SubnetType} from "../metadata";
import {prettyPrintPeerId} from "../util";
import {IPeerRpcScoreStore, ScoreState} from "./score";
import {prettyPrintPeerId} from "..";
import {pruneSetToMax} from "../../util/map";
export type AttSubnetQuery = {subnetId: number; maxPeersToDiscover: number};
/** Max number of cached ENRs after discovering a good peer */
const MAX_CACHED_ENRS = 100;
/** Max age a cached ENR will be considered for dial */
const MAX_CACHED_ENR_AGE_MS = 5 * 60 * 1000;
export type PeerDiscoveryOpts = {
maxPeers: number;
discv5: Omit<IDiscv5DiscoveryInputOptions, "metrics" | "searchInterval" | "enabled">;
};
export type PeerDiscoveryModules = {
libp2p: LibP2p;
peerRpcScores: IPeerRpcScoreStore;
metrics: IMetrics | null;
logger: ILogger;
config: IBeaconConfig;
};
type PeerIdStr = string;
enum QueryStatusCode {
NotActive,
Active,
}
type QueryStatus = {code: QueryStatusCode.NotActive} | {code: QueryStatusCode.Active; count: number};
enum DiscoveredPeerStatus {
no_tcp = "no_tcp",
no_eth2 = "no_eth2",
unknown_forkDigest = "unknown_forkDigest",
bad_score = "bad_score",
already_connected = "already_connected",
error = "error",
attempt_dial = "attempt_dial",
cached = "cached",
dropped = "dropped",
}
type UnixMs = number;
export type SubnetDiscvQueryMs = {
subnet: number;
type: SubnetType;
toUnixMs: UnixMs;
maxPeersToDiscover: number;
};
type CachedENR = {
peerId: PeerId;
multiaddrTCP: Multiaddr;
subnets: Record<SubnetType, boolean[]>;
addedUnixMs: number;
};
/**
* PeerDiscovery discovers and dials new peers, and executes discv5 queries.
* Currently relies on discv5 automatic periodic queries
* Currently relies on discv5 automatic periodic queries.
*/
export class PeerDiscovery {
readonly discv5: Discv5;
private libp2p: LibP2p;
private peerRpcScores: IPeerRpcScoreStore;
private metrics: IMetrics | null;
private logger: ILogger;
private config: IBeaconConfig;
private activeDiscv5Query: boolean;
private cachedENRs = new Set<CachedENR>();
private randomNodeQuery: QueryStatus = {code: QueryStatusCode.NotActive};
private peersToConnect = 0;
private subnetRequests: Record<SubnetType, Map<number, UnixMs>> = {
attnets: new Map(),
syncnets: new Map(),
};
/** The maximum number of peers we allow (exceptions for subnet peers) */
private maxPeers: number;
constructor(modules: PeerDiscoveryModules, opts: PeerDiscoveryOpts) {
this.libp2p = modules.libp2p;
this.peerRpcScores = modules.peerRpcScores;
this.logger = modules.logger;
this.config = modules.config;
const {libp2p, peerRpcScores, metrics, logger, config} = modules;
this.libp2p = libp2p;
this.peerRpcScores = peerRpcScores;
this.metrics = metrics;
this.logger = logger;
this.config = config;
this.maxPeers = opts.maxPeers;
this.activeDiscv5Query = false;
this.discv5 = Discv5.create({
enr: opts.discv5.enr,
peerId: modules.libp2p.peerId,
multiaddr: new Multiaddr(opts.discv5.bindAddr),
config: opts.discv5,
// TODO: IDiscv5Metrics is not properly defined, should remove the collect() function
metrics: (modules.metrics?.discv5 as unknown) as {
[K in keyof IMetrics["discv5"]]: IDiscv5Metrics[keyof IDiscv5Metrics];
},
});
opts.discv5.bootEnrs.forEach((bootEnr) => this.discv5.addEnr(bootEnr));
if (metrics) {
metrics.discovery.cachedENRsSize.addCollect(() => {
metrics.discovery.cachedENRsSize.set(this.cachedENRs.size);
metrics.discovery.peersToConnect.set(this.peersToConnect);
});
}
}
async start(): Promise<void> {
await this.discv5.start();
this.discv5.on("discovered", this.onDiscovered);
}
async stop(): Promise<void> {
this.discv5.off("discovered", this.onDiscovered);
await this.discv5.stop();
}
/**
* Request to find peers, both on specific subnets and in general
*/
discoverPeers(peersToConnect: number, subnetRequests: SubnetDiscvQueryMs[] = []): void {
const subnetsToDiscoverPeers: SubnetDiscvQueryMs[] = [];
const cachedENRsToDial = new Set<CachedENR>();
// Iterate in reverse to consider first the most recent ENRs
const cachedENRsReverse: CachedENR[] = [];
for (const cachedENR of this.cachedENRs) {
if (Date.now() - cachedENR.addedUnixMs > MAX_CACHED_ENR_AGE_MS) {
this.cachedENRs.delete(cachedENR);
} else {
cachedENRsReverse.unshift(cachedENR);
}
}
this.peersToConnect += peersToConnect;
subnet: for (const subnetRequest of subnetRequests) {
// Extend the toUnixMs for this subnet
const prevUnixMs = this.subnetRequests[subnetRequest.type].get(subnetRequest.subnet);
if (prevUnixMs === undefined || prevUnixMs < subnetRequest.toUnixMs) {
this.subnetRequests[subnetRequest.type].set(subnetRequest.subnet, subnetRequest.toUnixMs);
}
// Get cached ENRs from the discovery service that are in the requested `subnetId`, but not connected yet
let cachedENRsInSubnet = 0;
for (const cachedENR of cachedENRsReverse) {
if (cachedENR.subnets[subnetRequest.type][subnetRequest.subnet]) {
cachedENRsToDial.add(cachedENR);
if (++cachedENRsInSubnet >= subnetRequest.maxPeersToDiscover) {
continue subnet;
}
}
}
// Query a discv5 query if more peers are needed
subnetsToDiscoverPeers.push(subnetRequest);
}
// If subnetRequests won't connect enough peers for peersToConnect, add more
if (cachedENRsToDial.size < peersToConnect) {
for (const cachedENR of cachedENRsReverse) {
cachedENRsToDial.add(cachedENR);
if (cachedENRsToDial.size >= peersToConnect) {
break;
}
}
}
// Queue an outgoing connection request to the cached peers that are on `s.subnet_id`.
// If we connect to the cached peers before the discovery query starts, then we potentially
// save a costly discovery query.
for (const cachedENRToDial of cachedENRsToDial) {
this.cachedENRs.delete(cachedENRToDial);
void this.dialPeer(cachedENRToDial);
}
// Run a discv5 subnet query to try to discover new peers
if (subnetsToDiscoverPeers.length > 0 || cachedENRsToDial.size < peersToConnect) {
void this.runFindRandomNodeQuery();
}
}
/**
* Request to find peers. First, looked at cached peers in peerStore
*/
discoverPeers(maxPeersToDiscover: number): void {
// To remove self peer if present
const ownPeerIdStr = this.libp2p.peerId.toB58String();
const notConnectedPeers = this.getStoredPeerIdStr().filter(
(peerIdStr) => !this.isPeerConnected(peerIdStr) && peerIdStr !== ownPeerIdStr
);
const discPeers = shuffle(notConnectedPeers).slice(0, maxPeersToDiscover);
this.peersDiscovered(discPeers);
private async runFindRandomNodeQuery(): Promise<void> {
// Run a general discv5 query if one is not already in progress
if (this.activeDiscv5Query) return;
const discovery = this.libp2p._discovery.get("discv5") as Discv5Discovery | undefined;
// if disablePeerDiscovery = true, libp2p will not have any "discv5" module
if (!discovery) return;
const discv5: Discv5 = discovery.discv5;
const randomNodeId = Array.from({length: 64}, () => Math.floor(Math.random() * 15).toString(16)).join();
this.activeDiscv5Query = true;
void discv5.findNode(randomNodeId).finally(() => {
this.activeDiscv5Query = false;
});
}
/**
* Request to find peers on a given subnet.
*/
async discoverSubnetPeers(subnetsToDiscover: AttSubnetQuery[]): Promise<void> {
const subnetsToDiscoverFiltered: number[] = [];
for (const {subnetId, maxPeersToDiscover} of subnetsToDiscover) {
// TODO:
// Queue an outgoing connection request to the cached peers that are on `s.subnet_id`.
// If we connect to the cached peers before the discovery query starts, then we potentially
// save a costly discovery query.
// Get cached ENRs from the discovery service that are in the requested `subnetId`, but not connected yet
const discPeersOnSubnet = await this.getCachedDiscoveryPeersOnSubnet(subnetId, maxPeersToDiscover);
this.peersDiscovered(discPeersOnSubnet);
// Query a discv5 query if more peers are needed
if (maxPeersToDiscover - discPeersOnSubnet.length > 0) {
subnetsToDiscoverFiltered.push(subnetId);
}
if (this.randomNodeQuery.code === QueryStatusCode.Active) {
this.metrics?.discovery.findNodeQueryRequests.inc({action: "ignore"});
return;
} else {
this.metrics?.discovery.findNodeQueryRequests.inc({action: "start"});
}
// Run a discv5 subnet query to try to discover new peers
if (subnetsToDiscoverFiltered.length > 0) {
void this.runSubnetQuery(subnetsToDiscoverFiltered.map((subnetId) => subnetId));
const randomNodeId = crypto.randomBytes(64).toString("hex");
this.randomNodeQuery = {code: QueryStatusCode.Active, count: 0};
const timer = this.metrics?.discovery.findNodeQueryTime.startTimer();
try {
const enrs = await this.discv5.findNode(randomNodeId);
this.metrics?.discovery.findNodeQueryEnrCount.inc(enrs.length);
} catch (e) {
this.logger.error("Error on discv5.findNode()", {}, e as Error);
} finally {
this.randomNodeQuery = {code: QueryStatusCode.NotActive};
timer?.();
}
}
/**
* List existing peers that declare being part of a target subnet
* Progressively called by discv5 as a result of any query.
*/
async getCachedDiscoveryPeersOnSubnet(subnet: number, maxPeersToDiscover: number): Promise<PeerIdStr[]> {
const discovery = this.libp2p._discovery.get("discv5") as Discv5Discovery | undefined;
// if disablePeerDiscovery = true, libp2p will not have any "discv5" module
if (!discovery) return [];
const discv5: Discv5 = discovery.discv5;
private onDiscovered = async (enr: ENR): Promise<void> => {
const status = await this.handleDiscoveredPeer(enr);
this.metrics?.discovery.discoveredStatus.inc({status});
};
const peersOnSubnet: PeerIdStr[] = [];
// TODO: Should kadValues() be shuffle'd?
for (const enr of discv5.kadValues()) {
if (peersOnSubnet.length >= maxPeersToDiscover) {
break;
/**
* Progressively called by discv5 as a result of any query.
*/
private async handleDiscoveredPeer(enr: ENR): Promise<DiscoveredPeerStatus> {
try {
if (this.randomNodeQuery.code === QueryStatusCode.Active) {
this.randomNodeQuery.count++;
}
try {
const attnets = enr.get("attnets");
if (attnets && ssz.phase0.AttestationSubnets.deserialize(attnets)[subnet]) {
// async because peerId runs some crypto lib
const peerId = await enr.peerId();
// We are not interested in peers that don't advertise their tcp addr
const multiaddrTCP = enr.getLocationMultiaddr(ENRKey.tcp);
if (!multiaddrTCP) {
return DiscoveredPeerStatus.no_tcp;
}
// Mimic the regular discv5 + js-libp2p
// Must get the tcp multiaddr and add it the addressBook. Ignore peers without
// https://github.com/ChainSafe/discv5/blob/671a9ac8ec59ba9ad6dcce566036ce4758fe50a7/src/libp2p/discv5.ts#L92
const multiaddrTCP = enr.getLocationMultiaddr("tcp");
if (multiaddrTCP) {
// Must add the multiaddrs array to the address book before dialing
// https://github.com/libp2p/js-libp2p/blob/aec8e3d3bb1b245051b60c2a890550d262d5b062/src/index.js#L638
this.libp2p.peerStore.addressBook.add(peerId, [multiaddrTCP]);
peersOnSubnet.push(peerId.toB58String());
// Check if the ENR.eth2 field matches and is of interest
const eth2 = enr.get(ENRKey.eth2);
if (!eth2) {
return DiscoveredPeerStatus.no_eth2;
}
// Fast de-serialization without SSZ
const forkDigest = eth2.slice(0, 4);
// Check if forkDigest matches any of our known forks.
const forkName = this.config.forkDigest2ForkNameOption(forkDigest);
if (!forkName) {
return DiscoveredPeerStatus.unknown_forkDigest;
}
// TODO: Then check if the next fork info matches ours
// const enrForkId = ssz.phase0.ENRForkID.deserialize(eth2);
// async due to some crypto that's no longer necessary
const peerId = await enr.peerId();
// Check if peer is not banned or disconnected
if (this.peerRpcScores.getScoreState(peerId) !== ScoreState.Healthy) {
return DiscoveredPeerStatus.bad_score;
}
// Ignore connected peers. TODO: Is this check necessary?
if (this.isPeerConnected(peerId.toB58String())) {
return DiscoveredPeerStatus.already_connected;
}
// Are this fields mandatory?
const attnetsBytes = enr.get(ENRKey.attnets);
const syncnetsBytes = enr.get(ENRKey.syncnets);
// TODO: Optimize bitfield parsing
const attnets = attnetsBytes ? (ssz.phase0.AttestationSubnets.deserialize(attnetsBytes) as boolean[]) : [];
const syncnets = syncnetsBytes ? (ssz.altair.SyncSubnets.deserialize(syncnetsBytes) as boolean[]) : [];
// Should dial peer?
const cachedPeer: CachedENR = {
peerId,
multiaddrTCP,
subnets: {attnets, syncnets},
addedUnixMs: Date.now(),
};
// Only dial peer if necessary
if (this.shouldDialPeer(cachedPeer)) {
void this.dialPeer(cachedPeer);
return DiscoveredPeerStatus.attempt_dial;
} else {
// Add to pending good peers with a last seen time
this.cachedENRs.add(cachedPeer);
const dropped = pruneSetToMax(this.cachedENRs, MAX_CACHED_ENRS);
// If the cache was already full, count the peer as dropped
return dropped > 0 ? DiscoveredPeerStatus.dropped : DiscoveredPeerStatus.cached;
}
} catch (e) {
this.logger.error("Error onDiscovered", {}, e as Error);
return DiscoveredPeerStatus.error;
}
}
private shouldDialPeer(peer: CachedENR): boolean {
if (this.peersToConnect > 0) {
return true;
}
for (const type of [SubnetType.attnets, SubnetType.syncnets]) {
for (const [subnet, toUnixMs] of this.subnetRequests[type].entries()) {
if (toUnixMs < Date.now()) {
// Prune all requests
this.subnetRequests[type].delete(subnet);
} else {
if (peer.subnets[type][subnet]) {
return true;
}
}
} catch (e) {
this.logger.debug("Error deserializing ENR", {nodeId: enr.nodeId}, e as Error);
}
}
return peersOnSubnet;
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
private async runSubnetQuery(subnets: number[]): Promise<void> {
// TODO: Run a discv5 query for a specific set of queries
return false;
}
/**
* Handles DiscoveryEvent::QueryResult
* Peers that have been returned by discovery requests are dialed here if they are suitable.
*/
private peersDiscovered(discoveredPeers: PeerIdStr[]): void {
const connectedPeersCount = getConnectedPeerIds(this.libp2p).length;
const toDialPeers: PeerId[] = [];
private async dialPeer(cachedPeer: CachedENR): Promise<void> {
this.peersToConnect--;
for (const peerIdStr of discoveredPeers) {
const peer = PeerId.createFromB58String(peerIdStr);
if (
connectedPeersCount + toDialPeers.length < this.maxPeers &&
!this.isPeerConnected(peerIdStr) &&
// Ensure peer is not banner or disconnected. New peers are healthy by default
this.peerRpcScores.getScoreState(peer) === ScoreState.Healthy
) {
// we attempt a connection if this peer is a subnet peer or if the max peer count
// is not yet filled (including dialing peers)
toDialPeers.push(peer);
}
const {peerId, multiaddrTCP} = cachedPeer;
// Must add the multiaddrs array to the address book before dialing
// https://github.com/libp2p/js-libp2p/blob/aec8e3d3bb1b245051b60c2a890550d262d5b062/src/index.js#L638
this.libp2p.peerStore.addressBook.add(peerId, [multiaddrTCP]);
// Note: PeerDiscovery adds the multiaddrTCP beforehand
const peerIdShort = prettyPrintPeerId(peerId);
this.logger.debug("Dialing discovered peer", {peer: peerIdShort});
this.metrics?.discovery.dialAttempts.inc();
const timer = this.metrics?.discovery.dialTime.startTimer();
// Note: `libp2p.dial()` is what libp2p.connectionManager autoDial calls
// Note: You must listen to the connected events to listen for a successful conn upgrade
try {
await this.libp2p.dial(peerId);
timer?.({status: "success"});
this.logger.debug("Dialed discovered peer", {peer: peerIdShort});
} catch (e) {
timer?.({status: "error"});
formatLibp2pDialError(e as Error);
this.logger.debug("Error dialing discovered peer", {peer: peerIdShort}, e as Error);
}
for (const peer of toDialPeers) {
// Note: PeerDiscovery adds the multiaddrTCP beforehand
this.logger.debug("Dialing discovered peer", {peer: prettyPrintPeerId(peer)});
// Note: `libp2p.dial()` is what libp2p.connectionManager autoDial calls
// Note: You must listen to the connected events to listen for a successful conn upgrade
this.libp2p.dial(peer).catch((e: Error) => {
formatLibp2pDialError(e);
this.logger.debug("Error dialing discovered peer", {peer: prettyPrintPeerId(peer)}, e);
});
}
}
/** Return stored peerIdStr, may return self peerIdStr */
private getStoredPeerIdStr(): PeerIdStr[] {
return Array.from(((this.libp2p.peerStore as unknown) as Libp2pPeerStore).addressBook.data.keys());
}
/** Check if there is 1+ open connection with this peer */
@@ -196,10 +367,6 @@ export class PeerDiscovery {
}
}
type Libp2pPeerStore = {
addressBook: {data: Map<string, void>};
};
/**
* libp2p errors with extremely noisy errors here, which are deeply nested taking 30-50 lines.
* Some known erors:

View File

@@ -1,8 +1,9 @@
import LibP2p, {Connection} from "libp2p";
import PeerId from "peer-id";
import {IDiscv5DiscoveryInputOptions} from "@chainsafe/discv5";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {allForks, altair, phase0} from "@chainsafe/lodestar-types";
import {ILogger} from "@chainsafe/lodestar-utils";
import PeerId from "peer-id";
import {IBeaconChain} from "../../chain";
import {GoodByeReasonCode, GOODBYE_KNOWN_CODES, Libp2pEvent} from "../../constants";
import {IMetrics} from "../../metrics";
@@ -11,16 +12,16 @@ import {IReqResp, ReqRespMethod, RequestTypedContainer} from "../reqresp";
import {prettyPrintPeerId} from "../util";
import {ISubnetsService} from "../subnets";
import {Libp2pPeerMetadataStore} from "./metastore";
import {PeerDiscovery} from "./discover";
import {PeerDiscovery, SubnetDiscvQueryMs} from "./discover";
import {IPeerRpcScoreStore, ScoreState} from "./score";
import {
getConnectedPeerIds,
hasSomeConnectedPeer,
PeerMapDelay,
assertPeerRelevance,
prioritizePeers,
IrrelevantPeerError,
} from "./utils";
import {SubnetType} from "../metadata";
/** heartbeat performs regular updates such as updating reputations and performing discovery requests */
const HEARTBEAT_INTERVAL_MS = 30 * 1000;
@@ -44,8 +45,10 @@ export type PeerManagerOpts = {
targetPeers: number;
/** The maximum number of peers we allow (exceptions for subnet peers) */
maxPeers: number;
/** Don't run discv5 queries, nor connect to cached peers in the peerStore */
disablePeerDiscovery?: boolean;
/**
* If null, Don't run discv5 queries, nor connect to cached peers in the peerStore
*/
discv5: IDiscv5DiscoveryInputOptions | null;
};
export type PeerManagerModules = {
@@ -62,6 +65,23 @@ export type PeerManagerModules = {
networkEventBus: INetworkEventBus;
};
type PeerIdStr = string;
enum RelevantPeerStatus {
Unknown = "unknown",
relevant = "relevant",
irrelevant = "irrelevant",
}
type PeerData = {
lastReceivedMsgUnixTsMs: number;
lastStatusUnixTsMs: number;
connectedUnixTsMs: number;
relevantStatus: RelevantPeerStatus;
direction: Connection["stat"]["direction"];
peerId: PeerId;
};
/**
* Performs all peer managment functionality in a single grouped class:
* - Ping peers every `PING_INTERVAL_MS`
@@ -81,14 +101,13 @@ export class PeerManager {
private config: IBeaconConfig;
private peerMetadata: Libp2pPeerMetadataStore;
private peerRpcScores: IPeerRpcScoreStore;
private discovery: PeerDiscovery;
/** If null, discovery is disabled */
private discovery: PeerDiscovery | null;
private networkEventBus: INetworkEventBus;
/** Map of PeerId -> Time of last PING'd request in ms */
private peersToPingOutbound = new PeerMapDelay(PING_INTERVAL_INBOUND_MS);
private peersToPingInbound = new PeerMapDelay(PING_INTERVAL_OUTBOUND_MS);
/** Map of PeerId -> Time of last STATUS'd request in ms */
private peersToStatus = new PeerMapDelay(STATUS_INTERVAL_MS);
// A single map of connected peers with all necessary data to handle PINGs, STATUS, and metrics
private connectedPeers = new Map<PeerIdStr, PeerData>();
private opts: PeerManagerOpts;
private intervals: NodeJS.Timeout[] = [];
@@ -108,10 +127,17 @@ export class PeerManager {
this.networkEventBus = modules.networkEventBus;
this.opts = opts;
this.discovery = new PeerDiscovery(modules, opts);
// opts.discv5 === null, discovery is disabled
this.discovery = opts.discv5 && new PeerDiscovery(modules, {maxPeers: opts.maxPeers, discv5: opts.discv5});
const {metrics} = modules;
if (metrics) {
metrics.peers.addCollect(() => this.runPeerCountMetrics(metrics));
}
}
start(): void {
async start(): Promise<void> {
await this.discovery?.start();
this.libp2p.connectionManager.on(Libp2pEvent.peerConnect, this.onLibp2pPeerConnect);
this.libp2p.connectionManager.on(Libp2pEvent.peerDisconnect, this.onLibp2pPeerDisconnect);
this.networkEventBus.on(NetworkEvent.reqRespRequest, this.onRequest);
@@ -124,7 +150,8 @@ export class PeerManager {
];
}
stop(): void {
async stop(): Promise<void> {
await this.discovery?.stop();
this.libp2p.connectionManager.removeListener(Libp2pEvent.peerConnect, this.onLibp2pPeerConnect);
this.libp2p.connectionManager.removeListener(Libp2pEvent.peerDisconnect, this.onLibp2pPeerDisconnect);
this.networkEventBus.off(NetworkEvent.reqRespRequest, this.onRequest);
@@ -167,7 +194,13 @@ export class PeerManager {
* The app layer needs to refresh the status of some peers. The sync have reached a target
*/
reStatusPeers(peers: PeerId[]): void {
for (const peer of peers) this.peersToStatus.requestNow(peer);
for (const peer of peers) {
const peerData = this.connectedPeers.get(peer.toB58String());
if (peerData) {
// Set to 0 to trigger a status request after calling pingAndStatusTimeouts()
peerData.lastStatusUnixTsMs = 0;
}
}
this.pingAndStatusTimeouts();
}
@@ -176,6 +209,11 @@ export class PeerManager {
*/
private onRequest = (request: RequestTypedContainer, peer: PeerId): void => {
try {
const peerData = this.connectedPeers.get(peer.toB58String());
if (peerData) {
peerData.lastReceivedMsgUnixTsMs = Date.now();
}
switch (request.method) {
case ReqRespMethod.Ping:
return this.onPing(peer, request.body);
@@ -229,8 +267,9 @@ export class PeerManager {
* Handle a STATUS request + response (rpc handler responds with STATUS automatically)
*/
private onStatus(peer: PeerId, status: phase0.Status): void {
// reset the to-status timer for this peer
this.peersToStatus.requestAfter(peer);
// reset the to-status timer of this peer
const peerData = this.connectedPeers.get(peer.toB58String());
if (peerData) peerData.lastStatusUnixTsMs = Date.now();
try {
assertPeerRelevance(status, this.chain);
@@ -241,6 +280,7 @@ export class PeerManager {
this.logger.error("Unexpected error in assertPeerRelevance", {peer: prettyPrintPeerId(peer)}, e as Error);
}
if (peerData) peerData.relevantStatus = RelevantPeerStatus.irrelevant;
void this.goodbyeAndDisconnect(peer, GoodByeReasonCode.IRRELEVANT_NETWORK);
return;
}
@@ -248,6 +288,7 @@ export class PeerManager {
// Peer is usable, send it to the rangeSync
// NOTE: Peer may not be connected anymore at this point, potential race condition
// libp2p.connectionManager.get() returns not null if there's +1 open connections with `peer`
if (peerData) peerData.relevantStatus = RelevantPeerStatus.relevant;
if (this.libp2p.connectionManager.get(peer)) {
this.networkEventBus.emit(NetworkEvent.peerConnected, peer, status);
}
@@ -309,7 +350,7 @@ export class PeerManager {
}
}
const {peersToDisconnect, discv5Queries, peersToConnect} = prioritizePeers(
const {peersToDisconnect, peersToConnect, attnetQueries, syncnetQueries} = prioritizePeers(
connectedHealthyPeers.map((peer) => ({
id: peer,
attnets: this.peerMetadata.metadata.get(peer)?.attnets ?? [],
@@ -322,16 +363,35 @@ export class PeerManager {
this.opts
);
if (discv5Queries.length > 0 && !this.opts.disablePeerDiscovery) {
// It's a promise due to crypto lib calls only
this.discovery.discoverSubnetPeers(discv5Queries).catch((e: Error) => {
this.logger.error("Error on discoverSubnetPeers", {}, e);
});
// Register results to metrics
this.metrics?.peersRequestedToDisconnect.inc(peersToDisconnect.length);
this.metrics?.peersRequestedToConnect.inc(peersToConnect);
const queriesMerged: SubnetDiscvQueryMs[] = [];
for (const {type, queries} of [
{type: SubnetType.attnets, queries: attnetQueries},
{type: SubnetType.syncnets, queries: syncnetQueries},
]) {
if (queries.length > 0) {
let count = 0;
for (const query of queries) {
count += query.maxPeersToDiscover;
queriesMerged.push({
subnet: query.subnet,
type,
maxPeersToDiscover: query.maxPeersToDiscover,
toUnixMs: 1000 * (this.chain.genesisTime + query.toSlot * this.config.SECONDS_PER_SLOT),
});
}
this.metrics?.peersRequestedSubnetsToQuery.inc({type}, queries.length);
this.metrics?.peersRequestedSubnetsPeerCount.inc({type}, count);
}
}
if (peersToConnect > 0 && !this.opts.disablePeerDiscovery) {
if (this.discovery) {
try {
this.discovery.discoverPeers(peersToConnect);
this.discovery.discoverPeers(peersToConnect, queriesMerged);
} catch (e) {
this.logger.error("Error on discoverPeers", {}, e as Error);
}
@@ -343,20 +403,28 @@ export class PeerManager {
}
private pingAndStatusTimeouts(): void {
// Every interval request to send some peers our seqNumber and process theirs
// If the seqNumber is different it must request the new metadata
const peersToPing = [...this.peersToPingInbound.pollNext(), ...this.peersToPingOutbound.pollNext()];
for (const peer of peersToPing) {
void this.requestPing(peer);
const now = Date.now();
const peersToStatus: PeerId[] = [];
for (const peer of this.connectedPeers.values()) {
// Every interval request to send some peers our seqNumber and process theirs
// If the seqNumber is different it must request the new metadata
const pingInterval = peer.direction === "inbound" ? PING_INTERVAL_INBOUND_MS : PING_INTERVAL_OUTBOUND_MS;
if (now > peer.lastReceivedMsgUnixTsMs + pingInterval) {
void this.requestPing(peer.peerId);
}
// TODO: Consider sending status request to peers that do support status protocol
// {supportsProtocols: getStatusProtocols()}
// Every interval request to send some peers our status, and process theirs
// Must re-check if this peer is relevant to us and emit an event if the status changes
// So the sync layer can update things
if (now > peer.lastStatusUnixTsMs + STATUS_INTERVAL_MS) {
peersToStatus.push(peer.peerId);
}
}
// TODO: Consider sending status request to peers that do support status protocol
// {supportsProtocols: getStatusProtocols()}
// Every interval request to send some peers our status, and process theirs
// Must re-check if this peer is relevant to us and emit an event if the status changes
// So the sync layer can update things
const peersToStatus = this.peersToStatus.pollNext();
if (peersToStatus.length > 0) {
void this.requestStatusMany(peersToStatus);
}
@@ -373,28 +441,32 @@ export class PeerManager {
const {direction, status} = libp2pConnection.stat;
const peer = libp2pConnection.remotePeer;
// On connection:
// - Outbound connections: send a STATUS and PING request
// - Inbound connections: expect to be STATUS'd, schedule STATUS and PING for latter
// NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound
// If that happens, it's okay. Only the "outbound" connection triggers immediate action
if (direction === "outbound") {
this.peersToStatus.requestNow(peer);
this.peersToPingOutbound.requestNow(peer);
this.peersToPingInbound.delete(peer);
} else {
this.peersToStatus.requestAfter(peer, STATUS_INBOUND_GRACE_PERIOD);
this.peersToPingInbound.requestAfter(peer);
this.peersToPingOutbound.delete(peer);
if (!this.connectedPeers.has(peer.toB58String())) {
// On connection:
// - Outbound connections: send a STATUS and PING request
// - Inbound connections: expect to be STATUS'd, schedule STATUS and PING for latter
// NOTE: libp2p may emit two "peer:connect" events: One for inbound, one for outbound
// If that happens, it's okay. Only the "outbound" connection triggers immediate action
const now = Date.now();
this.connectedPeers.set(peer.toB58String(), {
lastReceivedMsgUnixTsMs: direction === "outbound" ? 0 : now,
// If inbound, request after STATUS_INBOUND_GRACE_PERIOD
lastStatusUnixTsMs: direction === "outbound" ? 0 : now - STATUS_INTERVAL_MS + STATUS_INBOUND_GRACE_PERIOD,
connectedUnixTsMs: now,
relevantStatus: RelevantPeerStatus.Unknown,
direction,
peerId: peer,
});
if (direction === "outbound") {
this.pingAndStatusTimeouts();
}
}
this.pingAndStatusTimeouts();
this.logger.verbose("peer connected", {peer: prettyPrintPeerId(peer), direction, status});
// NOTE: The peerConnect event is not emitted here here, but after asserting peer relevance
this.metrics?.peerConnectedEvent.inc({direction});
this.seenPeers.add(peer.toB58String());
this.metrics?.peersTotalUniqueConnected.set(this.seenPeers.size);
this.runPeerCountMetrics();
};
/**
@@ -405,14 +477,11 @@ export class PeerManager {
const peer = libp2pConnection.remotePeer;
// remove the ping and status timer for the peer
this.peersToPingInbound.delete(peer);
this.peersToPingOutbound.delete(peer);
this.peersToStatus.delete(peer);
this.connectedPeers.delete(peer.toB58String());
this.logger.verbose("peer disconnected", {peer: prettyPrintPeerId(peer), direction, status});
this.networkEventBus.emit(NetworkEvent.peerDisconnected, peer);
this.metrics?.peerDisconnectedEvent.inc({direction});
this.runPeerCountMetrics(); // Last in case it throws
};
private async disconnect(peer: PeerId): Promise<void> {
@@ -435,7 +504,7 @@ export class PeerManager {
}
/** Register peer count metrics */
private runPeerCountMetrics(): void {
private runPeerCountMetrics(metrics: IMetrics): void {
let total = 0;
const peersByDirection = new Map<string, number>();
for (const connections of this.libp2p.connectionManager.connections.values()) {
@@ -448,9 +517,18 @@ export class PeerManager {
}
for (const [direction, peers] of peersByDirection.entries()) {
this.metrics?.peersByDirection.set({direction}, peers);
metrics.peersByDirection.set({direction}, peers);
}
this.metrics?.peers.set(total);
let syncPeers = 0;
for (const peer of this.connectedPeers.values()) {
if (peer.relevantStatus === RelevantPeerStatus.relevant) {
syncPeers++;
}
}
metrics.peers.set(total);
metrics.peersTotalUniqueConnected.set(this.seenPeers.size);
metrics.peersSync.set(syncPeers);
}
}

View File

@@ -1,5 +1,4 @@
export * from "./assertPeerRelevance";
export * from "./getConnectedPeerIds";
export * from "./peerMapDelay";
export * from "./prioritizePeers";
export * from "./subnetMap";

View File

@@ -1,40 +0,0 @@
import PeerId from "peer-id";
import {PeerMap} from "../../../util/peerMap";
/**
* Maps PeerIds to unix timestamps to implement a timeout map without timeouts
* Useful to track when a PeerId has to be PING'ed or STATUS'ed
*/
export class PeerMapDelay {
interval: number;
lastMsMap = new PeerMap<number>();
constructor(interval: number) {
this.interval = interval;
}
/** lastMs = 0 -> Request as soon as pollNext() is called */
requestNow(peer: PeerId): void {
this.requestAfter(peer, -1);
}
/** lastMs = now() -> Request after `INTERVAL` */
requestAfter(peer: PeerId, ms = this.interval): void {
this.lastMsMap.set(peer, Date.now() - this.interval + ms);
}
/** Return array of peers with expired interval + calls requestAfter on them */
pollNext(): PeerId[] {
const peers: PeerId[] = [];
for (const [peer, lastMs] of this.lastMsMap.entries()) {
if (Date.now() - lastMs > this.interval) {
this.requestAfter(peer);
peers.push(peer);
}
}
return peers;
}
delete(peer: PeerId): boolean {
return this.lastMsMap.delete(peer);
}
}

View File

@@ -2,10 +2,19 @@ import PeerId from "peer-id";
import {altair, phase0} from "@chainsafe/lodestar-types";
import {shuffle} from "../../../util/shuffle";
import {sortBy} from "../../../util/sortBy";
import {AttSubnetQuery} from "../discover";
import {SubnetType} from "../../metadata";
import {RequestedSubnet} from "./subnetMap";
/** Target number of peers we'd like to have connected to a given long-lived subnet */
const MAX_TARGET_SUBNET_PEERS = 6;
/**
* Instead of attempting to connect the exact amount necessary this will overshoot a little since the success
* rate of outgoing connections is low, <33%. If we try to connect exactly `targetPeers - connectedPeerCount` the
* peer count will almost always be just below targetPeers triggering constant discoveries that are not necessary
*/
const PEERS_TO_CONNECT_OVERSHOOT_FACTOR = 3;
type SubnetDiscvQuery = {subnet: number; toSlot: number; maxPeersToDiscover: number};
/**
* Prioritize which peers to disconect and which to connect. Conditions:
@@ -16,20 +25,26 @@ const MAX_TARGET_SUBNET_PEERS = 6;
*/
export function prioritizePeers(
connectedPeers: {id: PeerId; attnets: phase0.AttestationSubnets; syncnets: altair.SyncSubnets; score: number}[],
activeAttnets: number[],
activeSyncnets: number[],
activeAttnets: RequestedSubnet[],
activeSyncnets: RequestedSubnet[],
{targetPeers, maxPeers}: {targetPeers: number; maxPeers: number}
): {peersToDisconnect: PeerId[]; peersToConnect: number; discv5Queries: AttSubnetQuery[]} {
const peersToDisconnect: PeerId[] = [];
): {
peersToConnect: number;
peersToDisconnect: PeerId[];
attnetQueries: SubnetDiscvQuery[];
syncnetQueries: SubnetDiscvQuery[];
} {
let peersToConnect = 0;
const discv5Queries: AttSubnetQuery[] = [];
const peersToDisconnect: PeerId[] = [];
const attnetQueries: SubnetDiscvQuery[] = [];
const syncnetQueries: SubnetDiscvQuery[] = [];
// To filter out peers that are part of 1+ attnets of interest from possible disconnection
const peerHasDuty = new Map<string, boolean>();
for (const {subnets, subnetKey} of [
{subnets: activeAttnets, subnetKey: "attnets" as const},
{subnets: activeSyncnets, subnetKey: "syncnets" as const},
for (const {subnets, subnetKey, queries} of [
{subnets: activeAttnets, subnetKey: SubnetType.attnets, queries: attnetQueries},
{subnets: activeSyncnets, subnetKey: SubnetType.syncnets, queries: syncnetQueries},
]) {
// Dynamically compute 1 <= TARGET_PEERS_PER_SUBNET <= MAX_TARGET_SUBNET_PEERS
const targetPeersPerSubnet = Math.min(MAX_TARGET_SUBNET_PEERS, Math.max(1, Math.floor(maxPeers / subnets.length)));
@@ -39,19 +54,23 @@ export function prioritizePeers(
const peersPerSubnet = new Map<number, number>();
for (const peer of connectedPeers) {
for (const subnetId of subnets) {
if (peer[subnetKey][subnetId]) {
peerHasDuty.set(peer.id.toB58String(), true);
peersPerSubnet.set(subnetId, 1 + (peersPerSubnet.get(subnetId) ?? 0));
let hasDuty = false;
for (const {subnet} of subnets) {
if (peer[subnetKey][subnet]) {
hasDuty = true;
peersPerSubnet.set(subnet, 1 + (peersPerSubnet.get(subnet) ?? 0));
}
}
if (hasDuty) {
peerHasDuty.set(peer.id.toB58String(), true);
}
}
for (const subnetId of subnets) {
const peersInSubnet = peersPerSubnet.get(subnetId) ?? 0;
for (const {subnet, toSlot} of subnets) {
const peersInSubnet = peersPerSubnet.get(subnet) ?? 0;
if (peersInSubnet < targetPeersPerSubnet) {
// We need more peers
discv5Queries.push({subnetId, maxPeersToDiscover: targetPeersPerSubnet - peersInSubnet});
queries.push({subnet, toSlot, maxPeersToDiscover: targetPeersPerSubnet - peersInSubnet});
}
}
}
@@ -60,8 +79,15 @@ export function prioritizePeers(
const connectedPeerCount = connectedPeers.length;
if (connectedPeerCount < targetPeers) {
// Need more peers,
peersToConnect = targetPeers - connectedPeerCount;
// Need more peers.
// Instead of attempting to connect the exact amount necessary this will overshoot a little since the success
// rate of outgoing connections is low, <33%. If we try to connect exactly `targetPeers - connectedPeerCount` the
// peer count will almost always be just below targetPeers triggering constant discoveries that are not necessary
peersToConnect = Math.min(
PEERS_TO_CONNECT_OVERSHOOT_FACTOR * (targetPeers - connectedPeerCount),
// Never attempt to connect more peers than maxPeers even considering a low chance of dial success
maxPeers - connectedPeerCount
);
} else if (connectedPeerCount > targetPeers) {
// Too much peers, disconnect worst
@@ -80,8 +106,9 @@ export function prioritizePeers(
}
return {
peersToDisconnect,
peersToConnect,
discv5Queries,
peersToDisconnect,
attnetQueries,
syncnetQueries,
};
}

View File

@@ -55,6 +55,17 @@ export class SubnetMap {
return subnetIds;
}
/** Return subnetIds with a `toSlot` equal greater than `currentSlot` */
getActiveTtl(currentSlot: Slot): RequestedSubnet[] {
const subnets: RequestedSubnet[] = [];
for (const [subnet, toSlot] of this.subnets.entries()) {
if (toSlot >= currentSlot) {
subnets.push({subnet, toSlot});
}
}
return subnets;
}
/** Return subnetIds with a `toSlot` less than `currentSlot`. Also deletes expired entries */
getExpired(currentSlot: Slot): number[] {
const subnetIds: number[] = [];

View File

@@ -69,10 +69,9 @@ export class AttnetsService implements IAttnetsService {
/**
* Get all active subnets for the hearbeat.
*/
getActiveSubnets(): number[] {
const currentSlot = this.chain.clock.currentSlot;
getActiveSubnets(): RequestedSubnet[] {
// Omit subscriptionsRandom, not necessary to force the network component to keep peers on that subnets
return this.committeeSubnets.getActive(currentSlot);
return this.committeeSubnets.getActiveTtl(this.chain.clock.currentSlot);
}
/**

View File

@@ -1,5 +1,6 @@
import {ForkName} from "@chainsafe/lodestar-params";
import {Slot, ValidatorIndex} from "@chainsafe/lodestar-types";
import {RequestedSubnet} from "../peers/utils";
/** Generic CommitteeSubscription for both beacon attnets subs and syncnets subs */
export type CommitteeSubscription = {
@@ -13,7 +14,7 @@ export interface ISubnetsService {
start(): void;
stop(): void;
addCommitteeSubscriptions(subscriptions: CommitteeSubscription[]): void;
getActiveSubnets(): number[];
getActiveSubnets(): RequestedSubnet[];
subscribeSubnetsToNextFork(nextFork: ForkName): void;
unsubscribeSubnetsFromPrevFork(prevFork: ForkName): void;
}

View File

@@ -7,7 +7,7 @@ import {ChainEvent, IBeaconChain} from "../../chain";
import {getActiveForks} from "../forks";
import {Eth2Gossipsub, GossipType} from "../gossip";
import {MetadataController} from "../metadata";
import {SubnetMap} from "../peers/utils";
import {RequestedSubnet, SubnetMap} from "../peers/utils";
import {CommitteeSubscription, ISubnetsService, SubnetsServiceOpts} from "./interface";
const gossipType = GossipType.sync_committee;
@@ -46,9 +46,8 @@ export class SyncnetsService implements ISubnetsService {
/**
* Get all active subnets for the hearbeat.
*/
getActiveSubnets(): number[] {
const currentSlot = this.chain.clock.currentSlot;
return this.subscriptionsCommittee.getActive(currentSlot);
getActiveSubnets(): RequestedSubnet[] {
return this.subscriptionsCommittee.getActiveTtl(this.chain.clock.currentSlot);
}
/**

View File

@@ -25,6 +25,7 @@ const opts: INetworkOptions = {
targetPeers: 1,
bootMultiaddrs: [],
localMultiaddrs: [],
discv5: null,
};
describe("network", function () {

View File

@@ -3,7 +3,7 @@ import {expect} from "chai";
import {AbortController} from "@chainsafe/abort-controller";
import PeerId from "peer-id";
import {Discv5Discovery, ENR} from "@chainsafe/discv5";
import {ENR} from "@chainsafe/discv5";
import {createIBeaconConfig} from "@chainsafe/lodestar-config";
import {config} from "@chainsafe/lodestar-config/default";
import {phase0, ssz} from "@chainsafe/lodestar-types";
@@ -22,16 +22,13 @@ import {connect, disconnect, onPeerConnect, onPeerDisconnect} from "../../utils/
import {testLogger} from "../../utils/logger";
import {CommitteeSubscription} from "../../../src/network/subnets";
import {GossipHandlers} from "../../../src/network/gossip";
import {ENRKey} from "../../../src/network/metadata";
import {memoOnce} from "../../utils/cache";
import {Multiaddr} from "multiaddr";
let port = 9000;
const multiaddr = "/ip4/127.0.0.1/tcp/0";
const opts: INetworkOptions = {
maxPeers: 1,
targetPeers: 1,
bootMultiaddrs: [],
localMultiaddrs: [],
};
describe("network", function () {
if (this.timeout() < 5000) this.timeout(5000);
this.retries(2); // This test fail sometimes, with a 5% rate.
@@ -44,10 +41,30 @@ describe("network", function () {
}
});
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
async function mockModules() {
const controller = new AbortController();
let controller: AbortController;
beforeEach(() => (controller = new AbortController()));
afterEach(() => controller.abort());
async function getOpts(peerId: PeerId): Promise<INetworkOptions> {
const bindAddrUdp = `/ip4/0.0.0.0/udp/${port++}`;
const enr = ENR.createFromPeerId(peerId);
enr.setLocationMultiaddr(new Multiaddr(bindAddrUdp));
return {
maxPeers: 1,
targetPeers: 1,
bootMultiaddrs: [],
localMultiaddrs: [],
discv5: {
enr,
bindAddr: bindAddrUdp,
bootEnrs: [],
enabled: true,
},
};
}
const getStaticData = memoOnce(() => {
const block = generateEmptySignedBlock();
const state = generateState({
finalizedCheckpoint: {
@@ -55,19 +72,25 @@ describe("network", function () {
root: ssz.phase0.BeaconBlock.hashTreeRoot(block.message),
},
});
const beaconConfig = createIBeaconConfig(config, state.genesisValidatorsRoot);
const chain = new MockBeaconChain({genesisTime: 0, chainId: 0, networkId: BigInt(0), state, config: beaconConfig});
return {block, state, config: beaconConfig};
});
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
async function createTestNode(nodeName: string) {
const {state, config} = getStaticData();
const chain = new MockBeaconChain({genesisTime: 0, chainId: 0, networkId: BigInt(0), state, config});
const db = new StubbedBeaconDb(config);
const reqRespHandlers = getReqRespHandlers({db, chain});
const gossipHandlers = {} as GossipHandlers;
const [libp2pA, libp2pB] = await Promise.all([createNode(multiaddr), createNode(multiaddr)]);
const loggerA = testLogger("A");
const loggerB = testLogger("B");
const libp2p = await createNode(multiaddr);
const logger = testLogger(nodeName);
const opts = await getOpts(libp2p.peerId);
const modules = {
config: beaconConfig,
config,
chain,
db,
reqRespHandlers,
@@ -75,30 +98,34 @@ describe("network", function () {
signal: controller.signal,
metrics: null,
};
const netA = new Network(opts, {...modules, libp2p: libp2pA, logger: loggerA});
const netB = new Network(opts, {...modules, libp2p: libp2pB, logger: loggerB});
await Promise.all([netA.start(), netB.start()]);
const network = new Network(opts, {...modules, libp2p, logger});
await network.start();
afterEachCallbacks.push(async () => {
chain.close();
controller.abort();
await Promise.all([netA.stop(), netB.stop()]);
await network.stop();
sinon.restore();
});
return {netA, netB, chain, controller};
return {network, chain};
}
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
async function createTestNodesAB() {
return Promise.all([createTestNode("A"), createTestNode("B")]);
}
it("should create a peer on connect", async function () {
const {netA, netB} = await mockModules();
const [{network: netA}, {network: netB}] = await createTestNodesAB();
await Promise.all([onPeerConnect(netA), onPeerConnect(netB), connect(netA, netB.peerId, netB.localMultiaddrs)]);
expect(Array.from(netA.getConnectionsByPeer().values()).length).to.equal(1);
expect(Array.from(netB.getConnectionsByPeer().values()).length).to.equal(1);
});
it("should delete a peer on disconnect", async function () {
const {netA, netB} = await mockModules();
const [{network: netA}, {network: netB}] = await createTestNodesAB();
const connected = Promise.all([onPeerConnect(netA), onPeerConnect(netB)]);
await connect(netA, netB.peerId, netB.localMultiaddrs);
await connected;
@@ -114,24 +141,48 @@ describe("network", function () {
expect(Array.from(netB.getConnectionsByPeer().values()).length).to.equal(0);
});
it("should connect to new peer by subnet", async function () {
// Current implementation of discv5 consumer doesn't allow to deterministically force a peer to be found
// a random find node lookup can yield no results if there are too few peers in the DHT
it.skip("should connect to new peer by subnet", async function () {
const [{network: netBootnode}, {network: netA}, {network: netB}] = await Promise.all([
createTestNode("bootnode"),
createTestNode("A"),
createTestNode("B"),
]);
if (!netBootnode.discv5) throw Error("discv5 in bootnode is not enabled");
if (!netA.discv5) throw Error("discv5 in A is not enabled");
if (!netB.discv5) throw Error("discv5 in B is not enabled");
const subscription: CommitteeSubscription = {
validatorIndex: 2000,
subnet: 10,
slot: 2000,
isAggregator: false,
};
const {netA, netB} = await mockModules();
netB.metadata.attnets[subscription.subnet] = true;
const connected = Promise.all([onPeerConnect(netA), onPeerConnect(netB)]);
const enrB = ENR.createFromPeerId(netB.peerId);
enrB.set("attnets", Buffer.from(ssz.phase0.AttestationSubnets.serialize(netB.metadata.attnets)));
enrB.setLocationMultiaddr((netB["libp2p"]._discovery.get("discv5") as Discv5Discovery).discv5.bindAddress);
enrB.setLocationMultiaddr(netB["libp2p"].multiaddrs[0]);
// let discv5 of A know enr of B
const discovery: Discv5Discovery = netA["libp2p"]._discovery.get("discv5") as Discv5Discovery;
discovery.discv5.addEnr(enrB);
// Add subnets to B ENR
netB.discv5.enr.set(ENRKey.attnets, ssz.phase0.AttestationSubnets.serialize(netB.metadata.attnets));
// A knows about bootnode
netA.discv5.addEnr(netBootnode.discv5.enr);
expect(netA.discv5.kadValues()).have.length(1, "wrong netA kad length");
// bootnode knows about B
netBootnode.discv5.addEnr(netB.discv5.enr);
// const enrB = ENR.createFromPeerId(netB.peerId);
// enrB.set(ENRKey.attnets, Buffer.from(ssz.phase0.AttestationSubnets.serialize(netB.metadata.attnets)));
// Mock findNode to immediately find enrB when attempting to find nodes
// netA.discv5.findNode = async () => {
// console.log("CALLING FIND_NODE");
// netA.discv5?.emit("discovered", enrB);
// return [enrB];
// };
netA.prepareBeaconCommitteeSubnet([subscription]);
await connected;
@@ -142,7 +193,7 @@ describe("network", function () {
});
it("Should goodbye peers on stop", async function () {
const {netA, netB, controller} = await mockModules();
const [{network: netA}, {network: netB}] = await createTestNodesAB();
const connected = Promise.all([onPeerConnect(netA), onPeerConnect(netB)]);
await connect(netA, netB.peerId, netB.localMultiaddrs);
@@ -166,7 +217,7 @@ describe("network", function () {
});
it("Should goodbye peers on stop", async function () {
const {netA, netB, controller} = await mockModules();
const [{network: netA}, {network: netB}] = await createTestNodesAB();
const connected = Promise.all([onPeerConnect(netA), onPeerConnect(netB)]);
await connect(netA, netB.peerId, netB.localMultiaddrs);
@@ -190,7 +241,7 @@ describe("network", function () {
});
it("Should subscribe to gossip core topics on demand", async () => {
const {netA} = await mockModules();
const {network: netA} = await createTestNode("A");
expect(netA.gossip.subscriptions.size).to.equal(0);
netA.subscribeGossipCoreTopics();

View File

@@ -85,9 +85,9 @@ describe("network / peers / PeerManager", function () {
attnetsService: mockSubnetsService,
syncnetsService: mockSubnetsService,
},
{targetPeers: 30, maxPeers: 50}
{targetPeers: 30, maxPeers: 50, discv5: null}
);
peerManager.start();
await peerManager.start();
return {chain, libp2p, reqResp, peerMetadata, peerManager, networkEventBus};
}

View File

@@ -38,6 +38,7 @@ describe("network / ReqResp", function () {
targetPeers: 1,
bootMultiaddrs: [],
localMultiaddrs: [],
discv5: null,
};
const state = generateState();
const beaconConfig = createIBeaconConfig(config, state.genesisValidatorsRoot);

View File

@@ -337,7 +337,7 @@ describe("executionEngine / ExecutionEngineHttp", function () {
options: {
api: {rest: {enabled: true} as RestApiOptions},
sync: {isSingleNode: true},
network: {disablePeerDiscovery: true},
network: {discv5: null},
eth1: {enabled: true, providerUrls: [jsonRpcUrl]},
executionEngine: {urls: [engineApiUrl]},
},

View File

@@ -1,11 +1,11 @@
import {expect} from "chai";
import PeerId from "peer-id";
import {phase0, altair} from "@chainsafe/lodestar-types";
import {AttSubnetQuery} from "../../../../src/network/peers/discover";
import {prioritizePeers} from "../../../../src/network/peers/utils/prioritizePeers";
import {getAttnets} from "../../../utils/network";
import {RequestedSubnet} from "../../../../src/network/peers/utils";
type Result = {peersToDisconnect: PeerId[]; peersToConnect: number; discv5Queries: AttSubnetQuery[]};
type Result = ReturnType<typeof prioritizePeers>;
describe("network / peers / priorization", () => {
const peers: PeerId[] = [];
@@ -32,7 +32,8 @@ describe("network / peers / priorization", () => {
expectedResult: {
peersToDisconnect: [],
peersToConnect: 1,
discv5Queries: [{subnetId: 3, maxPeersToDiscover: 1}],
attnetQueries: [{subnet: 3, maxPeersToDiscover: 1, toSlot: 0}],
syncnetQueries: [],
},
},
{
@@ -44,7 +45,8 @@ describe("network / peers / priorization", () => {
expectedResult: {
peersToDisconnect: [],
peersToConnect: 0,
discv5Queries: [],
attnetQueries: [],
syncnetQueries: [],
},
},
{
@@ -62,7 +64,8 @@ describe("network / peers / priorization", () => {
// Peers sorted by score, excluding with future duties
peersToDisconnect: [peers[3], peers[2], peers[1]],
peersToConnect: 0,
discv5Queries: [],
attnetQueries: [],
syncnetQueries: [],
},
},
{
@@ -84,7 +87,8 @@ describe("network / peers / priorization", () => {
// Peers sorted by score, excluding with future duties
peersToDisconnect: [peers[5], peers[3]],
peersToConnect: 0,
discv5Queries: [{subnetId: 3, maxPeersToDiscover: 2}],
attnetQueries: [{subnet: 3, maxPeersToDiscover: 2, toSlot: 0}],
syncnetQueries: [],
},
},
@@ -93,17 +97,19 @@ describe("network / peers / priorization", () => {
for (const {id, connectedPeers, activeAttnets, activeSyncnets, opts, expectedResult} of testCases) {
it(id, () => {
const result = prioritizePeers(connectedPeers, activeAttnets, activeSyncnets, opts);
const result = prioritizePeers(connectedPeers, toReqSubnet(activeAttnets), toReqSubnet(activeSyncnets), opts);
expect(cleanResult(result)).to.deep.equal(cleanResult(expectedResult));
});
}
function cleanResult(
res: Result
): {peersToDisconnect: string[]; peersToConnect: number; discv5Queries: AttSubnetQuery[]} {
function cleanResult(res: Result): Omit<Result, "peersToDisconnect"> & {peersToDisconnect: string[]} {
return {
...res,
peersToDisconnect: res.peersToDisconnect.map((peer) => peer.toB58String()),
};
}
function toReqSubnet(subnets: number[]): RequestedSubnet[] {
return subnets.map((subnet) => ({subnet, toSlot: 0}));
}
});

View File

@@ -1,49 +0,0 @@
import {expect} from "chai";
import sinon from "sinon";
import PeerId from "peer-id";
import {PeerMapDelay} from "../../../../../src/network/peers/utils";
import {before} from "mocha";
describe("network / peers / utils / PeerMapDelay", () => {
const INTERVAL = 10 * 1000;
const OFFSET = INTERVAL / 2;
const peer = new PeerId(Buffer.from("peer-1"));
peer.toB58String = () => "peer-1";
let clock: sinon.SinonFakeTimers;
before(() => (clock = sinon.useFakeTimers()));
after(() => clock.restore());
it("Should request peer immediatelly", () => {
const peerMapDelay = new PeerMapDelay(INTERVAL);
peerMapDelay.requestNow(peer);
expectNext(peerMapDelay, [peer], "Peer must be ready now");
expectNext(peerMapDelay, [], "Peer must not be ready after pollNext");
});
it("Should request peer on next INTERVAL", () => {
const peerMapDelay = new PeerMapDelay(INTERVAL);
peerMapDelay.requestAfter(peer);
expectNext(peerMapDelay, [], "Peer must not be ready now");
clock.tick(INTERVAL + 1);
expectNext(peerMapDelay, [peer], "Peer must be ready after INTERVAL");
});
it("Should request peer after OFFSET", () => {
const peerMapDelay = new PeerMapDelay(INTERVAL);
peerMapDelay.requestAfter(peer, OFFSET);
expectNext(peerMapDelay, [], "Peer must not be ready now");
clock.tick(OFFSET + 1);
expectNext(peerMapDelay, [peer], "Peer must be ready after OFFSET");
});
function expectNext(peerMapDelay: PeerMapDelay, peers: PeerId[], message?: string): void {
expect(peerMapDelay.pollNext().map(toId)).to.deep.equal(peers.map(toId), message);
}
function toId(peer: PeerId): string {
return peer.toB58String();
}
});

View File

@@ -1,29 +1,16 @@
import {ENR, Discv5Discovery} from "@chainsafe/discv5";
import Bootstrap from "libp2p-bootstrap";
import MDNS from "libp2p-mdns";
import PeerId from "peer-id";
import {Multiaddr} from "multiaddr";
import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@chainsafe/lodestar-params";
import {Network} from "../../src/network";
import {NodejsNode} from "../../src/network/nodejs";
import {createPeerId} from "../../src/network";
import {defaultDiscv5Options} from "../../src/network/options";
import {Libp2pEvent} from "../../src/constants";
export async function createNode(
multiaddr: string,
inPeerId?: PeerId,
peerDiscovery?: (typeof Bootstrap | typeof MDNS | typeof Discv5Discovery)[]
): Promise<NodejsNode> {
export async function createNode(multiaddr: string, inPeerId?: PeerId): Promise<NodejsNode> {
const peerId = inPeerId || (await createPeerId());
const enr = ENR.createFromPeerId(peerId);
const randomPort = Math.round(Math.random() * 40000) + 1000;
const bindAddr = `/ip4/127.0.0.1/udp/${randomPort}`;
return new NodejsNode({
peerId,
addresses: {listen: [multiaddr]},
discv5: {...defaultDiscv5Options, enr, bindAddr},
peerDiscovery,
});
}

View File

@@ -1,5 +1,6 @@
import deepmerge from "deepmerge";
import tmp from "tmp";
import PeerId from "peer-id";
import {createEnr} from "@chainsafe/lodestar-cli/src/config";
import {config as minimalConfig} from "@chainsafe/lodestar-config/default";
import {createIBeaconConfig, createIChainForkConfig, IChainConfig} from "@chainsafe/lodestar-config";
@@ -14,7 +15,6 @@ import {IBeaconNodeOptions} from "../../../src/node/options";
import {defaultOptions} from "../../../src/node/options";
import {BeaconDb} from "../../../src/db";
import {testLogger} from "../logger";
import PeerId from "peer-id";
import {InteropStateOpts} from "../../../src/node/utils/interop/state";
export async function getDevBeaconNode(
@@ -59,9 +59,9 @@ export async function getDevBeaconNode(
deepmerge(
{
db: {name: tmpDir.name},
eth1: {enabled: false},
eth1: {enabled: false, providerUrls: ["http://localhost:8545"]},
metrics: {enabled: false},
network: {disablePeerDiscovery: true},
network: {discv5: null},
} as Partial<IBeaconNodeOptions>,
options
)