From 6087875da526ac5e20df09881ad082048443db42 Mon Sep 17 00:00:00 2001 From: terence Date: Tue, 10 Jun 2025 15:33:52 -0700 Subject: [PATCH] beacon api: Stream event for data column sidecar (#15387) --- api/server/structs/endpoints_events.go | 7 +++++++ beacon-chain/core/feed/operation/BUILD.bazel | 1 + beacon-chain/core/feed/operation/events.go | 11 +++++++++++ beacon-chain/rpc/eth/events/events.go | 18 ++++++++++++++++++ beacon-chain/rpc/eth/events/events_test.go | 14 ++++++++++++-- beacon-chain/sync/validate_data_column.go | 14 ++++++++++++++ changelog/tt_fish.md | 3 +++ 7 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 changelog/tt_fish.md diff --git a/api/server/structs/endpoints_events.go b/api/server/structs/endpoints_events.go index a8188f0647..2ff2bb8a60 100644 --- a/api/server/structs/endpoints_events.go +++ b/api/server/structs/endpoints_events.go @@ -25,6 +25,13 @@ type BlockGossipEvent struct { Block string `json:"block"` } +type DataColumnGossipEvent struct { + Slot string `json:"slot"` + Index string `json:"index"` + BlockRoot string `json:"block_root"` + KzgCommitments []string `json:"kzg_commitments"` +} + type AggregatedAttEventSource struct { Aggregate *Attestation `json:"aggregate"` } diff --git a/beacon-chain/core/feed/operation/BUILD.bazel b/beacon-chain/core/feed/operation/BUILD.bazel index f46ba542eb..30024d5187 100644 --- a/beacon-chain/core/feed/operation/BUILD.bazel +++ b/beacon-chain/core/feed/operation/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//async/event:go_default_library", "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", + "//consensus-types/primitives:go_default_library", "//proto/prysm/v1alpha1:go_default_library", ], ) diff --git a/beacon-chain/core/feed/operation/events.go b/beacon-chain/core/feed/operation/events.go index aae7384bc5..95507300dc 100644 --- a/beacon-chain/core/feed/operation/events.go +++ b/beacon-chain/core/feed/operation/events.go @@ -4,6 +4,7 @@ package operation import ( "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" ) @@ -39,6 +40,9 @@ const ( // BlockGossipReceived is sent after a block has been received from gossip or API that passes validation rules. BlockGossipReceived = 10 + + // DataColumnReceived is sent after a data column has been seen after gossip validation rules. + DataColumnReceived = 11 ) // UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events. @@ -95,3 +99,10 @@ type BlockGossipReceivedData struct { // SignedBlock is the block that was received. SignedBlock interfaces.ReadOnlySignedBeaconBlock } + +type DataColumnReceivedData struct { + Slot primitives.Slot + Index uint64 + BlockRoot [32]byte + KzgCommitments [][]byte +} diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 11c7c892c9..e83760267a 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -72,6 +72,8 @@ const ( LightClientFinalityUpdateTopic = "light_client_finality_update" // LightClientOptimisticUpdateTopic represents a new light client optimistic update event topic. LightClientOptimisticUpdateTopic = "light_client_optimistic_update" + // DataColumnTopic represents a data column sidecar event topic + DataColumnTopic = "data_column_sidecar" ) var ( @@ -105,6 +107,7 @@ var opsFeedEventTopics = map[feed.EventType]string{ operation.AttesterSlashingReceived: AttesterSlashingTopic, operation.ProposerSlashingReceived: ProposerSlashingTopic, operation.BlockGossipReceived: BlockGossipTopic, + operation.DataColumnReceived: DataColumnTopic, } var stateFeedEventTopics = map[feed.EventType]string{ @@ -461,6 +464,8 @@ func topicForEvent(event *feed.Event) string { return BlockTopic case payloadattribute.EventData: return PayloadAttributesTopic + case *operation.DataColumnReceivedData: + return DataColumnTopic default: return InvalidTopic } @@ -495,6 +500,19 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi } return jsonMarshalReader(eventName, blk) }, nil + case *operation.DataColumnReceivedData: + return func() io.Reader { + kzgCommitments := make([]string, len(v.KzgCommitments)) + for i, kzgCommitment := range v.KzgCommitments { + kzgCommitments[i] = hexutil.Encode(kzgCommitment) + } + return jsonMarshalReader(eventName, &structs.DataColumnGossipEvent{ + Slot: fmt.Sprintf("%d", v.Slot), + Index: fmt.Sprintf("%d", v.Index), + BlockRoot: hexutil.Encode(v.BlockRoot[:]), + KzgCommitments: kzgCommitments, + }) + }, nil case *operation.AggregatedAttReceivedData: switch att := v.Attestation.AggregateVal().(type) { case *eth.Attestation: diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 8c0f421034..7879d30275 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -31,7 +31,7 @@ import ( "github.com/OffchainLabs/prysm/v6/testing/require" "github.com/OffchainLabs/prysm/v6/testing/util" "github.com/ethereum/go-ethereum/common" - sse "github.com/r3labs/sse/v2" + "github.com/r3labs/sse/v2" "github.com/sirupsen/logrus" ) @@ -121,6 +121,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { AttesterSlashingTopic, ProposerSlashingTopic, BlockGossipTopic, + DataColumnTopic, }) require.NoError(t, err) ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(ð.BlobSidecar{})) @@ -301,6 +302,15 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { SignedBlock: signedBlock, }, }, + { + Type: operation.DataColumnReceived, + Data: &operation.DataColumnReceivedData{ + Slot: 1, + Index: 2, + BlockRoot: [32]byte{'a'}, + KzgCommitments: [][]byte{{'a'}, {'b'}, {'c'}}, + }, + }, } } @@ -709,7 +719,7 @@ func TestStuckReaderScenarios(t *testing.T) { func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) { topics, events := operationEventsFixtures(t) - require.Equal(t, 11, len(events)) + require.Equal(t, 12, len(events)) // set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader. stn := mockChain.NewEventFeedWrapper() diff --git a/beacon-chain/sync/validate_data_column.go b/beacon-chain/sync/validate_data_column.go index 1cc4662821..a72a576f7a 100644 --- a/beacon-chain/sync/validate_data_column.go +++ b/beacon-chain/sync/validate_data_column.go @@ -6,6 +6,8 @@ import ( "math" "time" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation" "github.com/OffchainLabs/prysm/v6/beacon-chain/verification" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" @@ -205,6 +207,18 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs log.WithField("slot", roDataColumn.Slot()).Warn("Failed to send data column log entry") } + if s.cfg.operationNotifier != nil { + s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.DataColumnReceived, + Data: &operation.DataColumnReceivedData{ + Slot: roDataColumn.Slot(), + Index: roDataColumn.Index, + BlockRoot: roDataColumn.BlockRoot(), + KzgCommitments: bytesutil.SafeCopy2dBytes(roDataColumn.KzgCommitments), + }, + }) + } + return pubsub.ValidationAccept, nil } diff --git a/changelog/tt_fish.md b/changelog/tt_fish.md new file mode 100644 index 0000000000..db25bbc80c --- /dev/null +++ b/changelog/tt_fish.md @@ -0,0 +1,3 @@ +### Added + +- Data column support for beacon api event end point \ No newline at end of file