From 81266f60af641f1e5a9b74f2b27f0d31058ae60c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Kapka?= Date: Wed, 19 Nov 2025 23:34:02 +0100 Subject: [PATCH] Move `BlockGossipReceived` event to the end of gossip validation. (#16031) * Move `BlockGossipReceived` event to the end of gossip validation. * changelog <3 * tests --- beacon-chain/sync/validate_beacon_blocks.go | 21 +- .../sync/validate_beacon_blocks_test.go | 554 +++++++++++++----- changelog/radek_move-block-gossip-event.md | 3 + 3 files changed, 426 insertions(+), 152 deletions(-) create mode 100644 changelog/radek_move-block-gossip-event.md diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index 021a1df0ed..cd4236a9c3 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -75,17 +75,8 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms return pubsub.ValidationReject, errors.New("block.Block is nil") } - // Broadcast the block on both block and operation feeds to notify other services in the beacon node + // Broadcast the block on a feed to notify other services in the beacon node // of a received block (even if it does not process correctly through a state transition). - if s.cfg.operationNotifier != nil { - s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.BlockGossipReceived, - Data: &operation.BlockGossipReceivedData{ - SignedBlock: blk, - }, - }) - } - s.cfg.blockNotifier.BlockFeed().Send(&feed.Event{ Type: blockfeed.ReceivedBlock, Data: &blockfeed.ReceivedBlockData{ @@ -247,6 +238,16 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms blockArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds())) blockVerificationGossipSummary.Observe(float64(validationTime.Milliseconds())) + + if s.cfg.operationNotifier != nil { + s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.BlockGossipReceived, + Data: &operation.BlockGossipReceivedData{ + SignedBlock: blk, + }, + }) + } + return pubsub.ValidationAccept, nil } diff --git a/beacon-chain/sync/validate_beacon_blocks_test.go b/beacon-chain/sync/validate_beacon_blocks_test.go index b01e965451..d56b2dfc73 100644 --- a/beacon-chain/sync/validate_beacon_blocks_test.go +++ b/beacon-chain/sync/validate_beacon_blocks_test.go @@ -11,6 +11,8 @@ import ( "github.com/OffchainLabs/prysm/v7/async/abool" mock "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/testing" + "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed" + opfeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation" "github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers" "github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing" coreTime "github.com/OffchainLabs/prysm/v7/beacon-chain/core/time" @@ -79,17 +81,21 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) @@ -108,6 +114,13 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) { require.ErrorContains(t, "invalid signature", err) result := res == pubsub.ValidationReject assert.Equal(t, true, result) + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T) { @@ -145,17 +158,21 @@ func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() blockRoot, err := msg.Block.HashTreeRoot() require.NoError(t, err) @@ -183,6 +200,13 @@ func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T // Verify block is now marked as bad after invalid signature assert.Equal(t, true, r.hasBadBlock(blockRoot), "block should be marked as bad after invalid signature") + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) { @@ -198,16 +222,20 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) { chainService := &mock.ChainService{Genesis: time.Now()} r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err := p.Encoding().EncodeGossip(buf, msg) @@ -226,6 +254,13 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) { res, err := r.validateBeaconBlockPubSub(ctx, "", m) assert.NoError(t, err) assert.Equal(t, res, pubsub.ValidationIgnore, "block present in DB should be ignored") + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) { @@ -260,19 +295,24 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) require.NoError(t, err) @@ -291,6 +331,17 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) { result := res == pubsub.ValidationAccept assert.Equal(t, true, result) assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") + + blockGossipFound := false + select { + case event := <-opChannel: + if event.Type == opfeed.BlockGossipReceived { + blockGossipFound = true + } + default: + // this case is needed, otherwise the test will never finish + } + assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent") } func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) { @@ -326,19 +377,24 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) require.NoError(t, err) @@ -357,6 +413,17 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) { result := res == pubsub.ValidationAccept assert.Equal(t, true, result) assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") + + blockGossipFound := false + select { + case event := <-opChannel: + if event.Type == opfeed.BlockGossipReceived { + blockGossipFound = true + } + default: + // this case is needed, otherwise the test will never finish + } + assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent") } func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { @@ -392,19 +459,24 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) require.NoError(t, err) @@ -423,6 +495,17 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { result := res == pubsub.ValidationAccept assert.Equal(t, true, result) assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") + + blockGossipFound := false + select { + case event := <-opChannel: + if event.Type == opfeed.BlockGossipReceived { + blockGossipFound = true + } + default: + // this case is needed, otherwise the test will never finish + } + assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent") } func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) { @@ -460,13 +543,14 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) { }} r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), @@ -474,6 +558,10 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) { seenPendingBlocks: make(map[[32]byte]bool), subHandler: newSubTopicHandler(), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) require.NoError(t, err) @@ -492,6 +580,17 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) { result := res == pubsub.ValidationAccept assert.Equal(t, true, result) assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") + + blockGossipFound := false + select { + case event := <-opChannel: + if event.Type == opfeed.BlockGossipReceived { + blockGossipFound = true + } + default: + // this case is needed, otherwise the test will never finish + } + assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent") } func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) { @@ -529,19 +628,24 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) { }} r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) require.NoError(t, err) @@ -560,6 +664,17 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) { result := res == pubsub.ValidationAccept assert.Equal(t, true, result) assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") + + blockGossipFound := false + select { + case event := <-opChannel: + if event.Type == opfeed.BlockGossipReceived { + blockGossipFound = true + } + default: + // this case is needed, otherwise the test will never finish + } + assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent") } func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) { @@ -580,13 +695,17 @@ func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) { }} r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: true}, - chain: chainService, - blockNotifier: chainService.BlockNotifier(), + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: true}, + chain: chainService, + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), }, } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) @@ -601,6 +720,13 @@ func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) { res, err := r.validateBeaconBlockPubSub(ctx, "", m) assert.NoError(t, err) assert.Equal(t, res, pubsub.ValidationIgnore, "block is ignored until fully synced") + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing.T) { @@ -636,13 +762,14 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing State: beaconState} r := &Service{ cfg: &config{ - p2p: p, - beaconDB: db, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + p2p: p, + beaconDB: db, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, chainStarted: abool.New(), seenBlockCache: lruwrpr.New(10), @@ -650,6 +777,9 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) @@ -670,6 +800,13 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing // check if the block is inserted in the Queue assert.Equal(t, true, len(r.pendingBlocksInCache(msg.Block.Slot)) == 1) + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) { @@ -688,12 +825,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) { chainService := &mock.ChainService{Genesis: time.Now()} r := &Service{ cfg: &config{ - p2p: p, - beaconDB: db, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), + p2p: p, + beaconDB: db, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), }, chainStarted: abool.New(), seenBlockCache: lruwrpr.New(10), @@ -701,6 +839,9 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) { slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) @@ -718,6 +859,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) { res, err := r.validateBeaconBlockPubSub(ctx, "", m) assert.NoError(t, err) assert.Equal(t, res, pubsub.ValidationIgnore, "block from the future should be ignored") + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) { @@ -742,16 +890,20 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) @@ -769,6 +921,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) { res, err := r.validateBeaconBlockPubSub(ctx, "", m) require.ErrorContains(t, "greater or equal to block slot", err) assert.Equal(t, res, pubsub.ValidationIgnore, "block from the past should be ignored") + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { @@ -813,19 +972,23 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - slashingPool: slashingPool, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + slashingPool: slashingPool, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() // Mark the proposer/slot as seen r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex) @@ -853,6 +1016,13 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { // Verify no slashings were created assert.Equal(t, 0, len(slashingPool.PendingPropSlashings), "Expected no slashings for same signature") + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) { @@ -875,17 +1045,21 @@ func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) { r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - chain: chain, - clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), - blockNotifier: chain.BlockNotifier(), - attPool: attestations.NewPool(), - initialSync: &mockSync.Sync{IsSyncing: false}, + beaconDB: db, + p2p: p, + chain: chain, + clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), + blockNotifier: chain.BlockNotifier(), + operationNotifier: chain.OperationNotifier(), + attPool: attestations.NewPool(), + initialSync: &mockSync.Sync{IsSyncing: false}, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() b := util.NewBeaconBlock() b.Block.Slot = 1 @@ -922,6 +1096,13 @@ func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) { res, err = r.validateBeaconBlockPubSub(t.Context(), "", m) assert.NoError(t, err) assert.Equal(t, pubsub.ValidationIgnore, res) + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) { @@ -960,19 +1141,24 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) require.NoError(t, err) @@ -989,6 +1175,13 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) { res, err := r.validateBeaconBlockPubSub(ctx, "", m) assert.Equal(t, pubsub.ValidationReject, res, "Wrong validation result returned") require.ErrorContains(t, "not descendant of finalized checkpoint", err) + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) { @@ -1026,19 +1219,24 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) { }} r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) require.NoError(t, err) @@ -1088,6 +1286,13 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) { require.ErrorContains(t, "has an invalid parent", err) // Expect block with bad parent to fail too assert.Equal(t, res, pubsub.ValidationReject, "block with invalid parent should be ignored") + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) { @@ -1120,19 +1325,24 @@ func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) { }} r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) require.NoError(t, err) @@ -1152,6 +1362,13 @@ func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) { bRoot, err = msg.Block.HashTreeRoot() assert.NoError(t, err) assert.Equal(t, true, r.seenPendingBlocks[bRoot]) + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) { @@ -1203,19 +1420,24 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() + r.setBadBlock(ctx, bytesutil.ToBytes32(msg.Block.ParentRoot)) buf := new(bytes.Buffer) @@ -1234,6 +1456,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) { res, err := r.validateBeaconBlockPubSub(ctx, "", m) assert.ErrorContains(t, "invalid parent", err) assert.Equal(t, res, pubsub.ValidationReject) + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func TestService_setBadBlock_DoesntSetWithContextErr(t *testing.T) { @@ -1311,17 +1540,21 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) @@ -1342,6 +1575,17 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) { require.NoError(t, err) result := res == pubsub.ValidationAccept require.Equal(t, true, result) + + blockGossipFound := false + select { + case event := <-opChannel: + if event.Type == opfeed.BlockGossipReceived { + blockGossipFound = true + } + default: + // this case is needed, otherwise the test will never finish + } + assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent") } func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) { @@ -1383,17 +1627,21 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) { }} r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) @@ -1413,6 +1661,13 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) { require.NotNil(t, err) result := res == pubsub.ValidationReject assert.Equal(t, true, result) + + select { + case event := <-opChannel: + assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent") + default: + // this case is needed, otherwise the test will never finish + } } func Test_validateBellatrixBeaconBlock(t *testing.T) { @@ -1549,17 +1804,21 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), } + opChannel := make(chan *feed.Event, 1) + opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel) + defer opSub.Unsubscribe() buf := new(bytes.Buffer) _, err = p.Encoding().EncodeGossip(buf, msg) @@ -1580,6 +1839,17 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) { require.NoError(t, err) result := res == pubsub.ValidationAccept assert.Equal(t, true, result) + + blockGossipFound := false + select { + case event := <-opChannel: + if event.Type == opfeed.BlockGossipReceived { + blockGossipFound = true + } + default: + // this case is needed, otherwise the test will never finish + } + assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent") } func Test_getBlockFields(t *testing.T) { diff --git a/changelog/radek_move-block-gossip-event.md b/changelog/radek_move-block-gossip-event.md new file mode 100644 index 0000000000..a99945f5ab --- /dev/null +++ b/changelog/radek_move-block-gossip-event.md @@ -0,0 +1,3 @@ +### Fixed + +- Move `BlockGossipReceived` event to the end of gossip validation. \ No newline at end of file