feat(event-stream): Support block_gossip topic (#15038)

* feat(event-stream): Add block_gossip topic support

* feat(event-stream): Add block_gossip topic support

* feat(event-stream): Add block_gossip topic support

* feat: add block gossip topic support to beacon api event stream

* fix: sync_fuzz_test panic

* fix: check for nil operationNotifier before sending block gossip

The operationNotifier was not being checked for nil before being used,
which could lead to a panic if it was not initialized. This commit adds
a nil check to prevent the panic.

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
This commit is contained in:
Sam Calder-Mason
2025-03-19 05:27:27 +10:00
committed by GitHub
parent 0f39857653
commit 179cedd4a0
8 changed files with 80 additions and 16 deletions

View File

@@ -20,6 +20,11 @@ type BlockEvent struct {
ExecutionOptimistic bool `json:"execution_optimistic"`
}
type BlockGossipEvent struct {
Slot string `json:"slot"`
Block string `json:"block"`
}
type AggregatedAttEventSource struct {
Aggregate *Attestation `json:"aggregate"`
}

View File

@@ -11,6 +11,7 @@ go_library(
deps = [
"//async/event:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
],
)

View File

@@ -3,6 +3,7 @@ package operation
import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
)
@@ -35,6 +36,9 @@ const (
// SingleAttReceived is sent after a single attestation object is received from gossip or rpc
SingleAttReceived = 9
// BlockGossipReceived is sent after a block has been received from gossip or API that passes validation rules.
BlockGossipReceived = 10
)
// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
@@ -85,3 +89,9 @@ type AttesterSlashingReceivedData struct {
type SingleAttReceivedData struct {
Attestation ethpb.Att
}
// BlockGossipReceivedData is the data sent with BlockGossipReceived events.
type BlockGossipReceivedData struct {
// SignedBlock is the block that was received.
SignedBlock interfaces.ReadOnlySignedBeaconBlock
}

View File

@@ -45,6 +45,8 @@ const (
HeadTopic = "head"
// BlockTopic represents a new produced block event topic.
BlockTopic = "block"
// BlockGossipTopic represents a block received from gossip or API that passes validation rules.
BlockGossipTopic = "block_gossip"
// AttestationTopic represents a new submitted attestation event topic.
AttestationTopic = "attestation"
// SingleAttestationTopic represents a new submitted single attestation event topic.
@@ -103,6 +105,7 @@ var opsFeedEventTopics = map[feed.EventType]string{
operation.BlobSidecarReceived: BlobSidecarTopic,
operation.AttesterSlashingReceived: AttesterSlashingTopic,
operation.ProposerSlashingReceived: ProposerSlashingTopic,
operation.BlockGossipReceived: BlockGossipTopic,
}
var stateFeedEventTopics = map[feed.EventType]string{
@@ -443,6 +446,8 @@ func topicForEvent(event *feed.Event) string {
return AttesterSlashingTopic
case *operation.ProposerSlashingReceivedData:
return ProposerSlashingTopic
case *operation.BlockGossipReceivedData:
return BlockGossipTopic
case *ethpb.EventHead:
return HeadTopic
case *ethpb.EventFinalizedCheckpoint:
@@ -479,6 +484,18 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
return func() io.Reader {
return jsonMarshalReader(eventName, structs.HeadEventFromV1(v))
}, nil
case *operation.BlockGossipReceivedData:
blockRoot, err := v.SignedBlock.Block().HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not compute block root for BlockGossipReceivedData")
}
return func() io.Reader {
blk := &structs.BlockGossipEvent{
Slot: fmt.Sprintf("%d", v.SignedBlock.Block().Slot()),
Block: hexutil.Encode(blockRoot[:]),
}
return jsonMarshalReader(eventName, blk)
}, nil
case *operation.AggregatedAttReceivedData:
switch att := v.Attestation.AggregateVal().(type) {
case *eth.Attestation:

View File

@@ -119,12 +119,19 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
BlobSidecarTopic,
AttesterSlashingTopic,
ProposerSlashingTopic,
BlockGossipTopic,
})
require.NoError(t, err)
ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(&eth.BlobSidecar{}))
require.NoError(t, err)
vblob := blocks.NewVerifiedROBlob(ro)
// Create a test block for block gossip event
block := util.NewBeaconBlock()
block.Block.Slot = 123
signedBlock, err := blocks.NewSignedBeaconBlock(block)
require.NoError(t, err)
return topics, []*feed.Event{
{
Type: operation.UnaggregatedAttReceived,
@@ -287,6 +294,12 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
{
Type: operation.BlockGossipReceived,
Data: &operation.BlockGossipReceivedData{
SignedBlock: signedBlock,
},
},
}
}
@@ -690,7 +703,7 @@ func TestStuckReaderScenarios(t *testing.T) {
func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) {
topics, events := operationEventsFixtures(t)
require.Equal(t, 10, len(events))
require.Equal(t, 11, len(events))
// set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader.
stn := mockChain.NewEventFeedWrapper()

View File

@@ -100,6 +100,7 @@ func FuzzValidateBeaconBlockPubSub_Phase0(f *testing.F) {
}
r.cfg.chain = cService
r.cfg.blockNotifier = cService.BlockNotifier()
r.cfg.operationNotifier = cService.OperationNotifier()
strTop := string(topic)
msg := &pubsub.Message{
Message: &pb.Message{
@@ -146,13 +147,14 @@ func FuzzValidateBeaconBlockPubSub_Altair(f *testing.F) {
}
r := &Service{
cfg: &config{
beaconDB: db,
p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService,
blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
beaconDB: db,
p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(),
operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
},
seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10),
@@ -182,6 +184,7 @@ func FuzzValidateBeaconBlockPubSub_Altair(f *testing.F) {
}
r.cfg.chain = cService
r.cfg.blockNotifier = cService.BlockNotifier()
r.cfg.operationNotifier = cService.OperationNotifier()
strTop := string(topic)
msg := &pubsub.Message{
Message: &pb.Message{
@@ -228,13 +231,14 @@ func FuzzValidateBeaconBlockPubSub_Bellatrix(f *testing.F) {
}
r := &Service{
cfg: &config{
beaconDB: db,
p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(),
stateGen: stateGen,
beaconDB: db,
p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: chainService,
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot),
blockNotifier: chainService.BlockNotifier(),
operationNotifier: chainService.OperationNotifier(),
stateGen: stateGen,
},
seenBlockCache: lruwrpr.New(10),
badBlockCache: lruwrpr.New(10),
@@ -264,6 +268,7 @@ func FuzzValidateBeaconBlockPubSub_Bellatrix(f *testing.F) {
}
r.cfg.chain = cService
r.cfg.blockNotifier = cService.BlockNotifier()
r.cfg.operationNotifier = cService.OperationNotifier()
strTop := string(topic)
msg := &pubsub.Message{
Message: &pb.Message{

View File

@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
@@ -70,8 +71,17 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
return pubsub.ValidationReject, errors.New("block.Block is nil")
}
// Broadcast the block on a feed to notify other services in the beacon node
// Broadcast the block on both block and operation feeds to notify other services in the beacon node
// of a received block (even if it does not process correctly through a state transition).
if s.cfg.operationNotifier != nil {
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.BlockGossipReceived,
Data: &operation.BlockGossipReceivedData{
SignedBlock: blk,
},
})
}
s.cfg.blockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{

View File

@@ -0,0 +1,3 @@
### Added
- block_gossip topic support to the beacon api event stream.