Compare commits

...

16 Commits

Author SHA1 Message Date
Aarsh Shah 9b8a6baad3 parts metadata change 2026-02-13 00:04:41 +04:00
Aarsh Shah fd07e59c0a bound number of go-routines to handle header 2026-02-11 12:26:49 +04:00
Aarsh Shah d66d25e7ad fix lint 2026-02-11 11:49:25 +04:00
Aarsh Shah 89ab513183 don't block the event loop on handling headers 2026-02-11 11:43:20 +04:00
Aarsh Shah 1c6110b6e8 move initialisation of validators and handlers for partial data columns to the Start method 2026-02-11 10:37:06 +04:00
Aarsh Shah a82edc7fbc fix lint 2026-02-10 12:11:14 +04:00
Aarsh Shah 39bb8d2929 Merge branch 'rebased-partial-columns' into feat/process-eager-partial-header 2026-02-10 12:10:14 +04:00
Aarsh Shah 387cfcd442 only complete column if it has been extended 2026-02-10 11:50:11 +04:00
Aarsh Shah c70e51b445 fix test 2026-02-10 10:12:18 +04:00
Aarsh Shah a1a8a86341 extend existing cells 2026-02-10 09:41:55 +04:00
Aarsh Shah 71b1610331 publish headers 2026-02-09 20:37:12 +04:00
Aarsh Shah 814abab4b5 wait for header to be processed 2026-02-09 18:47:47 +04:00
Aarsh Shah d0d6bab8a0 handle header 2026-02-06 19:21:43 +04:00
Aarsh Shah 3f6c01fc3b changes as per review 2026-02-06 18:44:58 +04:00
Aarsh Shah c5914ea4d9 handle partial data column header 2026-02-04 19:05:03 +04:00
Aarsh Shah 08d143bd2c add type for partial header reconstruction 2026-02-04 18:27:54 +04:00
13 changed files with 632 additions and 150 deletions

View File

@@ -24,11 +24,13 @@ var (
var (
_ ConstructionPopulator = (*BlockReconstructionSource)(nil)
_ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
_ ConstructionPopulator = (*PartialDataColumnHeaderReconstructionSource)(nil)
)
const (
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
PartialDataColumnHeaderType = "PartialDataColumnHeader"
)
type (
@@ -55,6 +57,10 @@ type (
blocks.VerifiedRODataColumn
}
PartialDataColumnHeaderReconstructionSource struct {
*ethpb.PartialDataColumnHeader
}
blockInfo struct {
signedBlockHeader *ethpb.SignedBeaconBlockHeader
kzgCommitments [][]byte
@@ -72,6 +78,11 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
return &SidecarReconstructionSource{VerifiedRODataColumn: sidecar}
}
// PopulateFromPartialHeader creates a PartialDataColumnHeaderReconstructionSource from a partial header
func PopulateFromPartialHeader(header *ethpb.PartialDataColumnHeader) *PartialDataColumnHeaderReconstructionSource {
return &PartialDataColumnHeaderReconstructionSource{PartialDataColumnHeader: header}
}
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
@@ -289,3 +300,43 @@ func (s *SidecarReconstructionSource) extract() (*blockInfo, error) {
return info, nil
}
// Slot returns the slot from the partial data column header
func (p *PartialDataColumnHeaderReconstructionSource) Slot() primitives.Slot {
return p.SignedBlockHeader.Header.Slot
}
// Root returns the block root computed from the header
func (p *PartialDataColumnHeaderReconstructionSource) Root() [fieldparams.RootLength]byte {
root, err := p.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
return [fieldparams.RootLength]byte{}
}
return root
}
// ProposerIndex returns the proposer index from the header
func (p *PartialDataColumnHeaderReconstructionSource) ProposerIndex() primitives.ValidatorIndex {
return p.SignedBlockHeader.Header.ProposerIndex
}
// Commitments returns the KZG commitments from the header
func (p *PartialDataColumnHeaderReconstructionSource) Commitments() ([][]byte, error) {
return p.KzgCommitments, nil
}
// Type returns the type of the source
func (p *PartialDataColumnHeaderReconstructionSource) Type() string {
return PartialDataColumnHeaderType
}
// extract extracts the block information from the partial header
func (p *PartialDataColumnHeaderReconstructionSource) extract() (*blockInfo, error) {
info := &blockInfo{
signedBlockHeader: p.SignedBlockHeader,
kzgCommitments: p.KzgCommitments,
kzgInclusionProof: p.KzgCommitmentsInclusionProof,
}
return info, nil
}

View File

@@ -267,4 +267,31 @@ func TestReconstructionSource(t *testing.T) {
require.Equal(t, peerdas.SidecarType, src.Type())
})
t.Run("from partial header", func(t *testing.T) {
referenceSidecar := sidecars[0]
partialHeader := &ethpb.PartialDataColumnHeader{
SignedBlockHeader: referenceSidecar.SignedBlockHeader,
KzgCommitments: referenceSidecar.KzgCommitments,
KzgCommitmentsInclusionProof: referenceSidecar.KzgCommitmentsInclusionProof,
}
src := peerdas.PopulateFromPartialHeader(partialHeader)
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.Slot, src.Slot())
// Compute expected root
expectedRoot, err := referenceSidecar.SignedBlockHeader.Header.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, expectedRoot, src.Root())
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.ProposerIndex, src.ProposerIndex())
commitments, err := src.Commitments()
require.NoError(t, err)
require.Equal(t, 2, len(commitments))
require.DeepEqual(t, commitment1, commitments[0])
require.DeepEqual(t, commitment2, commitments[1])
require.Equal(t, peerdas.PartialDataColumnHeaderType, src.Type())
})
}

View File

@@ -77,8 +77,6 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
ps2, err := pubsub.NewGossipSub(ctx, h2, opts2...)
require.NoError(t, err)
go broadcaster1.Start()
go broadcaster2.Start()
defer func() {
broadcaster1.Stop()
broadcaster2.Stop()
@@ -140,7 +138,7 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
topic2, err := ps2.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
// Header validator that verifies the inclusion proof
// Header validator
headerValidator := func(header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil {
return false, fmt.Errorf("nil header")
@@ -193,9 +191,17 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
require.NoError(t, err)
defer sub2.Cancel()
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
noopHeaderHandler := func(header *ethpb.PartialDataColumnHeader, groupID string) {}
err = broadcaster1.Start(headerValidator, cellValidator, handler1, noopHeaderHandler)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
err = broadcaster2.Start(headerValidator, cellValidator, handler2, noopHeaderHandler)
require.NoError(t, err)
err = broadcaster1.Subscribe(topic1)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2)
require.NoError(t, err)
// Wait for mesh to form

View File

@@ -7,14 +7,12 @@ import (
"strconv"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p-pubsub/partialmessages/bitmap"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
@@ -30,6 +28,8 @@ import (
const TTLInSlots = 3
const maxConcurrentValidators = 128
const headerHandledTimeout = time.Second * 1
const maxConcurrentHeaderHandlers = 128
var dataColumnTopicRegex = regexp.MustCompile(`data_column_sidecar_(\d+)`)
@@ -47,6 +47,7 @@ func extractColumnIndexFromTopic(topic string) (uint64, error) {
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader, groupID string)
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
@@ -55,18 +56,19 @@ type PartialColumnBroadcaster struct {
ps *pubsub.PubSub
stop chan struct{}
// map topic -> headerValidators
headerValidators map[string]HeaderValidator
// map topic -> Validator
validators map[string]ColumnValidator
validateHeader HeaderValidator
validateColumn ColumnValidator
handleColumn SubHandler
handleHeader HeaderHandler
// map topic -> handler
handlers map[string]SubHandler
// map groupID -> bool to signal when header has been handled
headerHandled map[string]bool
// map topic -> *pubsub.Topic
topics map[string]*pubsub.Topic
concurrentValidatorSemaphore chan struct{}
concurrentValidatorSemaphore chan struct{}
concurrentHeaderHandlerSemaphore chan struct{}
// map topic -> map[groupID]PartialColumn
partialMsgStore map[string]map[string]*blocks.PartialDataColumn
@@ -87,16 +89,18 @@ const (
requestKindUnsubscribe
requestKindHandleIncomingRPC
requestKindCellsValidated
requestKindHeaderHandled
)
type request struct {
kind requestKind
response chan error
sub subscribe
unsub unsubscribe
publish publish
incomingRPC rpcWithFrom
cellsValidated *cellsValidated
kind requestKind
response chan error
sub subscribe
unsub unsubscribe
publish publish
incomingRPC rpcWithFrom
cellsValidated *cellsValidated
headerHandledGroup string
}
type publish struct {
@@ -105,10 +109,7 @@ type publish struct {
}
type subscribe struct {
t *pubsub.Topic
headerValidator HeaderValidator
validator ColumnValidator
handler SubHandler
t *pubsub.Topic
}
type unsubscribe struct {
@@ -130,19 +131,18 @@ type cellsValidated struct {
func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
return &PartialColumnBroadcaster{
validators: make(map[string]ColumnValidator),
headerValidators: make(map[string]HeaderValidator),
handlers: make(map[string]SubHandler),
topics: make(map[string]*pubsub.Topic),
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
groupTTL: make(map[string]int8),
validHeaderCache: make(map[string]*ethpb.PartialDataColumnHeader),
headerHandled: make(map[string]bool),
// GossipSub sends the messages to this channel. The buffer should be
// big enough to avoid dropping messages. We don't want to block the gossipsub event loop for this.
incomingReq: make(chan request, 128*16),
logger: logger,
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
concurrentHeaderHandlerSemaphore: make(chan struct{}, maxConcurrentHeaderHandlers),
}
}
@@ -153,15 +153,12 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
Logger: slogger,
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
if len(left) == 0 {
return right
}
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
merged, err := blocks.MergePartsMetadata(left, right)
if err != nil {
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
p.logger.Warn("Failed to merge parts metadata", "err", err)
return left
}
return partialmessages.PartsMetadata(merged)
return merged
},
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
// TODO. Add some basic and fast sanity checks
@@ -187,11 +184,34 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
return opts
}
// Start starts the event loop of the PartialColumnBroadcaster. Should be called
// within a goroutine (go p.Start())
func (p *PartialColumnBroadcaster) Start() {
// Start starts the event loop of the PartialColumnBroadcaster.
// It accepts the required validator and handler functions, returning an error if any is nil.
// The event loop is launched in a goroutine.
func (p *PartialColumnBroadcaster) Start(
validateHeader HeaderValidator,
validateColumn ColumnValidator,
handleColumn SubHandler,
handleHeader HeaderHandler,
) error {
if validateHeader == nil {
return errors.New("no header validator provided")
}
if handleHeader == nil {
return errors.New("no header handler provided")
}
if validateColumn == nil {
return errors.New("no column validator provided")
}
if handleColumn == nil {
return errors.New("no column handler provided")
}
p.validateHeader = validateHeader
p.validateColumn = validateColumn
p.handleColumn = handleColumn
p.handleHeader = handleHeader
p.stop = make(chan struct{})
p.loop()
go p.loop()
return nil
}
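
For reference, the validator contract documented near the top of this file distinguishes REJECT (reject=true with an error), IGNORE (reject=false with an error), and valid (reject=false, nil error). Here is a hedged sketch of satisfying the new Start signature, written as if it lived in this package; the helper name and the specific checks are hypothetical, not Prysm's actual validation:

// startBroadcasterExample is a hypothetical helper illustrating the validator contract:
// reject=true penalizes the peer (REJECT), reject=false with a non-nil error is a
// silent IGNORE, and (false, nil) accepts the header.
func startBroadcasterExample(
	b *PartialColumnBroadcaster,
	validateColumn ColumnValidator,
	handleColumn SubHandler,
	handleHeader HeaderHandler,
) error {
	validateHeader := func(h *ethpb.PartialDataColumnHeader) (reject bool, err error) {
		if h == nil || h.SignedBlockHeader == nil || h.SignedBlockHeader.Header == nil {
			return true, errors.New("malformed header") // REJECT: penalize the sender
		}
		if len(h.KzgCommitments) == 0 {
			return false, errors.New("header carries no commitments") // IGNORE: drop without penalty
		}
		return false, nil // valid header
	}
	return b.Start(validateHeader, validateColumn, handleColumn, handleHeader)
}
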
func (p *PartialColumnBroadcaster) loop() {
@@ -210,6 +230,7 @@ func (p *PartialColumnBroadcaster) loop() {
delete(p.groupTTL, groupID)
delete(p.validHeaderCache, groupID)
delete(p.headerHandled, groupID)
for topic, msgStore := range p.partialMsgStore {
delete(msgStore, groupID)
if len(msgStore) == 0 {
@@ -222,7 +243,7 @@ func (p *PartialColumnBroadcaster) loop() {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
req.response <- p.subscribe(req.sub.t)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindHandleIncomingRPC:
@@ -235,6 +256,8 @@ func (p *PartialColumnBroadcaster) loop() {
if err != nil {
p.logger.Error("Failed to handle cells validated", "err", err)
}
case requestKindHeaderHandled:
p.handleHeaderHandled(req.headerHandledGroup)
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
@@ -287,13 +310,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
}
header = message.Header[0]
headerValidator, ok := p.headerValidators[topicID]
if !ok || headerValidator == nil {
p.logger.Debug("No header validator registered for topic")
return nil
}
reject, err := headerValidator(header)
reject, err := p.validateHeader(header)
if err != nil {
p.logger.Debug("Header validation failed", "err", err, "reject", reject)
if reject {
@@ -305,8 +322,15 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
}
// Cache the valid header
p.validHeaderCache[string(groupID)] = header
p.headerHandled[string(groupID)] = false
// TODO: We now have the information we need to call GetBlobsV3; we should do that to see what we have locally.
p.concurrentHeaderHandlerSemaphore <- struct{}{}
go func() {
defer func() {
<-p.concurrentHeaderHandlerSemaphore
}()
p.handleHeader(header, string(groupID))
}()
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
@@ -354,8 +378,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
"group": groupID,
})
validator, validatorOK := p.validators[topicID]
if len(rpcWithFrom.PartialMessage) > 0 && validatorOK {
if len(rpcWithFrom.PartialMessage) > 0 {
// TODO: is there any penalty we want to consider for giving us data we didn't request?
// Note that we need to be careful around race conditions and eager data.
// Also note that protobufs by design allow extra data that we don't parse.
@@ -376,7 +399,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
<-p.concurrentValidatorSemaphore
}()
start := time.Now()
err := validator(cellsToVerify)
err := p.validateColumn(cellsToVerify)
if err != nil {
logger.Error("failed to validate cells", "err", err)
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
@@ -397,12 +420,23 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
}
}
peerHas := bitmap.Bitmap(rpcWithFrom.PartsMetadata)
iHave := bitmap.Bitmap(ourDataColumn.PartsMetadata())
if !shouldRepublish && len(peerHas) > 0 && !bytes.Equal(peerHas, iHave) {
// Either we have something they don't or vice versa
shouldRepublish = true
logger.Debug("republishing due to parts metadata difference")
if !shouldRepublish && len(rpcWithFrom.PartsMetadata) > 0 {
peerMeta, err := blocks.ParsePartsMetadata(rpcWithFrom.PartsMetadata)
ourMeta, err2 := blocks.ParsePartsMetadata(ourDataColumn.PartsMetadata())
if err == nil && err2 == nil && !bytes.Equal(peerMeta.Available, ourMeta.Available) {
// Either we have something they don't or vice versa
shouldRepublish = true
logger.Debug("republishing due to parts metadata difference")
}
}
headerHandled, ok := p.headerHandled[string(groupID)]
// We only want to skip republishing if the header is currently being handled but hasn't completed yet.
// If the header is not being handled at all, these incoming cells are likely a response to a previous publish we sent
// (either because we received a data column sidecar or a beacon block body, or because we are the block proposer).
if ok && !headerHandled {
p.logger.Debug("Header not handled, skipping republish", "topic", topicID, "group", groupID)
return nil
}
if shouldRepublish {
@@ -433,15 +467,19 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
if col, ok := ourDataColumn.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", cells.topic, "group", cells.group)
handler, handlerOK := p.handlers[cells.topic]
if handlerOK {
go handler(cells.topic, col)
if p.handleColumn != nil {
go p.handleColumn(cells.topic, col)
}
} else {
p.logger.Info("Extended partial column", "topic", cells.topic, "group", cells.group)
}
headerHandled, ok := p.headerHandled[string(ourDataColumn.GroupID())]
if ok && !headerHandled {
p.logger.Debug("Header not handled, skipping republish", "topic", cells.topic, "group", cells.group)
return nil
}
err := p.ps.PublishPartialMessage(cells.topic, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
@@ -450,12 +488,39 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
return nil
}
func (p *PartialColumnBroadcaster) handleHeaderHandled(groupID string) {
p.headerHandled[groupID] = true
for topic, topicStore := range p.partialMsgStore {
col, ok := topicStore[groupID]
if !ok {
continue
}
err := p.ps.PublishPartialMessage(topic, col, partialmessages.PublishOptions{})
if err != nil {
p.logger.WithError(err).Error("Failed to republish after header handled", "topic", topic, "groupID", groupID)
}
}
}
func (p *PartialColumnBroadcaster) Stop() {
if p.stop != nil {
close(p.stop)
}
}
// HeaderHandled notifies the event loop that a header has been fully processed,
// triggering republishing of all columns in the store for the given groupID.
func (p *PartialColumnBroadcaster) HeaderHandled(groupID string) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
p.incomingReq <- request{
kind: requestKindHeaderHandled,
headerHandledGroup: groupID,
}
return nil
}
// Publish publishes the partial column.
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn) error {
if p.ps == nil {
@@ -479,38 +544,55 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
topicStore[string(c.GroupID())] = &c
var extended bool
existing := topicStore[string(c.GroupID())]
if existing != nil {
// Extend the existing column with cells being published here.
// The existing column may already contain cells received from peers. We must not overwrite it.
for i := range c.Included.Len() {
if c.Included.BitAt(i) && existing.ExtendFromVerfifiedCell(uint64(i), c.Column[i], c.KzgProofs[i]) {
extended = true
}
}
if extended {
if col, ok := existing.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", topic, "group", existing.GroupID())
if p.handleColumn != nil {
go p.handleColumn(topic, col)
}
}
}
} else {
topicStore[string(c.GroupID())] = &c
existing = &c
}
p.groupTTL[string(c.GroupID())] = TTLInSlots
return p.ps.PublishPartialMessage(topic, &c, partialmessages.PublishOptions{})
return p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
}
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindSubscribe,
sub: subscribe{
t: t,
headerValidator: headerValidator,
validator: validator,
handler: handler,
t: t,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic) error {
topic := t.String()
if _, ok := p.topics[topic]; ok {
return errors.New("already subscribed")
}
p.topics[topic] = t
p.headerValidators[topic] = headerValidator
p.validators[topic] = validator
p.handlers[topic] = handler
return nil
}
@@ -532,9 +614,5 @@ func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
}
delete(p.topics, topic)
delete(p.partialMsgStore, topic)
delete(p.headerValidators, topic)
delete(p.validators, topic)
delete(p.handlers, topic)
return t.Close()
}
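
Putting the pieces together, here is a hedged end-to-end sketch of the reworked lifecycle. The helper name, callbacks, topic name, column, and group ID are placeholders; imports for context, the libp2p host and pubsub packages, logrus, and the Prysm packages involved are assumed. The real wiring added by this change lives in the sync service further down.

// runBroadcasterExample is a hypothetical walkthrough of the reworked API.
func runBroadcasterExample(
	ctx context.Context,
	h host.Host,
	topicName string,
	partialColumn blocks.PartialDataColumn,
	groupID string,
	validateHeader partialdatacolumnbroadcaster.HeaderValidator,
	validateColumn partialdatacolumnbroadcaster.ColumnValidator,
	handleColumn partialdatacolumnbroadcaster.SubHandler,
	handleHeader partialdatacolumnbroadcaster.HeaderHandler,
) error {
	b := partialdatacolumnbroadcaster.NewBroadcaster(logrus.New())
	// The partial-messages extension must be registered before the router is built.
	opts := b.AppendPubSubOpts(nil)
	ps, err := pubsub.NewGossipSub(ctx, h, opts...)
	if err != nil {
		return err
	}
	topic, err := ps.Join(topicName, pubsub.RequestPartialMessages())
	if err != nil {
		return err
	}
	// Validators and handlers are now broadcaster-wide instead of per topic,
	// and Start must succeed before Subscribe or Publish.
	if err := b.Start(validateHeader, validateColumn, handleColumn, handleHeader); err != nil {
		return err
	}
	defer b.Stop()
	if err := b.Subscribe(topic); err != nil {
		return err
	}
	// Publishing extends any partial column already stored for the same group
	// before handing it to gossipsub.
	if err := b.Publish(topic.String(), partialColumn); err != nil {
		return err
	}
	// Once the header handler has finished, report back so that republishing
	// held while the header was pending can resume.
	return b.HeaderHandled(groupID)
}
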

View File

@@ -311,10 +311,6 @@ func (s *Service) Start() {
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
}
go s.forkWatcher()
if s.partialColumnBroadcaster != nil {
go s.partialColumnBroadcaster.Start()
}
}
// Stop the p2p service and terminate all peer connections.
@@ -325,9 +321,6 @@ func (s *Service) Stop() error {
s.dv5Listener.Close()
}
if s.partialColumnBroadcaster != nil {
s.partialColumnBroadcaster.Stop()
}
return nil
}

View File

@@ -6,6 +6,8 @@ package sync
import (
"context"
"slices"
"strconv"
"sync"
"time"
@@ -28,6 +30,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee"
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
p2ptypes "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
@@ -41,6 +44,7 @@ import (
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v7/crypto/rand"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime"
prysmTime "github.com/OffchainLabs/prysm/v7/time"
"github.com/OffchainLabs/prysm/v7/time/slots"
@@ -261,6 +265,13 @@ func (s *Service) Start() {
s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)
go s.verifierRoutine()
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
if err := s.startPartialColumnBroadcaster(broadcaster); err != nil {
log.WithError(err).Error("Failed to start partial column broadcaster")
}
}
go s.startDiscoveryAndSubscriptions()
go s.processDataColumnLogs()
@@ -328,6 +339,9 @@ func (s *Service) Stop() error {
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
s.unSubscribeFromTopic(t)
}
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
broadcaster.Stop()
}
return nil
}
@@ -397,6 +411,46 @@ func (s *Service) waitForChainStart() {
s.markForChainStart()
}
func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster) error {
return broadcaster.Start(
func(header *ethpb.PartialDataColumnHeader) (bool, error) {
return s.validatePartialDataColumnHeader(s.ctx, header)
},
func(cellsToVerify []blocks.CellProofBundle) error {
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
},
func(topic string, col blocks.VerifiedRODataColumn) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
slot := col.SignedBlockHeader.Header.Slot
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
// This column was completed from a partial message.
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
}
err := s.verifiedRODataColumnSubscriber(ctx, col)
if err != nil {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
func(header *ethpb.PartialDataColumnHeader, groupID string) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
source := peerdas.PopulateFromPartialHeader(header)
log.WithField("slot", source.Slot()).Info("Received data column header")
err := s.processDataColumnSidecarsFromExecution(ctx, source)
if err != nil {
log.WithError(err).Error("Failed to process partial data column header")
}
if err := broadcaster.HeaderHandled(groupID); err != nil {
log.WithError(err).Error("Failed to call header handled on broadcaster")
}
},
)
}
func (s *Service) startDiscoveryAndSubscriptions() {
// Wait for the chain to start.
s.waitForChainStart()

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"reflect"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -22,7 +20,6 @@ import (
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -70,10 +67,7 @@ type subscribeParameters struct {
}
type partialSubscribeParameters struct {
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
validateHeader partialdatacolumnbroadcaster.HeaderValidator
validate partialdatacolumnbroadcaster.ColumnValidator
handle partialdatacolumnbroadcaster.SubHandler
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
}
// shortTopic is a less verbose version of topic strings used for logging.
@@ -334,32 +328,9 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
s.spawn(func() {
var ps *partialSubscribeParameters
broadcaster := s.cfg.p2p.PartialColumnBroadcaster()
if broadcaster != nil {
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
ps = &partialSubscribeParameters{
broadcaster: broadcaster,
validateHeader: func(header *ethpb.PartialDataColumnHeader) (bool, error) {
return s.validatePartialDataColumnHeader(context.TODO(), header)
},
validate: func(cellsToVerify []blocks.CellProofBundle) error {
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
},
handle: func(topic string, col blocks.VerifiedRODataColumn) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
slot := col.SignedBlockHeader.Header.Slot
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
// This column was completed from a partial message.
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
}
err := s.verifiedRODataColumnSubscriber(ctx, col)
if err != nil {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
}
}
s.subscribeWithParameters(subscribeParameters{
@@ -643,7 +614,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
if requestPartial {
log.Info("Subscribing to partial columns on", topicStr)
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle)
err = t.partial.broadcaster.Subscribe(topic)
if err != nil {
log.WithError(err).Error("Failed to subscribe to partial column")

View File

@@ -85,15 +85,81 @@ func NewPartialDataColumn(
func (p *PartialDataColumn) GroupID() []byte {
return p.groupID
}
// NewPartsMetadata creates SSZ-encoded PartialDataColumnPartsMetadata from the given bitmaps.
func NewPartsMetadata(available, requests bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
meta := &ethpb.PartialDataColumnPartsMetadata{
Available: available,
Requests: requests,
}
marshalled, err := meta.MarshalSSZ()
if err != nil {
return nil, err
}
return partialmessages.PartsMetadata(marshalled), nil
}
// ParsePartsMetadata deserializes SSZ-encoded PartialDataColumnPartsMetadata.
func ParsePartsMetadata(data partialmessages.PartsMetadata) (*ethpb.PartialDataColumnPartsMetadata, error) {
meta := &ethpb.PartialDataColumnPartsMetadata{}
if err := meta.UnmarshalSSZ(data); err != nil {
return nil, err
}
return meta, nil
}
// MergePartsMetadata merges two SSZ-encoded PartialDataColumnPartsMetadata by OR-ing
// both available and requests bitmaps.
// TODO: How do we handle the request bitmap here?
func MergePartsMetadata(left, right partialmessages.PartsMetadata) (partialmessages.PartsMetadata, error) {
if len(left) == 0 {
return right, nil
}
if len(right) == 0 {
return left, nil
}
leftMeta, err := ParsePartsMetadata(left)
if err != nil {
return left, err
}
rightMeta, err := ParsePartsMetadata(right)
if err != nil {
return left, err
}
mergedAvailable, err := bitfield.Bitlist(leftMeta.Available).Or(bitfield.Bitlist(rightMeta.Available))
if err != nil {
return left, err
}
mergedRequests, err := bitfield.Bitlist(leftMeta.Requests).Or(bitfield.Bitlist(rightMeta.Requests))
if err != nil {
return left, err
}
return NewPartsMetadata(mergedAvailable, mergedRequests)
}
func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMetadata) ([]byte, error) {
peerHas := bitfield.Bitlist(metadata)
if peerHas.Len() != p.Included.Len() {
return nil, errors.New("metadata length does not match expected length")
peerMeta, err := ParsePartsMetadata(metadata)
if err != nil {
return nil, err
}
peerAvailable := bitfield.Bitlist(peerMeta.Available)
peerRequests := bitfield.Bitlist(peerMeta.Requests)
if peerAvailable.Len() != p.Included.Len() {
return nil, errors.New("available bitmap length does not match expected length")
}
if peerRequests.Len() != p.Included.Len() {
return nil, errors.New("requests bitmap length does not match expected length")
}
var cellsToReturn int
for i := range peerHas.Len() {
if !peerHas.BitAt(i) && p.Included.BitAt(i) {
for i := range p.Included.Len() {
// Send cell if: we have it, peer doesn't have it, and peer wants it.
if p.Included.BitAt(i) && !peerAvailable.BitAt(i) && peerRequests.BitAt(i) {
cellsToReturn++
}
}
@@ -107,8 +173,8 @@ func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMe
PartialColumn: make([][]byte, 0, cellsToReturn),
KzgProofs: make([][]byte, 0, cellsToReturn),
}
for i := range peerHas.Len() {
if peerHas.BitAt(i) || !p.Included.BitAt(i) {
for i := range p.Included.Len() {
if !p.Included.BitAt(i) || peerAvailable.BitAt(i) || !peerRequests.BitAt(i) {
continue
}
included.SetBitAt(i, true)
@@ -141,14 +207,27 @@ func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.
if err != nil {
return nil, nil, err
}
// Empty bitlist since we aren't including any cells here
peersNextParts := partialmessages.PartsMetadata(bitfield.NewBitlist(uint64(len(p.KzgCommitments))))
// Empty available (no cells sent), empty requests (we don't know what the peer wants)
numCommitments := uint64(len(p.KzgCommitments))
peersNextParts, err := NewPartsMetadata(
bitfield.NewBitlist(numCommitments),
bitfield.NewBitlist(numCommitments),
)
if err != nil {
return nil, nil, err
}
return marshalled, peersNextParts, nil
}
func (p *PartialDataColumn) PartsMetadata() partialmessages.PartsMetadata {
return partialmessages.PartsMetadata(p.Included)
numCommitments := uint64(len(p.KzgCommitments))
meta, err := NewPartsMetadata(p.Included, allOnesBitlist(numCommitments))
if err != nil {
logrus.Error("failed to create parts metadata", "err", err)
return nil
}
return meta
}
// CellsToVerifyFromPartialMessage returns cells from the partial message that need to be verified.
@@ -243,3 +322,11 @@ func (p *PartialDataColumn) Complete(logger *logrus.Logger) (VerifiedRODataColum
return NewVerifiedRODataColumn(rodc), true
}
func allOnesBitlist(length uint64) bitfield.Bitlist {
bl := bitfield.NewBitlist(length)
for i := range length {
bl.SetBitAt(i, true)
}
return bl
}
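
Since parts metadata moved from a bare bitlist to an SSZ container with separate Available and Requests bitmaps, here is a hedged sketch of how the helpers above compose. Bit positions are illustrative and the function is hypothetical, written as if it lived in this package:

// partsMetadataExample shows that Available and Requests are OR-ed independently
// when merging, and that a cell is only worth sending when we have it, the peer
// lacks it, and the peer asked for it.
func partsMetadataExample() error {
	ours := bitfield.NewBitlist(4)
	ours.SetBitAt(0, true) // we hold cell 0
	ourRequests := bitfield.NewBitlist(4)
	ourRequests.SetBitAt(2, true) // we want cell 2

	peerHas := bitfield.NewBitlist(4)
	peerHas.SetBitAt(2, true) // the peer holds cell 2
	peerWants := bitfield.NewBitlist(4)
	peerWants.SetBitAt(0, true) // the peer wants cell 0

	ourMeta, err := NewPartsMetadata(ours, ourRequests)
	if err != nil {
		return err
	}
	peerMeta, err := NewPartsMetadata(peerHas, peerWants)
	if err != nil {
		return err
	}
	// Merging ORs both bitmaps: Available ends up with bits 0 and 2 set,
	// and so does Requests.
	if _, err := MergePartsMetadata(ourMeta, peerMeta); err != nil {
		return err
	}

	// The send decision mirrors PartialMessageBytes: we have it, the peer
	// doesn't, and the peer requested it; true only for cell 0 here.
	peer, err := ParsePartsMetadata(peerMeta)
	if err != nil {
		return err
	}
	var cellsToSend int
	for i := range ours.Len() {
		if ours.BitAt(i) && !peer.Available.BitAt(i) && peer.Requests.BitAt(i) {
			cellsToSend++
		}
	}
	_ = cellsToSend // 1
	return nil
}
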

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"testing"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/util"
@@ -20,7 +19,11 @@ type invariantChecker struct {
var _ partialmessages.InvariantChecker[*blocks.PartialDataColumn] = (*invariantChecker)(nil)
func (i *invariantChecker) MergePartsMetadata(left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
return partialmessages.MergeBitmap(left, right)
merged, err := blocks.MergePartsMetadata(left, right)
if err != nil {
i.t.Fatalf("Failed to merge parts metadata: %v", err)
}
return merged
}
func (i *invariantChecker) SplitIntoParts(in *blocks.PartialDataColumn) ([]*blocks.PartialDataColumn, error) {
@@ -47,13 +50,10 @@ func (i *invariantChecker) FullMessage() (*blocks.PartialDataColumn, error) {
proofs := make([][]byte, numCells)
for i := range numCells {
for j := range commitments[i] {
commitments[i][j] = byte(i)
}
cells[i] = make([]byte, 2048)
cells[i] = fmt.Appendf(cells[i][:0], "cell %d", i)
copy(cells[i], fmt.Sprintf("cell %d", i))
proofs[i] = make([]byte, 48)
proofs[i] = fmt.Appendf(proofs[i][:0], "proof %d", i)
copy(proofs[i], fmt.Sprintf("proof %d", i))
}
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(i.t, []util.DataColumnParam{
@@ -66,7 +66,14 @@ func (i *invariantChecker) FullMessage() (*blocks.PartialDataColumn, error) {
})
c, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
return &c, err
if err != nil {
return nil, err
}
// Populate all cells to make this a full message
for idx := range numCells {
c.ExtendFromVerfifiedCell(uint64(idx), roDC[0].Column[idx], roDC[0].KzgProofs[idx])
}
return &c, nil
}
func (i *invariantChecker) EmptyMessage() *blocks.PartialDataColumn {
@@ -112,9 +119,13 @@ func (i *invariantChecker) ExtendFromBytes(a *blocks.PartialDataColumn, data []b
}
func (i *invariantChecker) ShouldRequest(a *blocks.PartialDataColumn, from peer.ID, partsMetadata []byte) bool {
peerHas := bitfield.Bitlist(partsMetadata)
for i := range peerHas.Len() {
if peerHas.BitAt(i) && !a.Included.BitAt(i) {
peerMeta, err := blocks.ParsePartsMetadata(partsMetadata)
if err != nil {
return false
}
for idx := range peerMeta.Available.Len() {
// Request if peer has cell, is willing to provide it, and we don't have it.
if peerMeta.Available.BitAt(idx) && peerMeta.Requests.BitAt(idx) && !a.Included.BitAt(idx) {
return true
}
}

View File

@@ -189,6 +189,7 @@ ssz_fulu_objs = [
"DataColumnsByRootIdentifier",
"DataColumnSidecar",
"PartialDataColumnSidecar",
"PartialDataColumnPartsMetadata",
"StatusV2",
"SignedBeaconBlockContentsFulu",
"SignedBeaconBlockFulu",

View File

@@ -1,4 +1,5 @@
// Code generated by fastssz. DO NOT EDIT.
// Hash: 2192552bad81ddd02726341648c9be77395b169121c28ed7ed7a794fc8a5f55d
package eth
import (
@@ -2946,3 +2947,129 @@ func (p *PartialDataColumnHeader) HashTreeRootWith(hh *ssz.Hasher) (err error) {
hh.Merkleize(indx)
return
}
// MarshalSSZ ssz marshals the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) MarshalSSZ() ([]byte, error) {
return ssz.MarshalSSZ(p)
}
// MarshalSSZTo ssz marshals the PartialDataColumnPartsMetadata object to a target array
func (p *PartialDataColumnPartsMetadata) MarshalSSZTo(buf []byte) (dst []byte, err error) {
dst = buf
offset := int(8)
// Offset (0) 'Available'
dst = ssz.WriteOffset(dst, offset)
offset += len(p.Available)
// Offset (1) 'Requests'
dst = ssz.WriteOffset(dst, offset)
offset += len(p.Requests)
// Field (0) 'Available'
if size := len(p.Available); size > 512 {
err = ssz.ErrBytesLengthFn("--.Available", size, 512)
return
}
dst = append(dst, p.Available...)
// Field (1) 'Requests'
if size := len(p.Requests); size > 512 {
err = ssz.ErrBytesLengthFn("--.Requests", size, 512)
return
}
dst = append(dst, p.Requests...)
return
}
// UnmarshalSSZ ssz unmarshals the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) UnmarshalSSZ(buf []byte) error {
var err error
size := uint64(len(buf))
if size < 8 {
return ssz.ErrSize
}
tail := buf
var o0, o1 uint64
// Offset (0) 'Available'
if o0 = ssz.ReadOffset(buf[0:4]); o0 > size {
return ssz.ErrOffset
}
if o0 != 8 {
return ssz.ErrInvalidVariableOffset
}
// Offset (1) 'Requests'
if o1 = ssz.ReadOffset(buf[4:8]); o1 > size || o0 > o1 {
return ssz.ErrOffset
}
// Field (0) 'Available'
{
buf = tail[o0:o1]
if err = ssz.ValidateBitlist(buf, 512); err != nil {
return err
}
if cap(p.Available) == 0 {
p.Available = make([]byte, 0, len(buf))
}
p.Available = append(p.Available, buf...)
}
// Field (1) 'Requests'
{
buf = tail[o1:]
if err = ssz.ValidateBitlist(buf, 512); err != nil {
return err
}
if cap(p.Requests) == 0 {
p.Requests = make([]byte, 0, len(buf))
}
p.Requests = append(p.Requests, buf...)
}
return err
}
// SizeSSZ returns the ssz encoded size in bytes for the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) SizeSSZ() (size int) {
size = 8
// Field (0) 'Available'
size += len(p.Available)
// Field (1) 'Requests'
size += len(p.Requests)
return
}
// HashTreeRoot ssz hashes the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) HashTreeRoot() ([32]byte, error) {
return ssz.HashWithDefaultHasher(p)
}
// HashTreeRootWith ssz hashes the PartialDataColumnPartsMetadata object with a hasher
func (p *PartialDataColumnPartsMetadata) HashTreeRootWith(hh *ssz.Hasher) (err error) {
indx := hh.Index()
// Field (0) 'Available'
if len(p.Available) == 0 {
err = ssz.ErrEmptyBitlist
return
}
hh.PutBitlist(p.Available, 512)
// Field (1) 'Requests'
if len(p.Requests) == 0 {
err = ssz.ErrEmptyBitlist
return
}
hh.PutBitlist(p.Requests, 512)
hh.Merkleize(indx)
return
}

View File

@@ -151,6 +151,58 @@ func (x *PartialDataColumnHeader) GetKzgCommitmentsInclusionProof() [][]byte {
return nil
}
type PartialDataColumnPartsMetadata struct {
state protoimpl.MessageState `protogen:"open.v1"`
Available github_com_OffchainLabs_go_bitfield.Bitlist `protobuf:"bytes,1,opt,name=available,proto3" json:"available,omitempty" cast-type:"github.com/OffchainLabs/go-bitfield.Bitlist" ssz-max:"512"`
Requests github_com_OffchainLabs_go_bitfield.Bitlist `protobuf:"bytes,2,opt,name=requests,proto3" json:"requests,omitempty" cast-type:"github.com/OffchainLabs/go-bitfield.Bitlist" ssz-max:"512"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *PartialDataColumnPartsMetadata) Reset() {
*x = PartialDataColumnPartsMetadata{}
mi := &file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PartialDataColumnPartsMetadata) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PartialDataColumnPartsMetadata) ProtoMessage() {}
func (x *PartialDataColumnPartsMetadata) ProtoReflect() protoreflect.Message {
mi := &file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PartialDataColumnPartsMetadata.ProtoReflect.Descriptor instead.
func (*PartialDataColumnPartsMetadata) Descriptor() ([]byte, []int) {
return file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescGZIP(), []int{2}
}
func (x *PartialDataColumnPartsMetadata) GetAvailable() github_com_OffchainLabs_go_bitfield.Bitlist {
if x != nil {
return x.Available
}
return github_com_OffchainLabs_go_bitfield.Bitlist(nil)
}
func (x *PartialDataColumnPartsMetadata) GetRequests() github_com_OffchainLabs_go_bitfield.Bitlist {
if x != nil {
return x.Requests
}
return github_com_OffchainLabs_go_bitfield.Bitlist(nil)
}
var File_proto_prysm_v1alpha1_partial_data_columns_proto protoreflect.FileDescriptor
var file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc = []byte{
@@ -199,12 +251,24 @@ var file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc = []byte{
0x69, 0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18,
0x03, 0x20, 0x03, 0x28, 0x0c, 0x42, 0x08, 0x8a, 0xb5, 0x18, 0x04, 0x34, 0x2c, 0x33, 0x32, 0x52,
0x1c, 0x6b, 0x7a, 0x67, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x49,
0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x42, 0x3b, 0x5a,
0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63,
0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76,
0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31,
0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x22, 0xca, 0x01,
0x0a, 0x1e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x44, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6c,
0x75, 0x6d, 0x6e, 0x50, 0x61, 0x72, 0x74, 0x73, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
0x12, 0x54, 0x0a, 0x09, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0c, 0x42, 0x36, 0x82, 0xb5, 0x18, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73,
0x2f, 0x67, 0x6f, 0x2d, 0x62, 0x69, 0x74, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x2e, 0x42, 0x69, 0x74,
0x6c, 0x69, 0x73, 0x74, 0x92, 0xb5, 0x18, 0x03, 0x35, 0x31, 0x32, 0x52, 0x09, 0x61, 0x76, 0x61,
0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x52, 0x0a, 0x08, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x36, 0x82, 0xb5, 0x18, 0x2b, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69,
0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x67, 0x6f, 0x2d, 0x62, 0x69, 0x74, 0x66, 0x69, 0x65, 0x6c,
0x64, 0x2e, 0x42, 0x69, 0x74, 0x6c, 0x69, 0x73, 0x74, 0x92, 0xb5, 0x18, 0x03, 0x35, 0x31, 0x32,
0x52, 0x08, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x42, 0x3b, 0x5a, 0x39, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69,
0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x36, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70,
0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -219,15 +283,16 @@ func file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescGZIP() []byte {
return file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescData
}
var file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_proto_prysm_v1alpha1_partial_data_columns_proto_goTypes = []any{
(*PartialDataColumnSidecar)(nil), // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar
(*PartialDataColumnHeader)(nil), // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader
(*SignedBeaconBlockHeader)(nil), // 2: ethereum.eth.v1alpha1.SignedBeaconBlockHeader
(*PartialDataColumnSidecar)(nil), // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar
(*PartialDataColumnHeader)(nil), // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader
(*PartialDataColumnPartsMetadata)(nil), // 2: ethereum.eth.v1alpha1.PartialDataColumnPartsMetadata
(*SignedBeaconBlockHeader)(nil), // 3: ethereum.eth.v1alpha1.SignedBeaconBlockHeader
}
var file_proto_prysm_v1alpha1_partial_data_columns_proto_depIdxs = []int32{
1, // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar.header:type_name -> ethereum.eth.v1alpha1.PartialDataColumnHeader
2, // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader.signed_block_header:type_name -> ethereum.eth.v1alpha1.SignedBeaconBlockHeader
3, // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader.signed_block_header:type_name -> ethereum.eth.v1alpha1.SignedBeaconBlockHeader
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
@@ -247,7 +312,7 @@ func file_proto_prysm_v1alpha1_partial_data_columns_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumMessages: 3,
NumExtensions: 0,
NumServices: 0,
},

View File

@@ -44,3 +44,14 @@ message PartialDataColumnHeader {
SignedBeaconBlockHeader signed_block_header = 2;
repeated bytes kzg_commitments_inclusion_proof = 3 [(ethereum.eth.ext.ssz_size) = "kzg_commitments_inclusion_proof_depth.size,32"];
}
message PartialDataColumnPartsMetadata {
bytes available = 1 [
(ethereum.eth.ext.ssz_max) = "max_blob_commitments_bitmap.size",
(ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/go-bitfield.Bitlist"
];
bytes requests = 2 [
(ethereum.eth.ext.ssz_max) = "max_blob_commitments_bitmap.size",
(ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/go-bitfield.Bitlist"
];
}