beacon api: Stream event for data column sidecar (#15387)

This commit is contained in:
terence
2025-06-10 15:33:52 -07:00
committed by GitHub
parent 214f4a76fb
commit 6087875da5
7 changed files with 66 additions and 2 deletions

View File

@@ -25,6 +25,13 @@ type BlockGossipEvent struct {
Block string `json:"block"`
}
type DataColumnGossipEvent struct {
Slot string `json:"slot"`
Index string `json:"index"`
BlockRoot string `json:"block_root"`
KzgCommitments []string `json:"kzg_commitments"`
}
type AggregatedAttEventSource struct {
Aggregate *Attestation `json:"aggregate"`
}

View File

@@ -12,6 +12,7 @@ go_library(
"//async/event:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
],
)

View File

@@ -4,6 +4,7 @@ package operation
import (
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
)
@@ -39,6 +40,9 @@ const (
// BlockGossipReceived is sent after a block has been received from gossip or API that passes validation rules.
BlockGossipReceived = 10
// DataColumnReceived is sent after a data column has been seen after gossip validation rules.
DataColumnReceived = 11
)
// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
@@ -95,3 +99,10 @@ type BlockGossipReceivedData struct {
// SignedBlock is the block that was received.
SignedBlock interfaces.ReadOnlySignedBeaconBlock
}
type DataColumnReceivedData struct {
Slot primitives.Slot
Index uint64
BlockRoot [32]byte
KzgCommitments [][]byte
}

View File

@@ -72,6 +72,8 @@ const (
LightClientFinalityUpdateTopic = "light_client_finality_update"
// LightClientOptimisticUpdateTopic represents a new light client optimistic update event topic.
LightClientOptimisticUpdateTopic = "light_client_optimistic_update"
// DataColumnTopic represents a data column sidecar event topic
DataColumnTopic = "data_column_sidecar"
)
var (
@@ -105,6 +107,7 @@ var opsFeedEventTopics = map[feed.EventType]string{
operation.AttesterSlashingReceived: AttesterSlashingTopic,
operation.ProposerSlashingReceived: ProposerSlashingTopic,
operation.BlockGossipReceived: BlockGossipTopic,
operation.DataColumnReceived: DataColumnTopic,
}
var stateFeedEventTopics = map[feed.EventType]string{
@@ -461,6 +464,8 @@ func topicForEvent(event *feed.Event) string {
return BlockTopic
case payloadattribute.EventData:
return PayloadAttributesTopic
case *operation.DataColumnReceivedData:
return DataColumnTopic
default:
return InvalidTopic
}
@@ -495,6 +500,19 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
}
return jsonMarshalReader(eventName, blk)
}, nil
case *operation.DataColumnReceivedData:
return func() io.Reader {
kzgCommitments := make([]string, len(v.KzgCommitments))
for i, kzgCommitment := range v.KzgCommitments {
kzgCommitments[i] = hexutil.Encode(kzgCommitment)
}
return jsonMarshalReader(eventName, &structs.DataColumnGossipEvent{
Slot: fmt.Sprintf("%d", v.Slot),
Index: fmt.Sprintf("%d", v.Index),
BlockRoot: hexutil.Encode(v.BlockRoot[:]),
KzgCommitments: kzgCommitments,
})
}, nil
case *operation.AggregatedAttReceivedData:
switch att := v.Attestation.AggregateVal().(type) {
case *eth.Attestation:

View File

@@ -31,7 +31,7 @@ import (
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/ethereum/go-ethereum/common"
sse "github.com/r3labs/sse/v2"
"github.com/r3labs/sse/v2"
"github.com/sirupsen/logrus"
)
@@ -121,6 +121,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
AttesterSlashingTopic,
ProposerSlashingTopic,
BlockGossipTopic,
DataColumnTopic,
})
require.NoError(t, err)
ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(&eth.BlobSidecar{}))
@@ -301,6 +302,15 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
SignedBlock: signedBlock,
},
},
{
Type: operation.DataColumnReceived,
Data: &operation.DataColumnReceivedData{
Slot: 1,
Index: 2,
BlockRoot: [32]byte{'a'},
KzgCommitments: [][]byte{{'a'}, {'b'}, {'c'}},
},
},
}
}
@@ -709,7 +719,7 @@ func TestStuckReaderScenarios(t *testing.T) {
func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) {
topics, events := operationEventsFixtures(t)
require.Equal(t, 11, len(events))
require.Equal(t, 12, len(events))
// set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader.
stn := mockChain.NewEventFeedWrapper()

View File

@@ -6,6 +6,8 @@ import (
"math"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -205,6 +207,18 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
log.WithField("slot", roDataColumn.Slot()).Warn("Failed to send data column log entry")
}
if s.cfg.operationNotifier != nil {
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.DataColumnReceived,
Data: &operation.DataColumnReceivedData{
Slot: roDataColumn.Slot(),
Index: roDataColumn.Index,
BlockRoot: roDataColumn.BlockRoot(),
KzgCommitments: bytesutil.SafeCopy2dBytes(roDataColumn.KzgCommitments),
},
})
}
return pubsub.ValidationAccept, nil
}

3
changelog/tt_fish.md Normal file
View File

@@ -0,0 +1,3 @@
### Added
- Data column support for beacon api event end point