mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-05-02 03:02:54 -04:00
Add Lock to Processing Pending Attestations (#4833)
* add lock to pending atts * Merge branch 'master' into add-att-lock * changed to rlock * broken build * Merge refs/heads/master into add-att-lock * refactor to prevent deadlock
This commit is contained in:
@@ -24,7 +24,9 @@ var processPendingAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlo
|
||||
func (s *Service) processPendingAttsQueue() {
|
||||
ctx := context.Background()
|
||||
runutil.RunEvery(s.ctx, processPendingAttsPeriod, func() {
|
||||
s.processPendingAtts(ctx)
|
||||
if err := s.processPendingAtts(ctx); err != nil {
|
||||
log.WithError(err).Errorf("Could not process pending attestation: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -33,6 +35,8 @@ func (s *Service) processPendingAttsQueue() {
|
||||
// 2. Check if pending attestations can be processed when the block has arrived.
|
||||
// 3. Request block from a random peer if unable to proceed step 2.
|
||||
func (s *Service) processPendingAtts(ctx context.Context) error {
|
||||
s.pendingAttsLock.Lock()
|
||||
defer s.pendingAttsLock.Unlock()
|
||||
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
|
||||
defer span.End()
|
||||
|
||||
@@ -122,9 +126,6 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
|
||||
// root of the missing block. The value is the list of pending attestations
|
||||
// that voted for that block root.
|
||||
func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) {
|
||||
s.pendingAttsLock.Lock()
|
||||
defer s.pendingAttsLock.Unlock()
|
||||
|
||||
root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot)
|
||||
|
||||
_, ok := s.blkRootToPendingAtts[root]
|
||||
@@ -141,9 +142,6 @@ func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) {
|
||||
// check specifies the pending attestation could not fall one epoch behind
|
||||
// of the current slot.
|
||||
func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) {
|
||||
s.pendingAttsLock.Lock()
|
||||
defer s.pendingAttsLock.Unlock()
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "validatePendingAtts")
|
||||
defer span.End()
|
||||
|
||||
|
||||
@@ -76,7 +76,9 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
|
||||
// Verify the block being voted is in DB. The block should have passed validation if it's in the DB.
|
||||
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) {
|
||||
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
|
||||
s.pendingAttsLock.Lock()
|
||||
s.savePendingAtt(ð.AggregateAttestationAndProof{Aggregate: att})
|
||||
s.pendingAttsLock.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user