Remove slot from UpdateDuties (#15223)

* Remove slot from `UpdateDuties`

* Fix slot to epoch conversion

* Add gossip clock disparity

* do not use disparity

* check for nil duties

---------

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
This commit is contained in:
Potuz
2025-04-29 15:25:09 -03:00
committed by GitHub
parent b69c71d65a
commit 5e3a5b877a
7 changed files with 41 additions and 43 deletions

View File

@@ -43,7 +43,7 @@ type Validator interface {
NextSlot() <-chan primitives.Slot
SlotDeadline(slot primitives.Slot) time.Time
LogValidatorGainsAndLosses(ctx context.Context, slot primitives.Slot) error
UpdateDuties(ctx context.Context, slot primitives.Slot) error
UpdateDuties(ctx context.Context) error
RolesAt(ctx context.Context, slot primitives.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]ValidatorRole, error) // validator pubKey -> roles
SubmitAttestation(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
ProposeBlock(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)

View File

@@ -43,7 +43,7 @@ func run(ctx context.Context, v iface.Validator) {
if err != nil {
return // Exit if context is canceled.
}
if err := v.UpdateDuties(ctx, headSlot); err != nil {
if err := v.UpdateDuties(ctx); err != nil {
handleAssignmentError(err, headSlot)
}
eventsChan := make(chan *event.Event, 1)
@@ -89,11 +89,13 @@ func run(ctx context.Context, v iface.Validator) {
// Keep trying to update assignments if they are nil or if we are past an
// epoch transition in the beacon node's state.
if err := v.UpdateDuties(slotCtx, slot); err != nil {
handleAssignmentError(err, slot)
span.End()
cancel()
continue
if slots.IsEpochStart(slot) {
if err := v.UpdateDuties(slotCtx); err != nil {
handleAssignmentError(err, slot)
span.End()
cancel()
continue
}
}
// call push proposer settings often to account for the following edge cases:
@@ -125,7 +127,7 @@ func run(ctx context.Context, v iface.Validator) {
log.WithError(err).Error("Failed to re initialize validator and get head slot")
continue
}
if err := v.UpdateDuties(ctx, headSlot); err != nil {
if err := v.UpdateDuties(ctx); err != nil {
handleAssignmentError(err, headSlot)
continue
}

View File

@@ -116,7 +116,6 @@ func TestUpdateDuties_NextSlot(t *testing.T) {
run(ctx, v)
require.Equal(t, true, v.UpdateDutiesCalled, "Expected UpdateAssignments(%d) to be called", slot)
assert.Equal(t, uint64(slot), v.UpdateDutiesArg1, "UpdateAssignments was called with wrong argument")
}
func TestUpdateDuties_HandlesError(t *testing.T) {

View File

@@ -137,9 +137,8 @@ func (fv *FakeValidator) NextSlot() <-chan primitives.Slot {
}
// UpdateDuties for mocking.
func (fv *FakeValidator) UpdateDuties(_ context.Context, slot primitives.Slot) error {
func (fv *FakeValidator) UpdateDuties(_ context.Context) error {
fv.UpdateDutiesCalled = true
fv.UpdateDutiesArg1 = uint64(slot)
return fv.UpdateDutiesRet
}

View File

@@ -514,13 +514,11 @@ func retrieveLatestRecord(recs []*dbCommon.AttestationRecord) *dbCommon.Attestat
// UpdateDuties checks the slot number to determine if the validator's
// list of upcoming assignments needs to be updated. For example, at the
// beginning of a new epoch.
func (v *validator) UpdateDuties(ctx context.Context, slot primitives.Slot) error {
if !slots.IsEpochStart(slot) && v.duties != nil {
// Do nothing if not epoch start AND assignments already exist.
return nil
}
func (v *validator) UpdateDuties(ctx context.Context) error {
// Set deadline to end of epoch.
ss, err := slots.EpochStart(primitives.Epoch(slots.CurrentSlot(v.genesisTime) + 1))
epoch := slots.ToEpoch(slots.CurrentSlot(v.genesisTime) + 1)
ss, err := slots.EpochStart(epoch + 1)
if err != nil {
return err
}
@@ -550,13 +548,13 @@ func (v *validator) UpdateDuties(ctx context.Context, slot primitives.Slot) erro
v.blacklistedPubkeysLock.RUnlock()
req := &ethpb.DutiesRequest{
Epoch: primitives.Epoch(slot / params.BeaconConfig().SlotsPerEpoch),
Epoch: epoch,
PublicKeys: bytesutil.FromBytes48Array(filteredKeys),
}
// If duties is nil it means we have had no prior duties and just started up.
resp, err := v.validatorClient.Duties(ctx, req)
if err != nil {
if err != nil || resp == nil {
v.dutiesLock.Lock()
v.duties = nil // Clear assignments so we know to retry the request.
v.dutiesLock.Unlock()
@@ -566,7 +564,7 @@ func (v *validator) UpdateDuties(ctx context.Context, slot primitives.Slot) erro
v.dutiesLock.Lock()
v.duties = resp
v.logDuties(slot, v.duties.CurrentEpochDuties, v.duties.NextEpochDuties)
v.logDuties(ss-params.BeaconConfig().SlotsPerEpoch, v.duties.CurrentEpochDuties, v.duties.NextEpochDuties)
v.dutiesLock.Unlock()
allExitedCounter := 0
@@ -689,6 +687,10 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
v.dutiesLock.RLock()
defer v.dutiesLock.RUnlock()
if v.duties == nil {
return nil, errors.New("validator duties are not initialized")
}
var (
rolesAt = make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole)
@@ -1151,18 +1153,8 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE
if bytes.Equal(prevDepedentRoot, params.BeaconConfig().ZeroHash[:]) {
return nil
}
uintSlot, err := strconv.ParseUint(head.Slot, 10, 64)
if err != nil {
return errors.Wrap(err, "Failed to parse slot")
}
slot := primitives.Slot(uintSlot)
currEpochStart, err := slots.EpochStart(slots.ToEpoch(slot))
if err != nil {
return err
}
if !bytes.Equal(prevDepedentRoot, v.duties.PrevDependentRoot) {
if err := v.UpdateDuties(ctx, currEpochStart); err != nil {
if err := v.UpdateDuties(ctx); err != nil {
return errors.Wrap(err, "failed to update duties")
}
log.Info("Updated duties due to previous dependent root change")
@@ -1176,7 +1168,7 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE
return nil
}
if !bytes.Equal(currDepedentRoot, v.duties.CurrDependentRoot) {
if err := v.UpdateDuties(ctx, currEpochStart); err != nil {
if err := v.UpdateDuties(ctx); err != nil {
return errors.Wrap(err, "failed to update duties")
}
log.Info("Updated duties due to current dependent root change")

View File

@@ -392,8 +392,8 @@ func TestUpdateDuties_DoesNothingWhenNotEpochStart_AlreadyExistingAssignments(t
defer ctrl.Finish()
client := validatormock.NewMockValidatorClient(ctrl)
slot := primitives.Slot(1)
v := validator{
km: newMockKeymanager(t, randKeypair(t)),
validatorClient: client,
duties: &ethpb.ValidatorDutiesContainer{
CurrentEpochDuties: []*ethpb.ValidatorDuty{
@@ -402,14 +402,20 @@ func TestUpdateDuties_DoesNothingWhenNotEpochStart_AlreadyExistingAssignments(t
CommitteeIndex: 20,
},
},
NextEpochDuties: []*ethpb.ValidatorDuty{
{
AttesterSlot: 10,
CommitteeIndex: 20,
},
},
},
}
client.EXPECT().Duties(
gomock.Any(),
gomock.Any(),
).Times(0)
).Times(1)
assert.NoError(t, v.UpdateDuties(context.Background(), slot), "Could not update assignments")
assert.NoError(t, v.UpdateDuties(context.Background()), "Could not update assignments")
}
func TestUpdateDuties_ReturnsError(t *testing.T) {
@@ -436,7 +442,7 @@ func TestUpdateDuties_ReturnsError(t *testing.T) {
gomock.Any(),
).Return(nil, expected)
assert.ErrorContains(t, expected.Error(), v.UpdateDuties(context.Background(), params.BeaconConfig().SlotsPerEpoch))
assert.ErrorContains(t, expected.Error(), v.UpdateDuties(context.Background()))
assert.Equal(t, (*ethpb.ValidatorDutiesContainer)(nil), v.duties, "Assignments should have been cleared on failure")
}
@@ -445,7 +451,6 @@ func TestUpdateDuties_OK(t *testing.T) {
defer ctrl.Finish()
client := validatormock.NewMockValidatorClient(ctrl)
slot := params.BeaconConfig().SlotsPerEpoch
resp := &ethpb.ValidatorDutiesContainer{
CurrentEpochDuties: []*ethpb.ValidatorDuty{
{
@@ -479,7 +484,7 @@ func TestUpdateDuties_OK(t *testing.T) {
return nil, nil
})
require.NoError(t, v.UpdateDuties(context.Background(), slot), "Could not update assignments")
require.NoError(t, v.UpdateDuties(context.Background()), "Could not update assignments")
util.WaitTimeout(&wg, 2*time.Second)
@@ -494,7 +499,6 @@ func TestUpdateDuties_OK_FilterBlacklistedPublicKeys(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := validatormock.NewMockValidatorClient(ctrl)
slot := params.BeaconConfig().SlotsPerEpoch
numValidators := 10
km := genMockKeymanager(t, numValidators)
@@ -527,7 +531,7 @@ func TestUpdateDuties_OK_FilterBlacklistedPublicKeys(t *testing.T) {
return nil, nil
})
require.NoError(t, v.UpdateDuties(context.Background(), slot), "Could not update assignments")
require.NoError(t, v.UpdateDuties(context.Background()), "Could not update assignments")
util.WaitTimeout(&wg, 2*time.Second)
@@ -541,7 +545,6 @@ func TestUpdateDuties_AllValidatorsExited(t *testing.T) {
defer ctrl.Finish()
client := validatormock.NewMockValidatorClient(ctrl)
slot := params.BeaconConfig().SlotsPerEpoch
resp := &ethpb.ValidatorDutiesContainer{
CurrentEpochDuties: []*ethpb.ValidatorDuty{
{
@@ -573,7 +576,7 @@ func TestUpdateDuties_AllValidatorsExited(t *testing.T) {
gomock.Any(),
).Return(resp, nil)
err := v.UpdateDuties(context.Background(), slot)
err := v.UpdateDuties(context.Background())
require.ErrorContains(t, ErrValidatorsAllExited.Error(), err)
}
@@ -659,7 +662,7 @@ func TestUpdateDuties_Distributed(t *testing.T) {
return nil, nil
})
require.NoError(t, v.UpdateDuties(context.Background(), slot), "Could not update assignments")
require.NoError(t, v.UpdateDuties(context.Background()), "Could not update assignments")
util.WaitTimeout(&wg, 2*time.Second)
require.Equal(t, 2, len(v.attSelections))
}