From 179cedd4a01325cee252a5d85496fc307736cfed Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 19 Mar 2025 05:27:27 +1000 Subject: [PATCH] feat(event-stream): Support block_gossip topic (#15038) * feat(event-stream): Add block_gossip topic support * feat(event-stream): Add block_gossip topic support * feat(event-stream): Add block_gossip topic support * feat: add block gossip topic support to beacon api event stream * fix: sync_fuzz_test panic * fix: check for nil operationNotifier before sending block gossip The operationNotifier was not being checked for nil before being used, which could lead to a panic if it was not initialized. This commit adds a nil check to prevent the panic. --------- Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com> --- api/server/structs/endpoints_events.go | 5 +++ beacon-chain/core/feed/operation/BUILD.bazel | 1 + beacon-chain/core/feed/operation/events.go | 10 ++++++ beacon-chain/rpc/eth/events/events.go | 17 ++++++++++ beacon-chain/rpc/eth/events/events_test.go | 15 ++++++++- beacon-chain/sync/sync_fuzz_test.go | 33 +++++++++++--------- beacon-chain/sync/validate_beacon_blocks.go | 12 ++++++- changelog/samcm_add_block_gossip_topic.md | 3 ++ 8 files changed, 80 insertions(+), 16 deletions(-) create mode 100644 changelog/samcm_add_block_gossip_topic.md diff --git a/api/server/structs/endpoints_events.go b/api/server/structs/endpoints_events.go index 275a945803..a8188f0647 100644 --- a/api/server/structs/endpoints_events.go +++ b/api/server/structs/endpoints_events.go @@ -20,6 +20,11 @@ type BlockEvent struct { ExecutionOptimistic bool `json:"execution_optimistic"` } +type BlockGossipEvent struct { + Slot string `json:"slot"` + Block string `json:"block"` +} + type AggregatedAttEventSource struct { Aggregate *Attestation `json:"aggregate"` } diff --git a/beacon-chain/core/feed/operation/BUILD.bazel b/beacon-chain/core/feed/operation/BUILD.bazel index 0d475e92d8..b159265e19 100644 --- a/beacon-chain/core/feed/operation/BUILD.bazel +++ b/beacon-chain/core/feed/operation/BUILD.bazel @@ -11,6 +11,7 @@ go_library( deps = [ "//async/event:go_default_library", "//consensus-types/blocks:go_default_library", + "//consensus-types/interfaces:go_default_library", "//proto/prysm/v1alpha1:go_default_library", ], ) diff --git a/beacon-chain/core/feed/operation/events.go b/beacon-chain/core/feed/operation/events.go index 4244253acc..10c2e32afa 100644 --- a/beacon-chain/core/feed/operation/events.go +++ b/beacon-chain/core/feed/operation/events.go @@ -3,6 +3,7 @@ package operation import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" ) @@ -35,6 +36,9 @@ const ( // SingleAttReceived is sent after a single attestation object is received from gossip or rpc SingleAttReceived = 9 + + // BlockGossipReceived is sent after a block has been received from gossip or API that passes validation rules. + BlockGossipReceived = 10 ) // UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events. @@ -85,3 +89,9 @@ type AttesterSlashingReceivedData struct { type SingleAttReceivedData struct { Attestation ethpb.Att } + +// BlockGossipReceivedData is the data sent with BlockGossipReceived events. +type BlockGossipReceivedData struct { + // SignedBlock is the block that was received. + SignedBlock interfaces.ReadOnlySignedBeaconBlock +} diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 1bb595b8da..bcdb537470 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -45,6 +45,8 @@ const ( HeadTopic = "head" // BlockTopic represents a new produced block event topic. BlockTopic = "block" + // BlockGossipTopic represents a block received from gossip or API that passes validation rules. + BlockGossipTopic = "block_gossip" // AttestationTopic represents a new submitted attestation event topic. AttestationTopic = "attestation" // SingleAttestationTopic represents a new submitted single attestation event topic. @@ -103,6 +105,7 @@ var opsFeedEventTopics = map[feed.EventType]string{ operation.BlobSidecarReceived: BlobSidecarTopic, operation.AttesterSlashingReceived: AttesterSlashingTopic, operation.ProposerSlashingReceived: ProposerSlashingTopic, + operation.BlockGossipReceived: BlockGossipTopic, } var stateFeedEventTopics = map[feed.EventType]string{ @@ -443,6 +446,8 @@ func topicForEvent(event *feed.Event) string { return AttesterSlashingTopic case *operation.ProposerSlashingReceivedData: return ProposerSlashingTopic + case *operation.BlockGossipReceivedData: + return BlockGossipTopic case *ethpb.EventHead: return HeadTopic case *ethpb.EventFinalizedCheckpoint: @@ -479,6 +484,18 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi return func() io.Reader { return jsonMarshalReader(eventName, structs.HeadEventFromV1(v)) }, nil + case *operation.BlockGossipReceivedData: + blockRoot, err := v.SignedBlock.Block().HashTreeRoot() + if err != nil { + return nil, errors.Wrap(err, "could not compute block root for BlockGossipReceivedData") + } + return func() io.Reader { + blk := &structs.BlockGossipEvent{ + Slot: fmt.Sprintf("%d", v.SignedBlock.Block().Slot()), + Block: hexutil.Encode(blockRoot[:]), + } + return jsonMarshalReader(eventName, blk) + }, nil case *operation.AggregatedAttReceivedData: switch att := v.Attestation.AggregateVal().(type) { case *eth.Attestation: diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 45aa49ee0b..d976a89837 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -119,12 +119,19 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { BlobSidecarTopic, AttesterSlashingTopic, ProposerSlashingTopic, + BlockGossipTopic, }) require.NoError(t, err) ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(ð.BlobSidecar{})) require.NoError(t, err) vblob := blocks.NewVerifiedROBlob(ro) + // Create a test block for block gossip event + block := util.NewBeaconBlock() + block.Block.Slot = 123 + signedBlock, err := blocks.NewSignedBeaconBlock(block) + require.NoError(t, err) + return topics, []*feed.Event{ { Type: operation.UnaggregatedAttReceived, @@ -287,6 +294,12 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { }, }, }, + { + Type: operation.BlockGossipReceived, + Data: &operation.BlockGossipReceivedData{ + SignedBlock: signedBlock, + }, + }, } } @@ -690,7 +703,7 @@ func TestStuckReaderScenarios(t *testing.T) { func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) { topics, events := operationEventsFixtures(t) - require.Equal(t, 10, len(events)) + require.Equal(t, 11, len(events)) // set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader. stn := mockChain.NewEventFeedWrapper() diff --git a/beacon-chain/sync/sync_fuzz_test.go b/beacon-chain/sync/sync_fuzz_test.go index 620a5ed3f4..a7cb9e4081 100644 --- a/beacon-chain/sync/sync_fuzz_test.go +++ b/beacon-chain/sync/sync_fuzz_test.go @@ -100,6 +100,7 @@ func FuzzValidateBeaconBlockPubSub_Phase0(f *testing.F) { } r.cfg.chain = cService r.cfg.blockNotifier = cService.BlockNotifier() + r.cfg.operationNotifier = cService.OperationNotifier() strTop := string(topic) msg := &pubsub.Message{ Message: &pb.Message{ @@ -146,13 +147,14 @@ func FuzzValidateBeaconBlockPubSub_Altair(f *testing.F) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), @@ -182,6 +184,7 @@ func FuzzValidateBeaconBlockPubSub_Altair(f *testing.F) { } r.cfg.chain = cService r.cfg.blockNotifier = cService.BlockNotifier() + r.cfg.operationNotifier = cService.OperationNotifier() strTop := string(topic) msg := &pubsub.Message{ Message: &pb.Message{ @@ -228,13 +231,14 @@ func FuzzValidateBeaconBlockPubSub_Bellatrix(f *testing.F) { } r := &Service{ cfg: &config{ - beaconDB: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: chainService, - clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), - blockNotifier: chainService.BlockNotifier(), - stateGen: stateGen, + beaconDB: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + blockNotifier: chainService.BlockNotifier(), + operationNotifier: chainService.OperationNotifier(), + stateGen: stateGen, }, seenBlockCache: lruwrpr.New(10), badBlockCache: lruwrpr.New(10), @@ -264,6 +268,7 @@ func FuzzValidateBeaconBlockPubSub_Bellatrix(f *testing.F) { } r.cfg.chain = cService r.cfg.blockNotifier = cService.BlockNotifier() + r.cfg.operationNotifier = cService.OperationNotifier() strTop := string(topic) msg := &pubsub.Message{ Message: &pb.Message{ diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index d773d076db..4dbcd54930 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" @@ -70,8 +71,17 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms return pubsub.ValidationReject, errors.New("block.Block is nil") } - // Broadcast the block on a feed to notify other services in the beacon node + // Broadcast the block on both block and operation feeds to notify other services in the beacon node // of a received block (even if it does not process correctly through a state transition). + if s.cfg.operationNotifier != nil { + s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.BlockGossipReceived, + Data: &operation.BlockGossipReceivedData{ + SignedBlock: blk, + }, + }) + } + s.cfg.blockNotifier.BlockFeed().Send(&feed.Event{ Type: blockfeed.ReceivedBlock, Data: &blockfeed.ReceivedBlockData{ diff --git a/changelog/samcm_add_block_gossip_topic.md b/changelog/samcm_add_block_gossip_topic.md new file mode 100644 index 0000000000..ba49415f9f --- /dev/null +++ b/changelog/samcm_add_block_gossip_topic.md @@ -0,0 +1,3 @@ +### Added + +- block_gossip topic support to the beacon api event stream. \ No newline at end of file