feat: update vc to submit beacon committee selections once per epoch (#8669)

**Motivation**

Closes https://github.com/ChainSafe/lodestar/issues/8606

**Description**

This updates our implementation to be compliant with latest spec
https://github.com/ethereum/beacon-APIs/pull/368.

For sync committee aggregation selection (unchanged)
-  we call `submitSyncCommitteeSelections` at the start of the slot
- the timeout is still based on `CONTRIBUTION_DUE_BPS` into the slot (8
seconds)
-  we call the endpoint for all duties of this slot
-  logic has been moved to duties service


For attestation aggregation selection
- we call `submitBeaconCommitteeSelections` at the start of the epoch
for current and next epoch (2 separate calls)
- the timeout uses default which is based on `SLOT_DURATION_MS` (12
seconds)
- we only call `prepareBeaconCommitteeSubnet` once the above call either
resolved or failed, this should be fine as it's not that time sensitive
(one epoch lookahead)
- if duties are reorged, we will call `submitBeaconCommitteeSelections`
with duties of affected epoch
- logic has been moved to duties service


Previous PR https://github.com/ChainSafe/lodestar/pull/5344
This commit is contained in:
Nico Flaig
2025-12-19 16:43:28 +01:00
committed by GitHub
parent 84b481ddb5
commit 493cc12d2f
9 changed files with 312 additions and 263 deletions

View File

@@ -498,6 +498,10 @@ export type Endpoints = {
* a validator client to correctly determine if one of its validators has been selected to
* perform an aggregation duty in this slot.
*
* Validator clients running in a distributed validator cluster must query this endpoint
* at the start of an epoch for the current and lookahead (next) epochs for all validators
* that have attester duties in the current and lookahead epochs.
*
* Note that this endpoint is not implemented by the beacon node and will return a 501 error
*
* Returns an array of threshold aggregated beacon committee selection proofs
@@ -521,6 +525,9 @@ export type Endpoints = {
* a validator client to correctly determine if one of its validators has been selected to
* perform a sync committee contribution (sync aggregation) duty in this slot.
*
* Validator clients running in a distributed validator cluster must query this endpoint
* at the start of each slot for all validators that are included in the current sync committee.
*
* Note that this endpoint is not implemented by the beacon node and will return a 501 error
*
* Returns an array of threshold aggregated sync committee selection proofs

View File

@@ -1,8 +1,8 @@
import {ApiClient, routes} from "@lodestar/api";
import {ApiClient} from "@lodestar/api";
import {ChainForkConfig} from "@lodestar/config";
import {ForkName, isForkPostElectra} from "@lodestar/params";
import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@lodestar/state-transition";
import {BLSSignature, SignedAggregateAndProof, SingleAttestation, Slot, phase0, ssz} from "@lodestar/types";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {SignedAggregateAndProof, SingleAttestation, Slot, phase0, ssz} from "@lodestar/types";
import {prettyBytes, sleep, toRootHex} from "@lodestar/utils";
import {Metrics} from "../metrics.js";
import {PubkeyHex} from "../types.js";
@@ -75,18 +75,6 @@ export class AttestationService {
}
const fork = this.config.getForkName(slot);
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitBeaconCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other attestation tasks but must be finished before starting
// attestation aggregation as it is required to correctly determine if validator is aggregator
// and to produce a AggregateAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(fork, duties, slot, signal).catch((e) =>
this.logger.error("Error on attestation aggregation selection", {slot}, e)
);
}
// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) ATTESTATION_DUE_BPS of the slot has transpired -- whichever comes first.
@@ -274,89 +262,4 @@ export class AttestationService {
}
}
}
/**
* Performs additional attestation aggregation tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should aggregate attestations
* 3. Mutate duty objects to set selection proofs for aggregators
* 4. Resubscribe validators as aggregators on beacon committee subnets
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
*/
private async runDistributedAggregationSelectionTasks(
fork: ForkName,
duties: AttDutyAndProof[],
slot: number,
signal: AbortSignal
): Promise<void> {
const partialSelections: routes.validator.BeaconCommitteeSelection[] = duties.map(
({duty, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
selectionProof: partialSelectionProof as BLSSignature,
})
);
this.logger.debug("Submitting partial beacon committee selection proofs", {slot, count: partialSelections.length});
const res = await Promise.race([
this.api.validator.submitBeaconCommitteeSelections({selections: partialSelections}),
// Exit attestation aggregation flow if there is no response after ATTESTATION_DUE_BPS of the slot as
// beacon node would likely not have enough time to prepare an aggregate attestation.
// Note that the aggregations flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_aggregator` in AttestationDutiesService is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
sleep(this.config.getAttestationDueMs(fork) - this.clock.msFromSlot(slot), signal),
]);
if (!res) {
throw new Error("Failed to receive combined selection proofs before ATTESTATION_DUE_BPS of the slot");
}
const combinedSelections = res.value();
this.logger.debug("Received combined beacon committee selection proofs", {slot, count: combinedSelections.length});
const beaconCommitteeSubscriptions: routes.validator.BeaconCommitteeSubscription[] = [];
for (const dutyAndProof of duties) {
const {validatorIndex, committeeIndex, committeeLength, committeesAtSlot} = dutyAndProof.duty;
const logCtxValidator = {slot, index: committeeIndex, validatorIndex};
const combinedSelection = combinedSelections.find((s) => s.validatorIndex === validatorIndex && s.slot === slot);
if (!combinedSelection) {
this.logger.warn("Did not receive combined beacon committee selection proof", logCtxValidator);
continue;
}
const isAggregator = isAggregatorFromCommitteeLength(committeeLength, combinedSelection.selectionProof);
if (isAggregator) {
// Update selection proof by mutating duty object
dutyAndProof.selectionProof = combinedSelection.selectionProof;
// Only push subnet subscriptions with `isAggregator=true` as all validators
// with duties for slot are already subscribed to subnets with `isAggregator=false`.
beaconCommitteeSubscriptions.push({
validatorIndex,
committeesAtSlot,
committeeIndex,
slot,
isAggregator,
});
this.logger.debug("Resubscribing validator as aggregator on beacon committee subnet", logCtxValidator);
}
}
// If there are any subscriptions with aggregators, push them out to the beacon node.
if (beaconCommitteeSubscriptions.length > 0) {
(await this.api.validator.prepareBeaconCommitteeSubnet({subscriptions: beaconCommitteeSubscriptions})).assertOk();
this.logger.debug("Resubscribed validators as aggregators on beacon committee subnets", {
slot,
count: beaconCommitteeSubscriptions.length,
});
}
}
}

View File

@@ -204,6 +204,17 @@ export class AttestationDutiesService {
for (const epoch of [currentEpoch, nextEpoch]) {
const epochDuties = this.dutiesByIndexByEpoch.get(epoch)?.dutiesByIndex;
if (epochDuties) {
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitBeaconCommitteeSelections on the distributed validator middleware client.
// This is required to correctly determine if validator is aggregator and to produce
// a AggregateAndProof that can be threshold aggregated by the middleware client.
await this.runDistributedAggregationSelectionTasks(Array.from(epochDuties.values()), epoch).catch((e) =>
this.logger.error("Error on attestation aggregation selection", {epoch}, e)
);
}
for (const {duty, selectionProof} of epochDuties.values()) {
if (indexSet.has(duty.validatorIndex)) {
beaconCommitteeSubscriptions.push({
@@ -367,6 +378,12 @@ export class AttestationDutiesService {
const epochDuties = this.dutiesByIndexByEpoch.get(dutyEpoch)?.dutiesByIndex;
if (epochDuties) {
if (this.opts?.distributedAggregationSelection) {
await this.runDistributedAggregationSelectionTasks(Array.from(epochDuties.values()), dutyEpoch).catch((e) =>
this.logger.error("Error on attestation aggregation selection after duties reorg", logContext, e)
);
}
for (const {duty, selectionProof} of epochDuties.values()) {
beaconCommitteeSubscriptions.push({
validatorIndex: duty.validatorIndex,
@@ -403,8 +420,8 @@ export class AttestationDutiesService {
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// Passing a partial selection proof to `is_aggregator` would produce incorrect result.
// AttestationService will exchange partial for combined selection proofs retrieved from
// distributed validator middleware client and determine aggregators at beginning of every slot.
// Before subscribing to beacon committee subnets, aggregators are determined by exchanging
// partial for combined selection proofs retrieved from distributed validator middleware client.
return {duty, selectionProof: null, partialSelectionProof: selectionProof};
}
@@ -427,4 +444,47 @@ export class AttestationDutiesService {
}
}
}
/**
* Performs additional attestation aggregation tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should aggregate attestations
* 3. Mutate duty objects to set selection proofs for aggregators
*/
private async runDistributedAggregationSelectionTasks(duties: AttDutyAndProof[], epoch: Epoch): Promise<void> {
const partialSelections: routes.validator.BeaconCommitteeSelection[] = duties.map(
({duty, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot: duty.slot,
selectionProof: partialSelectionProof as BLSSignature,
})
);
this.logger.debug("Submitting partial beacon committee selection proofs", {epoch, count: partialSelections.length});
const res = await this.api.validator.submitBeaconCommitteeSelections({selections: partialSelections});
const combinedSelections = res.value();
this.logger.debug("Received combined beacon committee selection proofs", {epoch, count: combinedSelections.length});
for (const dutyAndProof of duties) {
const {slot, validatorIndex, committeeIndex, committeeLength} = dutyAndProof.duty;
const logCtxValidator = {slot, index: committeeIndex, validatorIndex};
const combinedSelection = combinedSelections.find((s) => s.validatorIndex === validatorIndex && s.slot === slot);
if (!combinedSelection) {
this.logger.warn("Did not receive combined beacon committee selection proof", logCtxValidator);
continue;
}
const isAggregator = isAggregatorFromCommitteeLength(committeeLength, combinedSelection.selectionProof);
if (isAggregator) {
// Update selection proof by mutating duty object
dutyAndProof.selectionProof = combinedSelection.selectionProof;
}
}
}
}

View File

@@ -1,8 +1,7 @@
import {ApiClient, routes} from "@lodestar/api";
import {ApiClient} from "@lodestar/api";
import {ChainForkConfig} from "@lodestar/config";
import {ForkName, isForkPostAltair} from "@lodestar/params";
import {isSyncCommitteeAggregator} from "@lodestar/state-transition";
import {BLSSignature, CommitteeIndex, Root, Slot, altair} from "@lodestar/types";
import {CommitteeIndex, Root, Slot, altair} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {Metrics} from "../metrics.js";
import {PubkeyHex} from "../types.js";
@@ -73,18 +72,6 @@ export class SyncCommitteeService {
return;
}
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitSyncCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other sync committee tasks but must be finished before starting
// sync committee contributions as it is required to correctly determine if validator is aggregator
// and to produce a ContributionAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(fork, dutiesAtSlot, slot, signal).catch((e) =>
this.logger.error("Error on sync committee aggregation selection", {slot}, e)
);
}
// unlike Attestation, SyncCommitteeSignature could be published asap
// especially with lodestar, it's very busy at ATTESTATION_DUE_BPS of the slot
// see https://github.com/ChainSafe/lodestar/issues/4608
@@ -257,82 +244,4 @@ export class SyncCommitteeService {
}
}
}
/**
* Performs additional sync committee contribution tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should produce sync committee contribution
* 3. Mutate duty objects to set selection proofs for aggregators
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
*/
private async runDistributedAggregationSelectionTasks(
fork: ForkName,
duties: SyncDutyAndProofs[],
slot: number,
signal: AbortSignal
): Promise<void> {
const partialSelections: routes.validator.SyncCommitteeSelection[] = [];
for (const {duty, selectionProofs} of duties) {
const validatorSelections: routes.validator.SyncCommitteeSelection[] = selectionProofs.map(
({subcommitteeIndex, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
subcommitteeIndex,
selectionProof: partialSelectionProof as BLSSignature,
})
);
partialSelections.push(...validatorSelections);
}
this.logger.debug("Submitting partial sync committee selection proofs", {slot, count: partialSelections.length});
const res = await Promise.race([
this.api.validator.submitSyncCommitteeSelections({selections: partialSelections}),
// Exit sync committee contributions flow if there is no response after CONTRIBUTION_DUE_BPS of the slot.
// This is in contrast to attestations aggregations flow which is already exited at ATTESTATION_DUE_BPS of the slot
// because for sync committee is not required to resubscribe to subnets as beacon node will assume
// validator always aggregates. This allows us to wait until we have to produce sync committee contributions.
// Note that the sync committee contributions flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_sync_committee_aggregator` in SyncCommitteeDutiesService is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
sleep(this.config.getSyncContributionDueMs(fork) - this.clock.msFromSlot(slot), signal),
]);
if (!res) {
throw new Error("Failed to receive combined selection proofs before CONTRIBUTION_DUE_BPS of the slot");
}
const combinedSelections = res.value();
this.logger.debug("Received combined sync committee selection proofs", {slot, count: combinedSelections.length});
for (const dutyAndProofs of duties) {
const {validatorIndex, subnets} = dutyAndProofs.duty;
for (const subnet of subnets) {
const logCtxValidator = {slot, index: subnet, validatorIndex};
const combinedSelection = combinedSelections.find(
(s) => s.validatorIndex === validatorIndex && s.slot === slot && s.subcommitteeIndex === subnet
);
if (!combinedSelection) {
this.logger.warn("Did not receive combined sync committee selection proof", logCtxValidator);
continue;
}
const isAggregator = isSyncCommitteeAggregator(combinedSelection.selectionProof);
if (isAggregator) {
const selectionProofObject = dutyAndProofs.selectionProofs.find((p) => p.subcommitteeIndex === subnet);
if (selectionProofObject) {
// Update selection proof by mutating proof objects in duty object
selectionProofObject.selectionProof = combinedSelection.selectionProof;
}
}
}
}
}
}

View File

@@ -85,7 +85,7 @@ export class SyncCommitteeDutiesService {
private readonly config: ChainForkConfig,
private readonly logger: LoggerVc,
private readonly api: ApiClient,
clock: IClock,
private readonly clock: IClock,
private readonly validatorStore: ValidatorStore,
syncingStatusTracker: SyncingStatusTracker,
metrics: Metrics | null,
@@ -134,6 +134,18 @@ export class SyncCommitteeDutiesService {
selectionProofs: await this.getSelectionProofs(slot, dutyAtPeriod.duty),
});
}
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitSyncCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other sync committee tasks but must be finished before starting
// sync committee contributions as it is required to correctly determine if validator is aggregator
// and to produce a ContributionAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(duties, slot).catch((e) =>
this.logger.error("Error on sync committee aggregation selection", {slot}, e)
);
}
}
return duties;
@@ -307,8 +319,8 @@ export class SyncCommitteeDutiesService {
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// Passing a partial selection proof to `is_sync_committee_aggregator` would produce incorrect result.
// SyncCommitteeService will exchange partial for combined selection proofs retrieved from
// distributed validator middleware client and determine aggregators at beginning of every slot.
// For all duties in the slot, aggregators are determined by exchanging partial for combined selection
// proofs retrieved from distributed validator middleware client at beginning of every slot.
dutiesAndProofs.push({
selectionProof: null,
partialSelectionProof: selectionProof,
@@ -334,4 +346,70 @@ export class SyncCommitteeDutiesService {
}
}
}
/**
* Performs additional sync committee contribution tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should produce sync committee contribution
* 3. Mutate duty objects to set selection proofs for aggregators
*/
private async runDistributedAggregationSelectionTasks(duties: SyncDutyAndProofs[], slot: number): Promise<void> {
const partialSelections: routes.validator.SyncCommitteeSelection[] = [];
for (const {duty, selectionProofs} of duties) {
const validatorSelections: routes.validator.SyncCommitteeSelection[] = selectionProofs.map(
({subcommitteeIndex, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
subcommitteeIndex,
selectionProof: partialSelectionProof as BLSSignature,
})
);
partialSelections.push(...validatorSelections);
}
this.logger.debug("Submitting partial sync committee selection proofs", {slot, count: partialSelections.length});
const res = await this.api.validator.submitSyncCommitteeSelections(
{selections: partialSelections},
{
// Exit sync committee contributions flow if there is no response until CONTRIBUTION_DUE_BPS of the slot.
// Note that the sync committee contributions flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_sync_committee_aggregator` in SyncCommitteeDutiesService is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
timeoutMs: this.config.getSyncContributionDueMs(this.config.getForkName(slot)) - this.clock.msFromSlot(slot),
}
);
const combinedSelections = res.value();
this.logger.debug("Received combined sync committee selection proofs", {slot, count: combinedSelections.length});
for (const dutyAndProofs of duties) {
const {validatorIndex, subnets} = dutyAndProofs.duty;
for (const subnet of subnets) {
const logCtxValidator = {slot, index: subnet, validatorIndex};
const combinedSelection = combinedSelections.find(
(s) => s.validatorIndex === validatorIndex && s.slot === slot && s.subcommitteeIndex === subnet
);
if (!combinedSelection) {
this.logger.warn("Did not receive combined sync committee selection proof", logCtxValidator);
continue;
}
const isAggregator = isSyncCommitteeAggregator(combinedSelection.selectionProof);
if (isAggregator) {
const selectionProofObject = dutyAndProofs.selectionProofs.find((p) => p.subcommitteeIndex === subnet);
if (selectionProofObject) {
// Update selection proof by mutating proof objects in duty object
selectionProofObject.selectionProof = combinedSelection.selectionProof;
}
}
}
}
}
}

View File

@@ -1,7 +1,6 @@
import {afterEach, beforeEach, describe, expect, it, vi} from "vitest";
import {SecretKey} from "@chainsafe/blst";
import {toHexString} from "@chainsafe/ssz";
import {routes} from "@lodestar/api";
import {ChainConfig, createChainForkConfig} from "@lodestar/config";
import {config as defaultConfig} from "@lodestar/config/default";
import {ForkName} from "@lodestar/params";
@@ -63,7 +62,6 @@ describe("AttestationService", () => {
const testContexts: [string, AttestationServiceOpts, Partial<ChainConfig>][] = [
["With default configuration", {}, {}],
["With default configuration post-electra", {}, electraConfig],
["With distributed aggregation selection enabled", {distributedAggregationSelection: true}, {}],
];
for (const [title, opts, chainConfig] of testContexts) {
@@ -105,8 +103,7 @@ describe("AttestationService", () => {
validatorIndex: 0,
pubkey: pubkeys[0],
},
selectionProof: opts.distributedAggregationSelection ? null : ZERO_HASH,
partialSelectionProof: opts.distributedAggregationSelection ? ZERO_HASH : undefined,
selectionProof: ZERO_HASH,
},
];
@@ -129,16 +126,6 @@ describe("AttestationService", () => {
api.beacon.submitPoolAttestationsV2.mockResolvedValue(mockApiResponse({}));
api.validator.publishAggregateAndProofsV2.mockResolvedValue(mockApiResponse({}));
if (opts.distributedAggregationSelection) {
// Mock distributed validator middleware client selections endpoint
// and return a selection proof that passes `is_aggregator` test
api.validator.submitBeaconCommitteeSelections.mockResolvedValue(
mockApiResponse({data: [{validatorIndex: 0, slot: 0, selectionProof: Buffer.alloc(1, 0x10)}]})
);
// Accept all subscriptions
api.validator.prepareBeaconCommitteeSubnet.mockResolvedValue(mockApiResponse({}));
}
// Mock signing service
validatorStore.signAttestation.mockResolvedValue(singleAttestation);
validatorStore.signAggregateAndProof.mockResolvedValue(aggregateAndProof);
@@ -146,28 +133,6 @@ describe("AttestationService", () => {
// Trigger clock onSlot for slot 0
await clock.tickSlotFns(0, controller.signal);
if (opts.distributedAggregationSelection) {
// Must submit partial beacon committee selection proof based on duty
const selection: routes.validator.BeaconCommitteeSelection = {
validatorIndex: 0,
slot: 0,
selectionProof: ZERO_HASH,
};
expect(api.validator.submitBeaconCommitteeSelections).toHaveBeenCalledOnce();
expect(api.validator.submitBeaconCommitteeSelections).toHaveBeenCalledWith({selections: [selection]});
// Must resubscribe validator as aggregator on beacon committee subnet
const subscription: routes.validator.BeaconCommitteeSubscription = {
validatorIndex: 0,
committeeIndex: 0,
committeesAtSlot: 120,
slot: 0,
isAggregator: true,
};
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledOnce();
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledWith({subscriptions: [subscription]});
}
// Must submit the attestation received through produceAttestationData()
expect(api.beacon.submitPoolAttestationsV2).toHaveBeenCalledOnce();
expect(api.beacon.submitPoolAttestationsV2).toHaveBeenCalledWith({signedAttestations: [singleAttestation]});

View File

@@ -244,6 +244,87 @@ describe("AttestationDutiesService", () => {
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledOnce();
});
it("Should fetch duties with distributed aggregation selection", async () => {
// Reply with some duties
const slot = 1;
const epoch = computeEpochAtSlot(slot);
const duty: routes.validator.AttesterDuty = {
slot: slot,
committeeIndex: 1,
committeeLength: 120,
committeesAtSlot: 120,
validatorCommitteeIndex: 1,
validatorIndex: index,
pubkey: pubkeys[0],
};
api.validator.getAttesterDuties.mockResolvedValue(
mockApiResponse({data: [duty], meta: {dependentRoot: ZERO_HASH_HEX, executionOptimistic: false}})
);
// Accept all subscriptions
api.validator.prepareBeaconCommitteeSubnet.mockResolvedValue(mockApiResponse({}));
// Mock distributed validator middleware client selections endpoint
// and return a selection proof that passes `is_aggregator` test
const aggregatorSelectionProof = Buffer.alloc(1, 0x10);
api.validator.submitBeaconCommitteeSelections.mockResolvedValue(
mockApiResponse({data: [{validatorIndex: index, slot, selectionProof: aggregatorSelectionProof}]})
);
// Clock will call runDutiesTasks() immediately
const clock = new ClockMock();
const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null);
const dutiesService = new AttestationDutiesService(
loggerVc,
api,
clock,
validatorStore,
chainHeadTracker,
syncingStatusTracker,
null,
{distributedAggregationSelection: true}
);
// Trigger clock onSlot for slot 0
await clock.tickEpochFns(0, controller.signal);
// Validator index should be persisted
expect(validatorStore.getAllLocalIndices()).toEqual([index]);
expect(validatorStore.getPubkeyOfIndex(index)).toBe(toHexString(pubkeys[0]));
// Must submit partial beacon committee selection proofs for current and next epoch
expect(api.validator.submitBeaconCommitteeSelections).toHaveBeenCalledTimes(2);
expect(api.validator.submitBeaconCommitteeSelections).toHaveBeenCalledWith({
selections: [
expect.objectContaining({
validatorIndex: index,
slot,
}),
],
});
// Duties for current epoch should be persisted with selection proof set for aggregator
const dutiesAtEpoch = dutiesService["dutiesByIndexByEpoch"].get(epoch);
expect(dutiesAtEpoch).toBeDefined();
const dutyAndProof = dutiesAtEpoch?.dutiesByIndex.get(index);
expect(dutyAndProof).toBeDefined();
expect(dutyAndProof?.duty).toEqual(duty);
// Selection proof should be set since the mocked proof passes `is_aggregator`
expect(dutyAndProof?.selectionProof).toEqual(aggregatorSelectionProof);
// Must subscribe validator as aggregator on beacon committee subnet
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledOnce();
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledWith({
subscriptions: expect.arrayContaining([
expect.objectContaining({
validatorIndex: index,
slot,
isAggregator: true,
}),
]),
});
});
describe("Reorg handling", () => {
const oldDependentRoot = toRootHex(Buffer.alloc(32, 1));
const newDependentRoot = toRootHex(Buffer.alloc(32, 2));

View File

@@ -340,6 +340,79 @@ describe("SyncCommitteeDutiesService", () => {
expect(api.validator.prepareSyncCommitteeSubnets).toHaveBeenCalledOnce();
});
it("Should fetch duties with distributed aggregation selection", async () => {
// Reply with some duties
const slot = 1;
const duty: routes.validator.SyncDuty = {
pubkey: pubkeys[0],
validatorIndex: indices[0],
validatorSyncCommitteeIndices: [7],
};
api.validator.getSyncCommitteeDuties.mockResolvedValue(
mockApiResponse({data: [duty], meta: {executionOptimistic: false}})
);
// Accept all subscriptions
api.validator.prepareSyncCommitteeSubnets.mockResolvedValue(mockApiResponse({}));
// Mock distributed validator middleware client selections endpoint
// and return a selection proof that passes `is_sync_committee_aggregator` test
const aggregatorSelectionProof = Buffer.alloc(1, 0x19);
api.validator.submitSyncCommitteeSelections.mockResolvedValue(
mockApiResponse({
data: [{validatorIndex: indices[0], slot, subcommitteeIndex: 0, selectionProof: aggregatorSelectionProof}],
})
);
// Clock will call runDutiesTasks() immediately
const clock = new ClockMock();
const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null);
const dutiesService = new SyncCommitteeDutiesService(
altair0Config,
loggerVc,
api,
clock,
validatorStore,
syncingStatusTracker,
null,
{distributedAggregationSelection: true}
);
// Trigger clock onSlot for slot 0 to fetch duties
await clock.tickEpochFns(0, controller.signal);
// Validator index should be persisted
expect(validatorStore.getAllLocalIndices()).toEqual(indices);
// Get duties for the slot
const duties = await dutiesService.getDutiesAtSlot(slot);
// Verify duties are returned with partial selection proofs
expect(duties.length).toBe(1);
expect(duties[0].duty.validatorIndex).toBe(indices[0]);
expect(duties[0].selectionProofs[0].partialSelectionProof).toBeDefined();
// Wait for the async DVT task to complete and verify API was called
await vi.waitFor(() => {
expect(api.validator.submitSyncCommitteeSelections).toHaveBeenCalledOnce();
});
// Must submit partial sync committee selection proof based on duty
expect(api.validator.submitSyncCommitteeSelections).toHaveBeenCalledWith(
{
selections: [
expect.objectContaining({
validatorIndex: duty.validatorIndex,
slot,
subcommitteeIndex: 0,
}),
],
},
expect.any(Object)
);
expect(duties[0].selectionProofs[0].selectionProof).toBe(aggregatorSelectionProof);
});
});
function toSyncDutySubnet(duty: routes.validator.SyncDuty): SyncDutySubnet {

View File

@@ -1,7 +1,6 @@
import {afterEach, beforeEach, describe, expect, it, vi} from "vitest";
import {SecretKey} from "@chainsafe/blst";
import {toHexString} from "@chainsafe/ssz";
import {routes} from "@lodestar/api";
import {createChainForkConfig} from "@lodestar/config";
import {config as mainnetConfig} from "@lodestar/config/default";
import {ssz} from "@lodestar/types";
@@ -64,10 +63,7 @@ describe("SyncCommitteeService", () => {
vi.resetAllMocks();
});
const testContexts: [string, SyncCommitteeServiceOpts][] = [
["With default configuration", {}],
["With distributed aggregation selection enabled", {distributedAggregationSelection: true}],
];
const testContexts: [string, SyncCommitteeServiceOpts][] = [["With default configuration", {}]];
for (const [title, opts] of testContexts) {
describe(title, () => {
@@ -99,8 +95,7 @@ describe("SyncCommitteeService", () => {
},
selectionProofs: [
{
selectionProof: opts.distributedAggregationSelection ? null : ZERO_HASH,
partialSelectionProof: opts.distributedAggregationSelection ? ZERO_HASH : undefined,
selectionProof: ZERO_HASH,
subcommitteeIndex: 0,
},
],
@@ -125,16 +120,6 @@ describe("SyncCommitteeService", () => {
api.validator.produceSyncCommitteeContribution.mockResolvedValue(mockApiResponse({data: contribution}));
api.validator.publishContributionAndProofs.mockResolvedValue(mockApiResponse({}));
if (opts.distributedAggregationSelection) {
// Mock distributed validator middleware client selections endpoint
// and return a selection proof that passes `is_sync_committee_aggregator` test
api.validator.submitSyncCommitteeSelections.mockResolvedValue(
mockApiResponse({
data: [{validatorIndex: 0, slot: 0, subcommitteeIndex: 0, selectionProof: Buffer.alloc(1, 0x19)}],
})
);
}
// Mock signing service
validatorStore.signSyncCommitteeSignature.mockResolvedValue(syncCommitteeSignature);
validatorStore.signContributionAndProof.mockResolvedValue(contributionAndProof);
@@ -142,18 +127,6 @@ describe("SyncCommitteeService", () => {
// Trigger clock onSlot for slot 0
await clock.tickSlotFns(0, controller.signal);
if (opts.distributedAggregationSelection) {
// Must submit partial sync committee selection proof based on duty
const selection: routes.validator.SyncCommitteeSelection = {
validatorIndex: 0,
slot: 0,
subcommitteeIndex: 0,
selectionProof: ZERO_HASH,
};
expect(api.validator.submitSyncCommitteeSelections).toHaveBeenCalledOnce();
expect(api.validator.submitSyncCommitteeSelections).toHaveBeenCalledWith({selections: [selection]});
}
// Must submit the signature received through signSyncCommitteeSignature()
expect(api.beacon.submitPoolSyncCommitteeSignatures).toHaveBeenCalledOnce();
expect(api.beacon.submitPoolSyncCommitteeSignatures).toHaveBeenCalledWith({