Apply gossip attestations to forkchoice (#3585)

* Apply gossip attestations to forkchoice

* Only add unaggregated attestation to forkchoice if shouldProcess

* Add forkchoice.onAttestation() benchmark

* Add hidden cli option network.passGossipAttestationsToForkchoice

* Remove single quote to run yarn:benchmark

* Cache validated attestation data roots per slot

* Fix type issue in yarn:benchmark

* Change cli flag to dontSendGossipAttestationsToForkchoice
This commit is contained in:
tuyennhv
2022-01-14 14:31:21 +07:00
committed by GitHub
parent 5d31cad457
commit 98ab4d2174
8 changed files with 249 additions and 20 deletions

View File

@@ -16,6 +16,7 @@ export interface INetworkArgs {
"network.blockCountTotalLimit": number;
"network.blockCountPeerLimit": number;
"network.rateTrackerTimeoutMs": number;
"network.dontSendGossipAttestationsToForkchoice": boolean;
}
export function parseArgs(args: INetworkArgs): IBeaconNodeOptions["network"] {
@@ -38,6 +39,7 @@ export function parseArgs(args: INetworkArgs): IBeaconNodeOptions["network"] {
blockCountTotalLimit: args["network.blockCountTotalLimit"],
blockCountPeerLimit: args["network.blockCountPeerLimit"],
rateTrackerTimeoutMs: args["network.rateTrackerTimeoutMs"],
dontSendGossipAttestationsToForkchoice: args["network.dontSendGossipAttestationsToForkchoice"],
};
}
@@ -144,4 +146,11 @@ export const options: ICliCommandOptions<INetworkArgs> = {
defaultDescription: String(defaultOptions.network.rateTrackerTimeoutMs),
group: "network",
},
"network.dontSendGossipAttestationsToForkchoice": {
hidden: true,
type: "boolean",
description: "Pass gossip attestations to forkchoice or not",
group: "network",
},
};

View File

@@ -50,6 +50,7 @@ describe("options / beaconNodeOptions", () => {
"network.blockCountTotalLimit": 1000,
"network.blockCountPeerLimit": 500,
"network.rateTrackerTimeoutMs": 60000,
"network.dontSendGossipAttestationsToForkchoice": true,
"sync.isSingleNode": true,
"sync.disableProcessAsChainSegment": true,
} as IBeaconNodeArgs;
@@ -109,6 +110,7 @@ describe("options / beaconNodeOptions", () => {
blockCountTotalLimit: 1000,
blockCountPeerLimit: 500,
rateTrackerTimeoutMs: 60000,
dontSendGossipAttestationsToForkchoice: true,
},
sync: {
isSingleNode: true,

View File

@@ -1,4 +1,4 @@
import {readonlyValues, toHexString} from "@chainsafe/ssz";
import {isTreeBacked, readonlyValues, toHexString, TreeBacked} from "@chainsafe/ssz";
import {SAFE_SLOTS_TO_UPDATE_JUSTIFIED, SLOTS_PER_HISTORICAL_ROOT} from "@chainsafe/lodestar-params";
import {Slot, ValidatorIndex, phase0, allForks, ssz, RootHex, Epoch, Root} from "@chainsafe/lodestar-types";
import {
@@ -64,7 +64,10 @@ export class ForkChoice implements IForkChoice {
private synced = false;
/** Cached head */
private head: IProtoBlock;
/**
* Only cache attestation data root hex if it's tree backed since it's available.
**/
private validatedAttestationDatas = new Set<string>();
/**
* Instantiates a Fork Choice from some existing components
*
@@ -410,16 +413,16 @@ export class ForkChoice implements IForkChoice {
const attestationData = attestation.data;
const {slot, beaconBlockRoot} = attestationData;
const blockRootHex = toHexString(beaconBlockRoot);
const epoch = attestationData.target.epoch;
const targetEpoch = attestationData.target.epoch;
if (ssz.Root.equals(beaconBlockRoot, ZERO_HASH)) {
return;
}
this.validateOnAttestation(attestation);
this.validateOnAttestation(attestation, slot, blockRootHex, targetEpoch);
if (slot < this.fcStore.currentSlot) {
for (const validatorIndex of readonlyValues(attestation.attestingIndices)) {
this.addLatestMessage(validatorIndex, epoch, blockRootHex);
this.addLatestMessage(validatorIndex, targetEpoch, blockRootHex);
}
} else {
// The spec declares:
@@ -432,7 +435,7 @@ export class ForkChoice implements IForkChoice {
slot: slot,
attestingIndices: Array.from(readonlyValues(attestation.attestingIndices)),
blockRoot: blockRootHex,
targetEpoch: epoch,
targetEpoch,
});
}
}
@@ -460,6 +463,7 @@ export class ForkChoice implements IForkChoice {
// Process any attestations that might now be eligible.
this.processAttestationQueue();
this.validatedAttestationDatas = new Set();
}
getTime(): Slot {
@@ -725,7 +729,12 @@ export class ForkChoice implements IForkChoice {
*
* https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#validate_on_attestation
*/
private validateOnAttestation(indexedAttestation: phase0.IndexedAttestation): void {
private validateOnAttestation(
indexedAttestation: phase0.IndexedAttestation,
slot: Slot,
blockRootHex: string,
targetEpoch: Epoch
): void {
// There is no point in processing an attestation with an empty bitfield. Reject
// it immediately.
//
@@ -740,12 +749,28 @@ export class ForkChoice implements IForkChoice {
});
}
const epochNow = computeEpochAtSlot(this.fcStore.currentSlot);
const attestationData = indexedAttestation.data;
const {target, slot, beaconBlockRoot} = attestationData;
const beaconBlockRootHex = toHexString(beaconBlockRoot);
const {epoch: targetEpoch, root: targetRoot} = target;
const targetRootHex = toHexString(targetRoot);
// Only cache attestation data root hex if it's tree backed since it's available.
if (
isTreeBacked(attestationData) &&
this.validatedAttestationDatas.has(
toHexString(((attestationData as unknown) as TreeBacked<phase0.AttestationData>).tree.root)
)
) {
return;
}
this.validateAttestationData(indexedAttestation.data, slot, blockRootHex, targetEpoch);
}
private validateAttestationData(
attestationData: phase0.AttestationData,
slot: Slot,
beaconBlockRootHex: string,
targetEpoch: Epoch
): void {
const epochNow = computeEpochAtSlot(this.fcStore.currentSlot);
const targetRootHex = toHexString(attestationData.target.root);
// Attestation must be from the current of previous epoch.
if (targetEpoch > epochNow) {
@@ -799,7 +824,7 @@ export class ForkChoice implements IForkChoice {
code: ForkChoiceErrorCode.INVALID_ATTESTATION,
err: {
code: InvalidAttestationCode.UNKNOWN_TARGET_ROOT,
root: toHexString(targetRoot),
root: targetRootHex,
},
});
}
@@ -818,7 +843,7 @@ export class ForkChoice implements IForkChoice {
code: ForkChoiceErrorCode.INVALID_ATTESTATION,
err: {
code: InvalidAttestationCode.UNKNOWN_HEAD_BLOCK,
beaconBlockRoot: toHexString(beaconBlockRoot),
beaconBlockRoot: beaconBlockRootHex,
},
});
}
@@ -827,14 +852,14 @@ export class ForkChoice implements IForkChoice {
// then all slots between the block and attestation must be skipped. Therefore if the block
// is from a prior epoch to the attestation, then the target root must be equal to the root
// of the block that is being attested to.
const expectedTargetHex = target.epoch > computeEpochAtSlot(block.slot) ? beaconBlockRootHex : block.targetRoot;
const expectedTargetHex = targetEpoch > computeEpochAtSlot(block.slot) ? beaconBlockRootHex : block.targetRoot;
if (expectedTargetHex !== targetRootHex) {
throw new ForkChoiceError({
code: ForkChoiceErrorCode.INVALID_ATTESTATION,
err: {
code: InvalidAttestationCode.INVALID_TARGET,
attestation: toHexString(targetRoot),
attestation: targetRootHex,
local: expectedTargetHex,
},
});
@@ -852,6 +877,13 @@ export class ForkChoice implements IForkChoice {
},
});
}
// Only cache attestation data root hex if it's tree backed since it's available.
if (isTreeBacked(attestationData)) {
this.validatedAttestationDatas.add(
toHexString(((attestationData as unknown) as TreeBacked<phase0.AttestationData>).tree.root)
);
}
}
/**

View File

@@ -0,0 +1,148 @@
import {config} from "@chainsafe/lodestar-config/default";
import {itBench} from "@dapplion/benchmark";
import {AttestationData, IndexedAttestation} from "@chainsafe/lodestar-types/phase0";
import {ATTESTATION_SUBNET_COUNT} from "@chainsafe/lodestar-params";
import {ssz} from "@chainsafe/lodestar-types";
import {fromHexString} from "@chainsafe/ssz";
import {ExecutionStatus, ForkChoice, IForkChoiceStore, IProtoBlock, ProtoArray} from "../../../src";
describe("ForkChoice", () => {
let forkchoice: ForkChoice;
let protoArr: ProtoArray;
const genesisSlot = 0;
const genesisEpoch = 0;
const genesisRoot = "0x0000000000000000000000000000000000000000000000000000000000000000";
const parentRoot = "0x853d08094d83f1db67159144db54ec0c882eb9715184c4bde8f4191c926a1671";
const blockRootPrefix = "0x37487efdbfbeeb82d7d35c6eb96438c4576f645b0f4c0386184592abab4b17";
const finalizedRoot = blockRootPrefix + "00";
const stateRootPrefix = "0xb021a96da54dd89dfafc0e8817e23fe708f5746e924855f49b3f978133c3ac";
const genesisStateRoot = stateRootPrefix + "00";
function initializeForkChoice(): void {
protoArr = ProtoArray.initialize({
slot: genesisSlot,
stateRoot: genesisStateRoot,
parentRoot,
blockRoot: finalizedRoot,
justifiedEpoch: genesisEpoch,
justifiedRoot: genesisRoot,
finalizedEpoch: genesisEpoch,
finalizedRoot: genesisRoot,
executionPayloadBlockHash: null,
executionStatus: ExecutionStatus.PreMerge,
} as Omit<IProtoBlock, "targetRoot">);
const fcStore: IForkChoiceStore = {
currentSlot: genesisSlot,
justifiedCheckpoint: {epoch: genesisEpoch, root: fromHexString(finalizedRoot), rootHex: finalizedRoot},
finalizedCheckpoint: {epoch: genesisEpoch, root: fromHexString(finalizedRoot), rootHex: finalizedRoot},
bestJustifiedCheckpoint: {epoch: genesisEpoch, root: fromHexString(finalizedRoot), rootHex: finalizedRoot},
};
forkchoice = new ForkChoice(config, fcStore, protoArr, []);
let parentBlockRoot = finalizedRoot;
// assume there are 64 unfinalized blocks, this number does not make a difference in term of performance
for (let i = 1; i < 64; i++) {
const blockRoot = i < 10 ? blockRootPrefix + "0" + i : blockRootPrefix + i;
const block: IProtoBlock = {
slot: genesisSlot + i,
blockRoot,
parentRoot: parentBlockRoot,
stateRoot: i < 10 ? stateRootPrefix + "0" + i : stateRootPrefix + i,
targetRoot: i < 32 ? genesisRoot : blockRootPrefix + "32",
justifiedEpoch: i < 32 ? genesisEpoch : genesisEpoch + 1,
justifiedRoot: i < 32 ? genesisRoot : blockRootPrefix + "32",
finalizedEpoch: genesisEpoch,
finalizedRoot: genesisRoot,
executionPayloadBlockHash: null,
executionStatus: ExecutionStatus.PreMerge,
};
protoArr.onBlock(block);
parentBlockRoot = blockRoot;
}
}
/**
* Committee: | ----------- 0 --------------| ... | ----------------------- i --------------------- | ------------------------63 -------------------------|
* Validator index: | 0 1 2 ... committeeLength-1 | ... | (i*committeeLengh + ) 0 1 2 ... committeeLengh-1| (63*committeeLengh +) 0 1 2 ... committeeLength - 1 |
*/
itBench({
id: "pass gossip attestations to forkchoice per slot",
beforeEach: () => {
initializeForkChoice();
// at slot 64, forkchoice receives attestations of slot 63
forkchoice.updateTime(64);
// there are 700 aggregate and proof max per slot
// as of Jan 2022
const committeeLength = 135;
// considering TARGET_AGGREGATORS_PER_COMMITTEE=16, it's not likely we have more than this number of aggregators
// connect to node per slot
const numAggregatorsConnectedToNode = 3;
const attestationDataOmitIndex: Omit<AttestationData, "index"> = {
beaconBlockRoot: fromHexString(blockRootPrefix + "63"),
slot: 63,
source: {
epoch: genesisEpoch,
root: fromHexString(finalizedRoot),
},
target: {
epoch: genesisEpoch + 1,
root: fromHexString(blockRootPrefix + "32"),
},
};
// unaggregatedAttestations: aggregator {i} for committee index {i}
const unaggregatedAttestations: IndexedAttestation[] = [];
for (let committeeIndex = 0; committeeIndex < numAggregatorsConnectedToNode; committeeIndex++) {
const attestationData: AttestationData = {
...attestationDataOmitIndex,
index: committeeIndex,
};
for (let i = 0; i < committeeLength; i++) {
const validatorIndex = committeeIndex * committeeLength + i;
unaggregatedAttestations.push({
attestingIndices: [validatorIndex],
data: attestationData,
signature: Buffer.alloc(96),
});
}
}
// aggregated attestations: each committee index has 11 aggregators in average
// 64 committee indices map to 704 aggregated attestations per slot
const aggregatedAttestations: IndexedAttestation[] = [];
const averageAggregatorsPerSlot = 11;
for (let committeeIndex = 0; committeeIndex < ATTESTATION_SUBNET_COUNT; committeeIndex++) {
const tbAttestationData = ssz.phase0.AttestationData.createTreeBackedFromStruct({
...attestationDataOmitIndex,
index: committeeIndex,
});
// cache the root
tbAttestationData.hashTreeRoot();
for (let aggregator = 0; aggregator < averageAggregatorsPerSlot; aggregator++) {
// same data, different signatures
aggregatedAttestations.push({
attestingIndices: Array.from({length: committeeLength}, (_, i) => committeeIndex * committeeLength + i),
data: tbAttestationData,
signature: Buffer.alloc(96, aggregator),
});
}
}
return [...unaggregatedAttestations, ...aggregatedAttestations];
},
fn: (allAttestationsPerSlot) => {
for (const attestation of allAttestationsPerSlot) {
forkchoice.onAttestation(attestation);
}
},
});
});

View File

@@ -29,6 +29,21 @@ import {INetwork} from "../../interface";
import {NetworkEvent} from "../../events";
import {PeerAction} from "../../peers";
/**
* Gossip handler options as part of network options
*/
export type GossipHandlerOpts = {
dontSendGossipAttestationsToForkchoice: boolean;
};
/**
* By default:
* + pass gossip attestations to forkchoice
*/
export const defaultGossipHandlerOpts = {
dontSendGossipAttestationsToForkchoice: false,
};
type ValidatorFnsModules = {
chain: IBeaconChain;
config: IBeaconConfig;
@@ -51,7 +66,7 @@ type ValidatorFnsModules = {
* the handler function scope is hard to achieve without very hacky strategies
* - Eth2.0 gossipsub protocol strictly defined a single topic for message
*/
export function getGossipHandlers(modules: ValidatorFnsModules): GossipHandlers {
export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): GossipHandlers {
const {chain, config, metrics, network, logger} = modules;
return {
@@ -152,6 +167,18 @@ export function getGossipHandlers(modules: ValidatorFnsModules): GossipHandlers
indexedAttestation.attestingIndices as ValidatorIndex[],
committeeIndices
);
if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation);
} catch (e) {
logger.error(
"Error adding aggregated attestation to forkchoice",
{slot: aggregatedAttestation.data.slot},
e as Error
);
}
}
} catch (e) {
if (e instanceof AttestationError && e.action === GossipAction.REJECT) {
const archivedPath = chain.persistInvalidSszObject(
@@ -194,7 +221,15 @@ export function getGossipHandlers(modules: ValidatorFnsModules): GossipHandlers
try {
chain.attestationPool.add(attestation);
} catch (e) {
logger.error("Error adding attestation to pool", {subnet}, e as Error);
logger.error("Error adding unaggregated attestation to pool", {subnet}, e as Error);
}
if (!options.dontSendGossipAttestationsToForkchoice) {
try {
chain.forkChoice.onAttestation(indexedAttestation);
} catch (e) {
logger.error("Error adding unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}
},

View File

@@ -93,7 +93,7 @@ export class Network implements INetwork {
logger,
metrics,
signal,
gossipHandlers: gossipHandlers ?? getGossipHandlers({chain, config, logger, network: this, metrics}),
gossipHandlers: gossipHandlers ?? getGossipHandlers({chain, config, logger, network: this, metrics}, opts),
eth2Context: {
activeValidatorCount: chain.getHeadState().currentShuffling.activeIndices.length,
currentSlot: this.clock.currentSlot,

View File

@@ -1,8 +1,9 @@
import {ENR, IDiscv5DiscoveryInputOptions} from "@chainsafe/discv5";
import {defaultGossipHandlerOpts, GossipHandlerOpts} from "./gossip/handlers";
import {PeerManagerOpts} from "./peers";
import {defaultRateLimiterOpts, RateLimiterOpts} from "./reqresp/response/rateLimiter";
export interface INetworkOptions extends PeerManagerOpts, RateLimiterOpts {
export interface INetworkOptions extends PeerManagerOpts, RateLimiterOpts, GossipHandlerOpts {
localMultiaddrs: string[];
bootMultiaddrs?: string[];
subscribeAllSubnets?: boolean;
@@ -25,4 +26,5 @@ export const defaultNetworkOptions: INetworkOptions = {
bootMultiaddrs: [],
discv5: defaultDiscv5Options,
...defaultRateLimiterOpts,
...defaultGossipHandlerOpts,
};

View File

@@ -3,6 +3,7 @@
"compilerOptions": {
"emitDeclarationOnly": false,
"incremental": false,
"typeRoots": ["node_modules/@types", "./packages/lodestar/types"],
"noEmit": true
}
}