From 17b7d3ff12eab55839f0319bcd096e21fb476977 Mon Sep 17 00:00:00 2001 From: terence Date: Tue, 5 Aug 2025 09:08:47 -0700 Subject: [PATCH] Add fork choice check to pending attestations processing (#15547) --- .../sync/pending_attestations_queue.go | 2 +- .../sync/pending_attestations_queue_test.go | 84 +++++++++++++++++++ changelog/tt_check_pending_att.md | 3 + 3 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 changelog/tt_check_pending_att.md diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index 01f98af44c..63abcc8576 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -68,7 +68,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error { attestations := s.blkRootToPendingAtts[bRoot] s.pendingAttsLock.RUnlock() // has the pending attestation's missing block arrived and the node processed block yet? - if s.cfg.beaconDB.HasBlock(ctx, bRoot) && (s.cfg.beaconDB.HasState(ctx, bRoot) || s.cfg.beaconDB.HasStateSummary(ctx, bRoot)) { + if s.cfg.beaconDB.HasBlock(ctx, bRoot) && (s.cfg.beaconDB.HasState(ctx, bRoot) || s.cfg.beaconDB.HasStateSummary(ctx, bRoot)) && s.cfg.chain.InForkchoice(bRoot) { s.processAttestations(ctx, attestations) log.WithFields(logrus.Fields{ "blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])), diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go index f4451d4ec4..cb94618824 100644 --- a/beacon-chain/sync/pending_attestations_queue_test.go +++ b/beacon-chain/sync/pending_attestations_queue_test.go @@ -738,6 +738,90 @@ func TestSavePendingAtts_BeyondLimit(t *testing.T) { assert.Equal(t, 0, len(s.blkRootToPendingAtts[r2]), "Saved pending atts") } +func TestProcessPendingAtts_BlockNotInForkChoice(t *testing.T) { + hook := logTest.NewGlobal() + db := dbtest.SetupDB(t) + p1 := p2ptest.NewTestP2P(t) + validators := uint64(256) + + beaconState, privKeys := util.DeterministicGenesisState(t, validators) + + sb := util.NewBeaconBlock() + util.SaveBlock(t, t.Context(), db, sb) + root, err := sb.Block.HashTreeRoot() + require.NoError(t, err) + + aggBits := bitfield.NewBitlist(8) + aggBits.SetBitAt(1, true) + att := ðpb.Attestation{ + Data: ðpb.AttestationData{ + BeaconBlockRoot: root[:], + Source: ðpb.Checkpoint{Epoch: 0, Root: bytesutil.PadTo([]byte("hello-world"), 32)}, + Target: ðpb.Checkpoint{Epoch: 0, Root: root[:]}, + }, + AggregationBits: aggBits, + } + + committee, err := helpers.BeaconCommitteeFromState(t.Context(), beaconState, att.Data.Slot, att.Data.CommitteeIndex) + assert.NoError(t, err) + attestingIndices, err := attestation.AttestingIndices(att, committee) + require.NoError(t, err) + attesterDomain, err := signing.Domain(beaconState.Fork(), 0, params.BeaconConfig().DomainBeaconAttester, beaconState.GenesisValidatorsRoot()) + require.NoError(t, err) + hashTreeRoot, err := signing.ComputeSigningRoot(att.Data, attesterDomain) + assert.NoError(t, err) + for _, i := range attestingIndices { + att.Signature = privKeys[i].Sign(hashTreeRoot[:]).Marshal() + } + + aggregateAndProof := ðpb.AggregateAttestationAndProof{ + Aggregate: att, + } + + require.NoError(t, beaconState.SetGenesisTime(time.Now())) + + // Mock chain service that returns false for InForkchoice + chain := &mock.ChainService{Genesis: time.Now(), + State: beaconState, + FinalizedCheckPoint: ðpb.Checkpoint{ + Root: aggregateAndProof.Aggregate.Data.BeaconBlockRoot, + Epoch: 0, + }, + // Set NotFinalized to true so InForkchoice returns false + NotFinalized: true, + } + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + r := &Service{ + ctx: ctx, + cfg: &config{ + p2p: p1, + beaconDB: db, + chain: chain, + clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), + attPool: attestations.NewPool(), + }, + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), + } + + s, err := util.NewBeaconState() + require.NoError(t, err) + require.NoError(t, r.cfg.beaconDB.SaveState(t.Context(), s, root)) + + // Add pending attestation + r.blkRootToPendingAtts[root] = []ethpb.SignedAggregateAttAndProof{ðpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof}} + + // Process pending attestations - should not process because block is not in fork choice + require.NoError(t, r.processPendingAtts(t.Context())) + + // Verify attestations were not processed (should still be pending) + assert.Equal(t, 1, len(r.blkRootToPendingAtts[root]), "Attestations should still be pending") + assert.Equal(t, 0, len(r.cfg.attPool.UnaggregatedAttestations()), "Should not save attestation when block not in fork choice") + assert.Equal(t, 0, len(r.cfg.attPool.AggregatedAttestations()), "Should not save attestation when block not in fork choice") + require.LogsDoNotContain(t, hook, "Verified and saved pending attestations to pool") +} + func Test_attsAreEqual_Committee(t *testing.T) { t.Run("Phase 0 equal", func(t *testing.T) { att1 := ðpb.SignedAggregateAttestationAndProof{ diff --git a/changelog/tt_check_pending_att.md b/changelog/tt_check_pending_att.md new file mode 100644 index 0000000000..c8224a7544 --- /dev/null +++ b/changelog/tt_check_pending_att.md @@ -0,0 +1,3 @@ +### Changed + +- Check pending block is in forkchoice before importing pending attestation.