From c3785e03bafca2f195c4eb9ee9f3233b147dcecb Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Tue, 11 Feb 2020 14:01:33 -0600 Subject: [PATCH] Add Lock to Processing Pending Attestations (#4833) * add lock to pending atts * Merge branch 'master' into add-att-lock * changed to rlock * broken build * Merge refs/heads/master into add-att-lock * refactor to prevent deadlock --- beacon-chain/sync/pending_attestations_queue.go | 12 +++++------- .../validate_committee_index_beacon_attestation.go | 2 ++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index c250015c98..b91d646dfc 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -24,7 +24,9 @@ var processPendingAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlo func (s *Service) processPendingAttsQueue() { ctx := context.Background() runutil.RunEvery(s.ctx, processPendingAttsPeriod, func() { - s.processPendingAtts(ctx) + if err := s.processPendingAtts(ctx); err != nil { + log.WithError(err).Errorf("Could not process pending attestation: %v", err) + } }) } @@ -33,6 +35,8 @@ func (s *Service) processPendingAttsQueue() { // 2. Check if pending attestations can be processed when the block has arrived. // 3. Request block from a random peer if unable to proceed step 2. func (s *Service) processPendingAtts(ctx context.Context) error { + s.pendingAttsLock.Lock() + defer s.pendingAttsLock.Unlock() ctx, span := trace.StartSpan(ctx, "processPendingAtts") defer span.End() @@ -122,9 +126,6 @@ func (s *Service) processPendingAtts(ctx context.Context) error { // root of the missing block. The value is the list of pending attestations // that voted for that block root. func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) { - s.pendingAttsLock.Lock() - defer s.pendingAttsLock.Unlock() - root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot) _, ok := s.blkRootToPendingAtts[root] @@ -141,9 +142,6 @@ func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) { // check specifies the pending attestation could not fall one epoch behind // of the current slot. func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) { - s.pendingAttsLock.Lock() - defer s.pendingAttsLock.Unlock() - ctx, span := trace.StartSpan(ctx, "validatePendingAtts") defer span.End() diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation.go b/beacon-chain/sync/validate_committee_index_beacon_attestation.go index ecd98426b8..55d44a6b79 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation.go @@ -76,7 +76,9 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p // Verify the block being voted is in DB. The block should have passed validation if it's in the DB. if !s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. + s.pendingAttsLock.Lock() s.savePendingAtt(ð.AggregateAttestationAndProof{Aggregate: att}) + s.pendingAttsLock.Unlock() return false }