Force duties update on received blocks. (#15251)

* Force duties update on received blocks.

- Change the context on UpdateDuties to be passed by the calling
  function.
- Change the context passed to UpdateDuties to not be dependent on a
  slot context.
- Change the deadlines to be forced to be an entire epoch.
- Force duties to be initialized when receiving a HeadEvent if they
  aren't already.
- Adds a read lock on the event handling

* review

* Add deadlines at start and healthyagain

* cancel once
This commit is contained in:
Potuz
2025-05-06 21:49:22 -03:00
committed by GitHub
parent 7da7019a20
commit dd9a5fba59
3 changed files with 59 additions and 27 deletions

View File

@@ -0,0 +1,3 @@
### Added
- Force duties start on received blocks.

View File

@@ -43,9 +43,17 @@ func run(ctx context.Context, v iface.Validator) {
if err != nil {
return // Exit if context is canceled.
}
if err := v.UpdateDuties(ctx); err != nil {
ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1))
if err != nil {
log.WithError(err).Error("Failed to get epoch start")
ss = headSlot
}
startDeadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1)
startCtx, startCancel := context.WithDeadline(ctx, startDeadline)
if err := v.UpdateDuties(startCtx); err != nil {
handleAssignmentError(err, headSlot)
}
startCancel()
eventsChan := make(chan *event.Event, 1)
healthTracker := v.HealthTracker()
runHealthCheckRoutine(ctx, v, eventsChan)
@@ -90,12 +98,16 @@ func run(ctx context.Context, v iface.Validator) {
// Keep trying to update assignments if they are nil or if we are past an
// epoch transition in the beacon node's state.
if slots.IsEpochStart(slot) {
if err := v.UpdateDuties(slotCtx); err != nil {
deadline = v.SlotDeadline(slot + params.BeaconConfig().SlotsPerEpoch - 1)
dutiesCtx, dutiesCancel := context.WithDeadline(ctx, deadline)
if err := v.UpdateDuties(dutiesCtx); err != nil {
handleAssignmentError(err, slot)
dutiesCancel()
span.End()
cancel()
continue
}
dutiesCancel()
}
// call push proposer settings often to account for the following edge cases:
@@ -127,10 +139,19 @@ func run(ctx context.Context, v iface.Validator) {
log.WithError(err).Error("Failed to re initialize validator and get head slot")
continue
}
if err := v.UpdateDuties(ctx); err != nil {
handleAssignmentError(err, headSlot)
ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1))
if err != nil {
log.WithError(err).Error("Failed to get epoch start")
continue
}
deadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1)
dutiesCtx, dutiesCancel := context.WithDeadline(ctx, deadline)
if err := v.UpdateDuties(dutiesCtx); err != nil {
handleAssignmentError(err, headSlot)
dutiesCancel()
continue
}
dutiesCancel()
}
case e := <-eventsChan:
v.ProcessEvent(ctx, e)

View File

@@ -515,15 +515,6 @@ func retrieveLatestRecord(recs []*dbCommon.AttestationRecord) *dbCommon.Attestat
// list of upcoming assignments needs to be updated. For example, at the
// beginning of a new epoch.
func (v *validator) UpdateDuties(ctx context.Context) error {
// Set deadline to end of epoch.
epoch := slots.ToEpoch(slots.CurrentSlot(v.genesisTime) + 1)
ss, err := slots.EpochStart(epoch + 1)
if err != nil {
return err
}
ctx, cancel := context.WithDeadline(ctx, v.SlotDeadline(ss))
defer cancel()
ctx, span := trace.StartSpan(ctx, "validator.UpdateDuties")
defer span.End()
@@ -546,7 +537,7 @@ func (v *validator) UpdateDuties(ctx context.Context) error {
}
}
v.blacklistedPubkeysLock.RUnlock()
epoch := slots.ToEpoch(slots.CurrentSlot(v.genesisTime) + 1)
req := &ethpb.DutiesRequest{
Epoch: epoch,
PublicKeys: bytesutil.FromBytes48Array(filteredKeys),
@@ -562,9 +553,13 @@ func (v *validator) UpdateDuties(ctx context.Context) error {
return err
}
ss, err := slots.EpochStart(epoch)
if err != nil {
return err
}
v.dutiesLock.Lock()
v.duties = resp
v.logDuties(ss-params.BeaconConfig().SlotsPerEpoch, v.duties.CurrentEpochDuties, v.duties.NextEpochDuties)
v.logDuties(ss, v.duties.CurrentEpochDuties, v.duties.NextEpochDuties)
v.dutiesLock.Unlock()
allExitedCounter := 0
@@ -1146,18 +1141,28 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE
if head == nil {
return errors.New("received empty head event")
}
if v.duties == nil {
return errors.New("duties are not initialized")
}
prevDepedentRoot, err := bytesutil.DecodeHexWithLength(head.PreviousDutyDependentRoot, fieldparams.RootLength)
prevDependentRoot, err := bytesutil.DecodeHexWithLength(head.PreviousDutyDependentRoot, fieldparams.RootLength)
if err != nil {
return errors.Wrap(err, "failed to decode previous duty dependent root")
}
if bytes.Equal(prevDepedentRoot, params.BeaconConfig().ZeroHash[:]) {
if bytes.Equal(prevDependentRoot, params.BeaconConfig().ZeroHash[:]) {
return nil
}
if !bytes.Equal(prevDepedentRoot, v.duties.PrevDependentRoot) {
if err := v.UpdateDuties(ctx); err != nil {
epoch := slots.ToEpoch(slots.CurrentSlot(v.genesisTime) + 1)
ss, err := slots.EpochStart(epoch + 1)
if err != nil {
return errors.Wrap(err, "failed to get epoch start")
}
deadline := v.SlotDeadline(ss - 1)
dutiesCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
v.dutiesLock.RLock()
needsPrevDependentRootUpdate := v.duties == nil || !bytes.Equal(prevDependentRoot, v.duties.PrevDependentRoot)
v.dutiesLock.RUnlock()
if needsPrevDependentRootUpdate {
// There's an edge case when the initial duties are not set yet
// This routine will lock and recompute them right after the initial duties finishes.
if err := v.UpdateDuties(dutiesCtx); err != nil {
return errors.Wrap(err, "failed to update duties")
}
log.Info("Updated duties due to previous dependent root change")
@@ -1170,15 +1175,18 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE
if bytes.Equal(currDepedentRoot, params.BeaconConfig().ZeroHash[:]) {
return nil
}
if !bytes.Equal(currDepedentRoot, v.duties.CurrDependentRoot) {
if err := v.UpdateDuties(ctx); err != nil {
v.dutiesLock.RLock()
needsCurrDependentRootUpdate := v.duties == nil || !bytes.Equal(currDepedentRoot, v.duties.CurrDependentRoot)
v.dutiesLock.RUnlock()
if !needsCurrDependentRootUpdate {
return nil
}
if err := v.UpdateDuties(dutiesCtx); err != nil {
return errors.Wrap(err, "failed to update duties")
}
log.Info("Updated duties due to current dependent root change")
return nil
}
return nil
}
func (v *validator) ProcessEvent(ctx context.Context, event *eventClient.Event) {
if event == nil || event.Data == nil {