From dd9a5fba59ac5bf0308552505b44947a0a9752d7 Mon Sep 17 00:00:00 2001 From: Potuz Date: Tue, 6 May 2025 21:49:22 -0300 Subject: [PATCH] Force duties update on received blocks. (#15251) * Force duties update on received blocks. - Change the context on UpdateDuties to be passed by the calling function. - Change the context passed to UpdateDuties to not be dependent on a slot context. - Change the deadlines to be forced to be an entire epoch. - Force duties to be initialized when receiving a HeadEvent if they aren't already. - Adds a read lock on the event handling * review * Add deadlines at start and healthyagain * cancel once --- changelog/potuz_force_duties_update.md | 3 ++ validator/client/runner.go | 29 ++++++++++++-- validator/client/validator.go | 54 +++++++++++++++----------- 3 files changed, 59 insertions(+), 27 deletions(-) create mode 100644 changelog/potuz_force_duties_update.md diff --git a/changelog/potuz_force_duties_update.md b/changelog/potuz_force_duties_update.md new file mode 100644 index 0000000000..0b2e146389 --- /dev/null +++ b/changelog/potuz_force_duties_update.md @@ -0,0 +1,3 @@ +### Added + +- Force duties start on received blocks. diff --git a/validator/client/runner.go b/validator/client/runner.go index aead8e3d41..4121fd36e5 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -43,9 +43,17 @@ func run(ctx context.Context, v iface.Validator) { if err != nil { return // Exit if context is canceled. } - if err := v.UpdateDuties(ctx); err != nil { + ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1)) + if err != nil { + log.WithError(err).Error("Failed to get epoch start") + ss = headSlot + } + startDeadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1) + startCtx, startCancel := context.WithDeadline(ctx, startDeadline) + if err := v.UpdateDuties(startCtx); err != nil { handleAssignmentError(err, headSlot) } + startCancel() eventsChan := make(chan *event.Event, 1) healthTracker := v.HealthTracker() runHealthCheckRoutine(ctx, v, eventsChan) @@ -90,12 +98,16 @@ func run(ctx context.Context, v iface.Validator) { // Keep trying to update assignments if they are nil or if we are past an // epoch transition in the beacon node's state. if slots.IsEpochStart(slot) { - if err := v.UpdateDuties(slotCtx); err != nil { + deadline = v.SlotDeadline(slot + params.BeaconConfig().SlotsPerEpoch - 1) + dutiesCtx, dutiesCancel := context.WithDeadline(ctx, deadline) + if err := v.UpdateDuties(dutiesCtx); err != nil { handleAssignmentError(err, slot) + dutiesCancel() span.End() cancel() continue } + dutiesCancel() } // call push proposer settings often to account for the following edge cases: @@ -127,10 +139,19 @@ func run(ctx context.Context, v iface.Validator) { log.WithError(err).Error("Failed to re initialize validator and get head slot") continue } - if err := v.UpdateDuties(ctx); err != nil { - handleAssignmentError(err, headSlot) + ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1)) + if err != nil { + log.WithError(err).Error("Failed to get epoch start") continue } + deadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1) + dutiesCtx, dutiesCancel := context.WithDeadline(ctx, deadline) + if err := v.UpdateDuties(dutiesCtx); err != nil { + handleAssignmentError(err, headSlot) + dutiesCancel() + continue + } + dutiesCancel() } case e := <-eventsChan: v.ProcessEvent(ctx, e) diff --git a/validator/client/validator.go b/validator/client/validator.go index a46b1470ea..bc52076944 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -515,15 +515,6 @@ func retrieveLatestRecord(recs []*dbCommon.AttestationRecord) *dbCommon.Attestat // list of upcoming assignments needs to be updated. For example, at the // beginning of a new epoch. func (v *validator) UpdateDuties(ctx context.Context) error { - // Set deadline to end of epoch. - epoch := slots.ToEpoch(slots.CurrentSlot(v.genesisTime) + 1) - - ss, err := slots.EpochStart(epoch + 1) - if err != nil { - return err - } - ctx, cancel := context.WithDeadline(ctx, v.SlotDeadline(ss)) - defer cancel() ctx, span := trace.StartSpan(ctx, "validator.UpdateDuties") defer span.End() @@ -546,7 +537,7 @@ func (v *validator) UpdateDuties(ctx context.Context) error { } } v.blacklistedPubkeysLock.RUnlock() - + epoch := slots.ToEpoch(slots.CurrentSlot(v.genesisTime) + 1) req := ðpb.DutiesRequest{ Epoch: epoch, PublicKeys: bytesutil.FromBytes48Array(filteredKeys), @@ -562,9 +553,13 @@ func (v *validator) UpdateDuties(ctx context.Context) error { return err } + ss, err := slots.EpochStart(epoch) + if err != nil { + return err + } v.dutiesLock.Lock() v.duties = resp - v.logDuties(ss-params.BeaconConfig().SlotsPerEpoch, v.duties.CurrentEpochDuties, v.duties.NextEpochDuties) + v.logDuties(ss, v.duties.CurrentEpochDuties, v.duties.NextEpochDuties) v.dutiesLock.Unlock() allExitedCounter := 0 @@ -1146,18 +1141,28 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE if head == nil { return errors.New("received empty head event") } - if v.duties == nil { - return errors.New("duties are not initialized") - } - prevDepedentRoot, err := bytesutil.DecodeHexWithLength(head.PreviousDutyDependentRoot, fieldparams.RootLength) + prevDependentRoot, err := bytesutil.DecodeHexWithLength(head.PreviousDutyDependentRoot, fieldparams.RootLength) if err != nil { return errors.Wrap(err, "failed to decode previous duty dependent root") } - if bytes.Equal(prevDepedentRoot, params.BeaconConfig().ZeroHash[:]) { + if bytes.Equal(prevDependentRoot, params.BeaconConfig().ZeroHash[:]) { return nil } - if !bytes.Equal(prevDepedentRoot, v.duties.PrevDependentRoot) { - if err := v.UpdateDuties(ctx); err != nil { + epoch := slots.ToEpoch(slots.CurrentSlot(v.genesisTime) + 1) + ss, err := slots.EpochStart(epoch + 1) + if err != nil { + return errors.Wrap(err, "failed to get epoch start") + } + deadline := v.SlotDeadline(ss - 1) + dutiesCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + v.dutiesLock.RLock() + needsPrevDependentRootUpdate := v.duties == nil || !bytes.Equal(prevDependentRoot, v.duties.PrevDependentRoot) + v.dutiesLock.RUnlock() + if needsPrevDependentRootUpdate { + // There's an edge case when the initial duties are not set yet + // This routine will lock and recompute them right after the initial duties finishes. + if err := v.UpdateDuties(dutiesCtx); err != nil { return errors.Wrap(err, "failed to update duties") } log.Info("Updated duties due to previous dependent root change") @@ -1170,13 +1175,16 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE if bytes.Equal(currDepedentRoot, params.BeaconConfig().ZeroHash[:]) { return nil } - if !bytes.Equal(currDepedentRoot, v.duties.CurrDependentRoot) { - if err := v.UpdateDuties(ctx); err != nil { - return errors.Wrap(err, "failed to update duties") - } - log.Info("Updated duties due to current dependent root change") + v.dutiesLock.RLock() + needsCurrDependentRootUpdate := v.duties == nil || !bytes.Equal(currDepedentRoot, v.duties.CurrDependentRoot) + v.dutiesLock.RUnlock() + if !needsCurrDependentRootUpdate { return nil } + if err := v.UpdateDuties(dutiesCtx); err != nil { + return errors.Wrap(err, "failed to update duties") + } + log.Info("Updated duties due to current dependent root change") return nil }