diff --git a/changelog/kaloyantanev_rework-dv-selection-proofs.md b/changelog/kaloyantanev_rework-dv-selection-proofs.md new file mode 100644 index 0000000000..6294f6d1d8 --- /dev/null +++ b/changelog/kaloyantanev_rework-dv-selection-proofs.md @@ -0,0 +1,3 @@ +### Fixed + +- DV aggregations failing first slot of the epoch. diff --git a/validator/client/aggregate.go b/validator/client/aggregate.go index 9bd560f2be..cfdb38949e 100644 --- a/validator/client/aggregate.go +++ b/validator/client/aggregate.go @@ -45,16 +45,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives } var slotSig []byte - if v.distributed { - slotSig, err = v.attSelection(attSelectionKey{slot: slot, index: duty.ValidatorIndex}) - if err != nil { - log.WithError(err).Error("Could not find aggregated selection proof") - if v.emitAccountMetrics { - ValidatorAggFailVec.WithLabelValues(fmtKey).Inc() - } - return - } - } else { + if !v.distributed { // Avoid sending beacon node duplicated aggregation requests. k := validatorSubnetSubscriptionKey(slot, duty.CommitteeIndex) v.aggregatedSlotCommitteeIDCacheLock.Lock() @@ -80,6 +71,21 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives // https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#broadcast-aggregate v.waitToSlotTwoThirds(ctx, slot) + // In a DV setup, selection proofs need to be agreed upon by the DV. + // Checking for selection proofs at slot 0 of the epoch will result in an error, as the call to the DV executes slower than the start of this function. + // Checking for selection proofs after 2/3 of slot in a DV setup is much faster than non-DV as it's quickly fetched from memory, + // hence it does not slow down the aggregation as a non-DV would. + if v.distributed { + slotSig, err = v.attSelection(attSelectionKey{slot: slot, index: duty.ValidatorIndex}) + if err != nil { + log.WithError(err).Error("Could not find aggregated selection proof") + if v.emitAccountMetrics { + ValidatorAggFailVec.WithLabelValues(fmtKey).Inc() + } + return + } + } + postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch aggSelectionRequest := ðpb.AggregateSelectionRequest{ diff --git a/validator/client/validator.go b/validator/client/validator.go index fcbb5205e6..2596aa01c9 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -830,6 +830,7 @@ func (v *validator) isAggregator( err error ) if v.distributed { + // This call is blocking. It is awaitng for selection proof response from DV to be written in memory. slotSig, err = v.attSelection(attSelectionKey{slot: slot, index: validatorIndex}) if err != nil { return false, err @@ -1493,8 +1494,12 @@ func (v *validator) aggregatedSelectionProofs(ctx context.Context, duties *ethpb ctx, span := trace.StartSpan(ctx, "validator.aggregatedSelectionProofs") defer span.End() + // Lock the selection proofs until we receive response from DV. + v.attSelectionLock.Lock() + defer v.attSelectionLock.Unlock() + // Create new instance of attestation selections map. - v.newAttSelections() + v.attSelections = make(map[attSelectionKey]iface.BeaconCommitteeSelection) var req []iface.BeaconCommitteeSelection for _, duty := range duties.CurrentEpochDuties { @@ -1515,52 +1520,20 @@ func (v *validator) aggregatedSelectionProofs(ctx context.Context, duties *ethpb }) } - for _, duty := range duties.NextEpochDuties { - if duty.Status != ethpb.ValidatorStatus_ACTIVE && duty.Status != ethpb.ValidatorStatus_EXITING { - continue - } - - pk := bytesutil.ToBytes48(duty.PublicKey) - slotSig, err := v.signSlotWithSelectionProof(ctx, pk, duty.AttesterSlot) - if err != nil { - return err - } - - req = append(req, iface.BeaconCommitteeSelection{ - SelectionProof: slotSig, - Slot: duty.AttesterSlot, - ValidatorIndex: duty.ValidatorIndex, - }) - } - resp, err := v.validatorClient.AggregatedSelections(ctx, req) if err != nil { return err } // Store aggregated selection proofs in state. - v.addAttSelections(resp) - - return nil -} - -func (v *validator) addAttSelections(selections []iface.BeaconCommitteeSelection) { - v.attSelectionLock.Lock() - defer v.attSelectionLock.Unlock() - - for _, s := range selections { + for _, s := range resp { v.attSelections[attSelectionKey{ slot: s.Slot, index: s.ValidatorIndex, }] = s } -} -func (v *validator) newAttSelections() { - v.attSelectionLock.Lock() - defer v.attSelectionLock.Unlock() - - v.attSelections = make(map[attSelectionKey]iface.BeaconCommitteeSelection) + return nil } func (v *validator) attSelection(key attSelectionKey) ([]byte, error) { diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 39121ebf23..9b4d145ec2 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -595,7 +595,7 @@ func TestUpdateDuties_Distributed(t *testing.T) { ).Return( ðpb.DomainResponse{SignatureDomain: sigDomain}, nil, /*err*/ - ).Times(2) + ) client.EXPECT().AggregatedSelections( gomock.Any(),