Move BlockGossipReceived event to the end of gossip validation. (#16031)

* Move `BlockGossipReceived` event to the end of gossip validation.

* changelog <3

* tests
This commit is contained in:
Radosław Kapka
2025-11-19 23:34:02 +01:00
committed by GitHub
parent 207f36065a
commit 81266f60af
3 changed files with 426 additions and 152 deletions

View File

@@ -75,17 +75,8 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
return pubsub.ValidationReject, errors.New("block.Block is nil") return pubsub.ValidationReject, errors.New("block.Block is nil")
} }
// Broadcast the block on both block and operation feeds to notify other services in the beacon node // Broadcast the block on a feed to notify other services in the beacon node
// of a received block (even if it does not process correctly through a state transition). // of a received block (even if it does not process correctly through a state transition).
if s.cfg.operationNotifier != nil {
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.BlockGossipReceived,
Data: &operation.BlockGossipReceivedData{
SignedBlock: blk,
},
})
}
s.cfg.blockNotifier.BlockFeed().Send(&feed.Event{ s.cfg.blockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock, Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{ Data: &blockfeed.ReceivedBlockData{
@@ -247,6 +238,16 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
blockArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds())) blockArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds()))
blockVerificationGossipSummary.Observe(float64(validationTime.Milliseconds())) blockVerificationGossipSummary.Observe(float64(validationTime.Milliseconds()))
if s.cfg.operationNotifier != nil {
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.BlockGossipReceived,
Data: &operation.BlockGossipReceivedData{
SignedBlock: blk,
},
})
}
return pubsub.ValidationAccept, nil return pubsub.ValidationAccept, nil
} }

View File

@@ -11,6 +11,8 @@ import (
"github.com/OffchainLabs/prysm/v7/async/abool" "github.com/OffchainLabs/prysm/v7/async/abool"
mock "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/testing" mock "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
opfeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers" "github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing" "github.com/OffchainLabs/prysm/v7/beacon-chain/core/signing"
coreTime "github.com/OffchainLabs/prysm/v7/beacon-chain/core/time" coreTime "github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
@@ -79,17 +81,21 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
@@ -108,6 +114,13 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) {
require.ErrorContains(t, "invalid signature", err) require.ErrorContains(t, "invalid signature", err)
result := res == pubsub.ValidationReject result := res == pubsub.ValidationReject
assert.Equal(t, true, result) assert.Equal(t, true, result)
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T) { func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T) {
@@ -145,17 +158,21 @@ func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
blockRoot, err := msg.Block.HashTreeRoot() blockRoot, err := msg.Block.HashTreeRoot()
require.NoError(t, err) require.NoError(t, err)
@@ -183,6 +200,13 @@ func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T
// Verify block is now marked as bad after invalid signature // Verify block is now marked as bad after invalid signature
assert.Equal(t, true, r.hasBadBlock(blockRoot), "block should be marked as bad after invalid signature") assert.Equal(t, true, r.hasBadBlock(blockRoot), "block should be marked as bad after invalid signature")
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) { func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
@@ -198,16 +222,20 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
chainService := &mock.ChainService{Genesis: time.Now()} chainService := &mock.ChainService{Genesis: time.Now()}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
operationNotifier: chainService.OperationNotifier(),
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err := p.Encoding().EncodeGossip(buf, msg) _, err := p.Encoding().EncodeGossip(buf, msg)
@@ -226,6 +254,13 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
res, err := r.validateBeaconBlockPubSub(ctx, "", m) res, err := r.validateBeaconBlockPubSub(ctx, "", m)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block present in DB should be ignored") assert.Equal(t, res, pubsub.ValidationIgnore, "block present in DB should be ignored")
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) { func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
@@ -260,19 +295,24 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err) require.NoError(t, err)
@@ -291,6 +331,17 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
result := res == pubsub.ValidationAccept result := res == pubsub.ValidationAccept
assert.Equal(t, true, result) assert.Equal(t, true, result)
assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
blockGossipFound := false
select {
case event := <-opChannel:
if event.Type == opfeed.BlockGossipReceived {
blockGossipFound = true
}
default:
// this case is needed, otherwise the test will never finish
}
assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent")
} }
func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) { func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) {
@@ -326,19 +377,24 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err) require.NoError(t, err)
@@ -357,6 +413,17 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) {
result := res == pubsub.ValidationAccept result := res == pubsub.ValidationAccept
assert.Equal(t, true, result) assert.Equal(t, true, result)
assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
blockGossipFound := false
select {
case event := <-opChannel:
if event.Type == opfeed.BlockGossipReceived {
blockGossipFound = true
}
default:
// this case is needed, otherwise the test will never finish
}
assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent")
} }
func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
@@ -392,19 +459,24 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err) require.NoError(t, err)
@@ -423,6 +495,17 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
result := res == pubsub.ValidationAccept result := res == pubsub.ValidationAccept
assert.Equal(t, true, result) assert.Equal(t, true, result)
assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
blockGossipFound := false
select {
case event := <-opChannel:
if event.Type == opfeed.BlockGossipReceived {
blockGossipFound = true
}
default:
// this case is needed, otherwise the test will never finish
}
assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent")
} }
func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) { func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) {
@@ -460,13 +543,14 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) {
}} }}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
@@ -474,6 +558,10 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) {
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
subHandler: newSubTopicHandler(), subHandler: newSubTopicHandler(),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err) require.NoError(t, err)
@@ -492,6 +580,17 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) {
result := res == pubsub.ValidationAccept result := res == pubsub.ValidationAccept
assert.Equal(t, true, result) assert.Equal(t, true, result)
assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
blockGossipFound := false
select {
case event := <-opChannel:
if event.Type == opfeed.BlockGossipReceived {
blockGossipFound = true
}
default:
// this case is needed, otherwise the test will never finish
}
assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent")
} }
func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) { func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) {
@@ -529,19 +628,24 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) {
}} }}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err) require.NoError(t, err)
@@ -560,6 +664,17 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) {
result := res == pubsub.ValidationAccept result := res == pubsub.ValidationAccept
assert.Equal(t, true, result) assert.Equal(t, true, result)
assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data") assert.NotNil(t, m.ValidatorData, "Decoded message was not set on the message validator data")
blockGossipFound := false
select {
case event := <-opChannel:
if event.Type == opfeed.BlockGossipReceived {
blockGossipFound = true
}
default:
// this case is needed, otherwise the test will never finish
}
assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent")
} }
func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) { func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) {
@@ -580,13 +695,17 @@ func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) {
}} }}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: true}, initialSync: &mockSync.Sync{IsSyncing: true},
chain: chainService, chain: chainService,
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
operationNotifier: chainService.OperationNotifier(),
}, },
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
@@ -601,6 +720,13 @@ func TestValidateBeaconBlockPubSub_Syncing(t *testing.T) {
res, err := r.validateBeaconBlockPubSub(ctx, "", m) res, err := r.validateBeaconBlockPubSub(ctx, "", m)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block is ignored until fully synced") assert.Equal(t, res, pubsub.ValidationIgnore, "block is ignored until fully synced")
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing.T) { func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing.T) {
@@ -636,13 +762,14 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing
State: beaconState} State: beaconState}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
p2p: p, p2p: p,
beaconDB: db, beaconDB: db,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
chainStarted: abool.New(), chainStarted: abool.New(),
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
@@ -650,6 +777,9 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
@@ -670,6 +800,13 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing
// check if the block is inserted in the Queue // check if the block is inserted in the Queue
assert.Equal(t, true, len(r.pendingBlocksInCache(msg.Block.Slot)) == 1) assert.Equal(t, true, len(r.pendingBlocksInCache(msg.Block.Slot)) == 1)
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) { func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {
@@ -688,12 +825,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {
chainService := &mock.ChainService{Genesis: time.Now()} chainService := &mock.ChainService{Genesis: time.Now()}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
p2p: p, p2p: p,
beaconDB: db, beaconDB: db,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
operationNotifier: chainService.OperationNotifier(),
}, },
chainStarted: abool.New(), chainStarted: abool.New(),
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
@@ -701,6 +839,9 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
@@ -718,6 +859,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {
res, err := r.validateBeaconBlockPubSub(ctx, "", m) res, err := r.validateBeaconBlockPubSub(ctx, "", m)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block from the future should be ignored") assert.Equal(t, res, pubsub.ValidationIgnore, "block from the future should be ignored")
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) { func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) {
@@ -742,16 +890,20 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
operationNotifier: chainService.OperationNotifier(),
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
@@ -769,6 +921,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) {
res, err := r.validateBeaconBlockPubSub(ctx, "", m) res, err := r.validateBeaconBlockPubSub(ctx, "", m)
require.ErrorContains(t, "greater or equal to block slot", err) require.ErrorContains(t, "greater or equal to block slot", err)
assert.Equal(t, res, pubsub.ValidationIgnore, "block from the past should be ignored") assert.Equal(t, res, pubsub.ValidationIgnore, "block from the past should be ignored")
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
@@ -813,19 +972,23 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
slashingPool: slashingPool, operationNotifier: chainService.OperationNotifier(),
slashingPool: slashingPool,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
// Mark the proposer/slot as seen // Mark the proposer/slot as seen
r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex) r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex)
@@ -853,6 +1016,13 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
// Verify no slashings were created // Verify no slashings were created
assert.Equal(t, 0, len(slashingPool.PendingPropSlashings), "Expected no slashings for same signature") assert.Equal(t, 0, len(slashingPool.PendingPropSlashings), "Expected no slashings for same signature")
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) { func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) {
@@ -875,17 +1045,21 @@ func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) {
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
chain: chain, chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
blockNotifier: chain.BlockNotifier(), blockNotifier: chain.BlockNotifier(),
attPool: attestations.NewPool(), operationNotifier: chain.OperationNotifier(),
initialSync: &mockSync.Sync{IsSyncing: false}, attPool: attestations.NewPool(),
initialSync: &mockSync.Sync{IsSyncing: false},
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
b := util.NewBeaconBlock() b := util.NewBeaconBlock()
b.Block.Slot = 1 b.Block.Slot = 1
@@ -922,6 +1096,13 @@ func TestValidateBeaconBlockPubSub_FilterByFinalizedEpoch(t *testing.T) {
res, err = r.validateBeaconBlockPubSub(t.Context(), "", m) res, err = r.validateBeaconBlockPubSub(t.Context(), "", m)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, pubsub.ValidationIgnore, res) assert.Equal(t, pubsub.ValidationIgnore, res)
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) { func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) {
@@ -960,19 +1141,24 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err) require.NoError(t, err)
@@ -989,6 +1175,13 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) {
res, err := r.validateBeaconBlockPubSub(ctx, "", m) res, err := r.validateBeaconBlockPubSub(ctx, "", m)
assert.Equal(t, pubsub.ValidationReject, res, "Wrong validation result returned") assert.Equal(t, pubsub.ValidationReject, res, "Wrong validation result returned")
require.ErrorContains(t, "not descendant of finalized checkpoint", err) require.ErrorContains(t, "not descendant of finalized checkpoint", err)
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) { func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
@@ -1026,19 +1219,24 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
}} }}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err) require.NoError(t, err)
@@ -1088,6 +1286,13 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
require.ErrorContains(t, "has an invalid parent", err) require.ErrorContains(t, "has an invalid parent", err)
// Expect block with bad parent to fail too // Expect block with bad parent to fail too
assert.Equal(t, res, pubsub.ValidationReject, "block with invalid parent should be ignored") assert.Equal(t, res, pubsub.ValidationReject, "block with invalid parent should be ignored")
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) { func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) {
@@ -1120,19 +1325,24 @@ func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) {
}} }}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
require.NoError(t, err) require.NoError(t, err)
@@ -1152,6 +1362,13 @@ func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) {
bRoot, err = msg.Block.HashTreeRoot() bRoot, err = msg.Block.HashTreeRoot()
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, true, r.seenPendingBlocks[bRoot]) assert.Equal(t, true, r.seenPendingBlocks[bRoot])
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) { func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) {
@@ -1203,19 +1420,24 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool), seenPendingBlocks: make(map[[32]byte]bool),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
r.setBadBlock(ctx, bytesutil.ToBytes32(msg.Block.ParentRoot)) r.setBadBlock(ctx, bytesutil.ToBytes32(msg.Block.ParentRoot))
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
@@ -1234,6 +1456,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) {
res, err := r.validateBeaconBlockPubSub(ctx, "", m) res, err := r.validateBeaconBlockPubSub(ctx, "", m)
assert.ErrorContains(t, "invalid parent", err) assert.ErrorContains(t, "invalid parent", err)
assert.Equal(t, res, pubsub.ValidationReject) assert.Equal(t, res, pubsub.ValidationReject)
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func TestService_setBadBlock_DoesntSetWithContextErr(t *testing.T) { func TestService_setBadBlock_DoesntSetWithContextErr(t *testing.T) {
@@ -1311,17 +1540,21 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), stateGen: stateGen,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
@@ -1342,6 +1575,17 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
result := res == pubsub.ValidationAccept result := res == pubsub.ValidationAccept
require.Equal(t, true, result) require.Equal(t, true, result)
blockGossipFound := false
select {
case event := <-opChannel:
if event.Type == opfeed.BlockGossipReceived {
blockGossipFound = true
}
default:
// this case is needed, otherwise the test will never finish
}
assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent")
} }
func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) { func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) {
@@ -1383,17 +1627,21 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) {
}} }}
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
@@ -1413,6 +1661,13 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) {
require.NotNil(t, err) require.NotNil(t, err)
result := res == pubsub.ValidationReject result := res == pubsub.ValidationReject
assert.Equal(t, true, result) assert.Equal(t, true, result)
select {
case event := <-opChannel:
assert.NotEqual(t, opfeed.BlockGossipReceived, event.Type, "BlockGossipReceived event should not be sent")
default:
// this case is needed, otherwise the test will never finish
}
} }
func Test_validateBellatrixBeaconBlock(t *testing.T) { func Test_validateBellatrixBeaconBlock(t *testing.T) {
@@ -1549,17 +1804,21 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) {
} }
r := &Service{ r := &Service{
cfg: &config{ cfg: &config{
beaconDB: db, beaconDB: db,
p2p: p, p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false}, initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService, chain: chainService,
blockNotifier: chainService.BlockNotifier(), blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen, operationNotifier: chainService.OperationNotifier(),
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), stateGen: stateGen,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
}, },
seenBlockCache: lruwrpr.New(10), seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10),
} }
opChannel := make(chan *feed.Event, 1)
opSub := r.cfg.operationNotifier.OperationFeed().Subscribe(opChannel)
defer opSub.Unsubscribe()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, msg) _, err = p.Encoding().EncodeGossip(buf, msg)
@@ -1580,6 +1839,17 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
result := res == pubsub.ValidationAccept result := res == pubsub.ValidationAccept
assert.Equal(t, true, result) assert.Equal(t, true, result)
blockGossipFound := false
select {
case event := <-opChannel:
if event.Type == opfeed.BlockGossipReceived {
blockGossipFound = true
}
default:
// this case is needed, otherwise the test will never finish
}
assert.Equal(t, true, blockGossipFound, "BlockGossipReceived event should be sent")
} }
func Test_getBlockFields(t *testing.T) { func Test_getBlockFields(t *testing.T) {

View File

@@ -0,0 +1,3 @@
### Fixed
- Move `BlockGossipReceived` event to the end of gossip validation.