diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index bb30118e49..06054abab1 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -265,7 +265,7 @@ func (s *Service) processVerifiedAttestation( if key, err := generateUnaggregatedAttCacheKey(broadcastAtt); err != nil { log.WithError(err).Error("Failed to generate cache key for attestation tracking") } else { - s.setSeenUnaggregatedAtt(key) + _ = s.setSeenUnaggregatedAtt(key) } valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot)) @@ -320,7 +320,7 @@ func (s *Service) processAggregate(ctx context.Context, aggregate ethpb.SignedAg return } - s.setAggregatorIndexEpochSeen(att.GetData().Target.Epoch, aggregate.AggregateAttestationAndProof().GetAggregatorIndex()) + _ = s.setAggregatorIndexEpochSeen(att.GetData().Target.Epoch, aggregate.AggregateAttestationAndProof().GetAggregatorIndex()) if err := s.cfg.p2p.Broadcast(ctx, aggregate); err != nil { log.WithError(err).Debug("Could not broadcast aggregated attestation") diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index 5503e39ba7..14ec4af896 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -137,7 +137,9 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms return validationRes, err } - s.setAggregatorIndexEpochSeen(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex()) + if first := s.setAggregatorIndexEpochSeen(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex()); !first { + return pubsub.ValidationIgnore, nil + } msg.ValidatorData = m @@ -265,13 +267,19 @@ func (s *Service) hasSeenAggregatorIndexEpoch(epoch primitives.Epoch, aggregator } // Set aggregate's aggregator index target epoch as seen. -func (s *Service) setAggregatorIndexEpochSeen(epoch primitives.Epoch, aggregatorIndex primitives.ValidatorIndex) { +// Returns true if this is the first time seeing this aggregator index and epoch. +func (s *Service) setAggregatorIndexEpochSeen(epoch primitives.Epoch, aggregatorIndex primitives.ValidatorIndex) bool { b := append(bytesutil.Bytes32(uint64(epoch)), bytesutil.Bytes32(uint64(aggregatorIndex))...) s.seenAggregatedAttestationLock.Lock() defer s.seenAggregatedAttestationLock.Unlock() + _, seen := s.seenAggregatedAttestationCache.Get(string(b)) + if seen { + return false + } s.seenAggregatedAttestationCache.Add(string(b), true) + return true } // This validates the bitfield is correct and aggregator's index in state is within the beacon committee. diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go index 7fa4fdddea..f501efaa25 100644 --- a/beacon-chain/sync/validate_aggregate_proof_test.go +++ b/beacon-chain/sync/validate_aggregate_proof_test.go @@ -801,3 +801,27 @@ func TestValidateAggregateAndProof_RejectWhenAttEpochDoesntEqualTargetEpoch(t *t assert.NotNil(t, err) assert.Equal(t, pubsub.ValidationReject, res) } + +func Test_SetAggregatorIndexEpochSeen(t *testing.T) { + db := dbtest.SetupDB(t) + p := p2ptest.NewTestP2P(t) + + r := &Service{ + cfg: &config{ + p2p: p, + beaconDB: db, + }, + seenAggregatedAttestationCache: lruwrpr.New(10), + } + + aggIndex := primitives.ValidatorIndex(42) + epoch := primitives.Epoch(7) + + require.Equal(t, false, r.hasSeenAggregatorIndexEpoch(epoch, aggIndex)) + first := r.setAggregatorIndexEpochSeen(epoch, aggIndex) + require.Equal(t, true, first) + require.Equal(t, true, r.hasSeenAggregatorIndexEpoch(epoch, aggIndex)) + + second := r.setAggregatorIndexEpochSeen(epoch, aggIndex) + require.Equal(t, false, second) +} diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index 65bfcbb5bd..67953b9677 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -104,7 +104,8 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( } if !s.slasherEnabled { - // Verify this the first attestation received for the participating validator for the slot. + // Verify this the first attestation received for the participating validator for the slot. This verification is here to return early if we've already seen this attestation. + // This verification is carried again later after all other validations to avoid TOCTOU issues. if s.hasSeenUnaggregatedAtt(attKey) { return pubsub.ValidationIgnore, nil } @@ -228,7 +229,10 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( Data: eventData, }) - s.setSeenUnaggregatedAtt(attKey) + if first := s.setSeenUnaggregatedAtt(attKey); !first { + // Another concurrent validation processed the same attestation meanwhile + return pubsub.ValidationIgnore, nil + } // Attach final validated attestation to the message for further pipeline use msg.ValidatorData = attForValidation @@ -385,11 +389,16 @@ func (s *Service) hasSeenUnaggregatedAtt(key string) bool { } // Set an incoming attestation as seen for the participating validator for the slot. -func (s *Service) setSeenUnaggregatedAtt(key string) { +// Returns false if the attestation was already seen. +func (s *Service) setSeenUnaggregatedAtt(key string) bool { s.seenUnAggregatedAttestationLock.Lock() defer s.seenUnAggregatedAttestationLock.Unlock() - + _, seen := s.seenUnAggregatedAttestationCache.Get(key) + if seen { + return false + } s.seenUnAggregatedAttestationCache.Add(key, true) + return true } // hasBlockAndState returns true if the beacon node knows about a block and associated state in the diff --git a/beacon-chain/sync/validate_beacon_attestation_test.go b/beacon-chain/sync/validate_beacon_attestation_test.go index 2d4d32cfb7..45b066bea8 100644 --- a/beacon-chain/sync/validate_beacon_attestation_test.go +++ b/beacon-chain/sync/validate_beacon_attestation_test.go @@ -499,6 +499,10 @@ func TestService_setSeenUnaggregatedAtt(t *testing.T) { Data: ðpb.AttestationData{Slot: 2, CommitteeIndex: 0}, AggregationBits: bitfield.Bitlist{0b1001}, } + s3c0a0 := ðpb.Attestation{ + Data: ðpb.AttestationData{Slot: 3, CommitteeIndex: 0}, + AggregationBits: bitfield.Bitlist{0b1001}, + } t.Run("empty cache", func(t *testing.T) { key := generateKey(t, s0c0a0) @@ -506,26 +510,39 @@ func TestService_setSeenUnaggregatedAtt(t *testing.T) { }) t.Run("ok", func(t *testing.T) { key := generateKey(t, s0c0a0) - s.setSeenUnaggregatedAtt(key) + first := s.setSeenUnaggregatedAtt(key) assert.Equal(t, true, s.hasSeenUnaggregatedAtt(key)) + assert.Equal(t, true, first) + }) + t.Run("already seen", func(t *testing.T) { + key := generateKey(t, s3c0a0) + first := s.setSeenUnaggregatedAtt(key) + assert.Equal(t, true, s.hasSeenUnaggregatedAtt(key)) + assert.Equal(t, true, first) + first = s.setSeenUnaggregatedAtt(key) + assert.Equal(t, true, s.hasSeenUnaggregatedAtt(key)) + assert.Equal(t, false, first) }) t.Run("different slot", func(t *testing.T) { key1 := generateKey(t, s1c0a0) key2 := generateKey(t, s2c0a0) - s.setSeenUnaggregatedAtt(key1) + first := s.setSeenUnaggregatedAtt(key1) assert.Equal(t, false, s.hasSeenUnaggregatedAtt(key2)) + assert.Equal(t, true, first) }) t.Run("different committee index", func(t *testing.T) { key1 := generateKey(t, s0c1a0) key2 := generateKey(t, s0c2a0) - s.setSeenUnaggregatedAtt(key1) + first := s.setSeenUnaggregatedAtt(key1) assert.Equal(t, false, s.hasSeenUnaggregatedAtt(key2)) + assert.Equal(t, true, first) }) t.Run("different bit", func(t *testing.T) { key1 := generateKey(t, s0c0a1) key2 := generateKey(t, s0c0a2) - s.setSeenUnaggregatedAtt(key1) + first := s.setSeenUnaggregatedAtt(key1) assert.Equal(t, false, s.hasSeenUnaggregatedAtt(key2)) + assert.Equal(t, true, first) }) t.Run("0 bits set is considered not seen", func(t *testing.T) { a := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b1000}} @@ -576,6 +593,11 @@ func TestService_setSeenUnaggregatedAtt(t *testing.T) { CommitteeId: 0, AttesterIndex: 0, } + s3c0a0 := ðpb.SingleAttestation{ + Data: ðpb.AttestationData{Slot: 2}, + CommitteeId: 0, + AttesterIndex: 0, + } t.Run("empty cache", func(t *testing.T) { key := generateKey(t, s0c0a0) @@ -583,26 +605,39 @@ func TestService_setSeenUnaggregatedAtt(t *testing.T) { }) t.Run("ok", func(t *testing.T) { key := generateKey(t, s0c0a0) - s.setSeenUnaggregatedAtt(key) + first := s.setSeenUnaggregatedAtt(key) assert.Equal(t, true, s.hasSeenUnaggregatedAtt(key)) + assert.Equal(t, true, first) }) t.Run("different slot", func(t *testing.T) { key1 := generateKey(t, s1c0a0) key2 := generateKey(t, s2c0a0) - s.setSeenUnaggregatedAtt(key1) + first := s.setSeenUnaggregatedAtt(key1) assert.Equal(t, false, s.hasSeenUnaggregatedAtt(key2)) + assert.Equal(t, true, first) + }) + t.Run("already seen", func(t *testing.T) { + key := generateKey(t, s3c0a0) + first := s.setSeenUnaggregatedAtt(key) + assert.Equal(t, true, s.hasSeenUnaggregatedAtt(key)) + assert.Equal(t, true, first) + first = s.setSeenUnaggregatedAtt(key) + assert.Equal(t, true, s.hasSeenUnaggregatedAtt(key)) + assert.Equal(t, false, first) }) t.Run("different committee index", func(t *testing.T) { key1 := generateKey(t, s0c1a0) key2 := generateKey(t, s0c2a0) - s.setSeenUnaggregatedAtt(key1) + first := s.setSeenUnaggregatedAtt(key1) assert.Equal(t, false, s.hasSeenUnaggregatedAtt(key2)) + assert.Equal(t, true, first) }) t.Run("different attester", func(t *testing.T) { key1 := generateKey(t, s0c0a1) key2 := generateKey(t, s0c0a2) - s.setSeenUnaggregatedAtt(key1) + first := s.setSeenUnaggregatedAtt(key1) assert.Equal(t, false, s.hasSeenUnaggregatedAtt(key2)) + assert.Equal(t, true, first) }) t.Run("single attestation is considered not seen", func(t *testing.T) { a := ðpb.AttestationElectra{} diff --git a/changelog/potuz_check_twice_attseen.md b/changelog/potuz_check_twice_attseen.md new file mode 100644 index 0000000000..755bc37b5d --- /dev/null +++ b/changelog/potuz_check_twice_attseen.md @@ -0,0 +1,3 @@ +### Fixed + +- Fixed possible race when validating two attestations at the same time.