Compare commits

...

6 Commits

Author SHA1 Message Date
potuz
aa06ce5e0e don't panic on underflow 2025-04-25 12:36:25 -03:00
james-prysm
e5d7888e5c Merge branch 'develop' into fix_deadlines 2025-04-25 09:26:03 -05:00
potuz
4ea3699c51 make all duties deadlines to end of epoch 2025-04-24 12:31:27 -03:00
potuz
06a5a16007 review 2 2025-04-23 18:27:20 -03:00
potuz
fb2eeb5ce9 review 2025-04-23 18:16:29 -03:00
potuz
b526d99a55 Fix deadlines 2025-04-23 17:50:07 -03:00
4 changed files with 73 additions and 41 deletions

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixed deadlines in update duties.

View File

@@ -72,6 +72,9 @@ func feeRecipientIsPresent(_ *types.EvaluationContext, conns ...*grpc.ClientConn
if err != nil {
return errors.Wrap(err, "failed to get chain head")
}
if chainHead.HeadEpoch == 0 {
return nil
}
req := &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: chainHead.HeadEpoch.Sub(1)}}
blks, err := client.ListBeaconBlocks(context.Background(), req)
if err != nil {

View File

@@ -43,7 +43,15 @@ func run(ctx context.Context, v iface.Validator) {
if err != nil {
return // Exit if context is canceled.
}
if err := v.UpdateDuties(ctx, headSlot); err != nil {
endEpoch, err := slots.EpochEnd(slots.ToEpoch(headSlot))
if err != nil {
log.WithError(err).Error("Could not get current epoch")
return
}
deadline := v.SlotDeadline(endEpoch)
dutiesCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
if err := v.UpdateDuties(dutiesCtx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
}
eventsChan := make(chan *event.Event, 1)
@@ -77,24 +85,35 @@ func run(ctx context.Context, v iface.Validator) {
continue
}
deadline := v.SlotDeadline(slot)
slotCtx, cancel := context.WithDeadline(ctx, deadline)
endEpoch, err := slots.EpochEnd(slots.ToEpoch(slot))
if err != nil {
log.WithError(err).Error("Could not get current epoch")
continue
}
deadline := v.SlotDeadline(endEpoch)
dutiesCtx, cancel := context.WithDeadline(ctx, deadline)
var span trace.Span
slotCtx, span = prysmTrace.StartSpan(slotCtx, "validator.processSlot")
dutiesCtx, span = prysmTrace.StartSpan(dutiesCtx, "validator.processSlot.updateDuties")
span.SetAttributes(prysmTrace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing.
log := log.WithField("slot", slot)
log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")
// Keep trying to update assignments if they are nil or if we are past an
// epoch transition in the beacon node's state.
if err := v.UpdateDuties(slotCtx, slot); err != nil {
if err := v.UpdateDuties(dutiesCtx, slot); err != nil {
handleAssignmentError(err, slot)
cancel()
span.End()
cancel()
continue
}
span.End()
cancel()
deadline = v.SlotDeadline(slot)
slotCtx, cancel := context.WithDeadline(ctx, deadline)
slotCtx, span = prysmTrace.StartSpan(slotCtx, "validator.processSlot.performRoles")
span.SetAttributes(prysmTrace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing.
log := log.WithField("slot", slot)
log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")
// call push proposer settings often to account for the following edge cases:
// proposer is activated at the start of epoch and tries to propose immediately
@@ -113,11 +132,13 @@ func run(ctx context.Context, v iface.Validator) {
allRoles, err := v.RolesAt(slotCtx, slot)
if err != nil {
log.WithError(err).Error("Could not get validator roles")
cancel()
span.End()
cancel()
continue
}
performRoles(slotCtx, allRoles, v, slot, &wg, span)
span.End()
cancel()
case isHealthyAgain := <-healthTracker.HealthUpdates():
if isHealthyAgain {
headSlot, err = initializeValidatorAndGetHeadSlot(ctx, v)
@@ -125,10 +146,18 @@ func run(ctx context.Context, v iface.Validator) {
log.WithError(err).Error("Failed to re initialize validator and get head slot")
continue
}
if err := v.UpdateDuties(ctx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
endEpoch, err := slots.EpochEnd(slots.ToEpoch(headSlot))
if err != nil {
log.WithError(err).Error("Could not get current epoch")
continue
}
dutiesCtx, cancel := context.WithDeadline(ctx, v.SlotDeadline(endEpoch))
if err := v.UpdateDuties(dutiesCtx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
cancel()
continue
}
cancel()
}
case e := <-eventsChan:
v.ProcessEvent(ctx, e)

View File

@@ -511,27 +511,11 @@ func retrieveLatestRecord(recs []*dbCommon.AttestationRecord) *dbCommon.Attestat
return chosenRec
}
// UpdateDuties checks the slot number to determine if the validator's
// list of upcoming assignments needs to be updated. For example, at the
// beginning of a new epoch.
func (v *validator) UpdateDuties(ctx context.Context, slot primitives.Slot) error {
if !slots.IsEpochStart(slot) && v.duties != nil {
// Do nothing if not epoch start AND assignments already exist.
return nil
}
// Set deadline to end of epoch.
ss, err := slots.EpochStart(slots.ToEpoch(slot) + 1)
if err != nil {
return err
}
ctx, cancel := context.WithDeadline(ctx, v.SlotDeadline(ss))
defer cancel()
ctx, span := trace.StartSpan(ctx, "validator.UpdateDuties")
defer span.End()
// getFilteredKeys returns the list of keys that are not slashable.
func (v *validator) getFilteredKeys(ctx context.Context) ([][fieldparams.BLSPubkeyLength]byte, error) {
validatingKeys, err := v.km.FetchValidatingPublicKeys(ctx)
if err != nil {
return err
return nil, errors.Wrap(err, "could not fetch validating public keys")
}
// Filter out the slashable public keys from the duties request.
@@ -548,6 +532,23 @@ func (v *validator) UpdateDuties(ctx context.Context, slot primitives.Slot) erro
}
}
v.blacklistedPubkeysLock.RUnlock()
return filteredKeys, nil
}
// UpdateDuties checks the slot number to determine if the validator's
// list of upcoming assignments needs to be updated. For example, at the
// beginning of a new epoch.
func (v *validator) UpdateDuties(ctx context.Context, slot primitives.Slot) error {
if !slots.IsEpochStart(slot) && v.duties != nil {
// Do nothing if not epoch start AND assignments already exist.
return nil
}
ctx, span := trace.StartSpan(ctx, "validator.UpdateDuties")
defer span.End()
filteredKeys, err := v.getFilteredKeys(ctx)
if err != nil {
return errors.Wrap(err, "could not get filtered keys")
}
req := &ethpb.DutiesRequest{
Epoch: primitives.Epoch(slot / params.BeaconConfig().SlotsPerEpoch),
@@ -1148,21 +1149,17 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE
if err != nil {
return errors.Wrap(err, "failed to decode previous duty dependent root")
}
uintSlot, err := strconv.ParseUint(head.Slot, 10, 64)
if err != nil {
return errors.Wrap(err, "Failed to parse slot")
}
slot := primitives.Slot(uintSlot)
slot := slots.CurrentSlot(v.genesisTime)
currEpochStart, err := slots.EpochStart(slots.ToEpoch(slot))
if err != nil {
return err
}
deadline := v.SlotDeadline(slot)
slotCtx, cancel := context.WithDeadline(ctx, deadline)
// set deadline for next epoch instead of dutiesDeadline
deadline := v.SlotDeadline(currEpochStart + params.BeaconConfig().SlotsPerEpoch)
dutiesCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
if !bytes.Equal(prevDepedentRoot, v.duties.PrevDependentRoot) {
if err := v.UpdateDuties(slotCtx, currEpochStart); err != nil {
if err := v.UpdateDuties(dutiesCtx, currEpochStart); err != nil {
return errors.Wrap(err, "failed to update duties")
}
log.Info("Updated duties due to previous dependent root change")
@@ -1173,7 +1170,7 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE
return errors.Wrap(err, "failed to decode current duty dependent root")
}
if !bytes.Equal(currDepedentRoot, v.duties.CurrDependentRoot) {
if err := v.UpdateDuties(slotCtx, currEpochStart); err != nil {
if err := v.UpdateDuties(dutiesCtx, currEpochStart); err != nil {
return errors.Wrap(err, "failed to update duties")
}
log.Info("Updated duties due to current dependent root change")