Compare commits

..

25 Commits

Author SHA1 Message Date
Aarsh Shah
412648ac02 add changelog 2026-02-17 19:48:55 +04:00
Aarsh Shah
fd3587c932 get a green CI 2026-02-17 18:53:23 +04:00
Aarsh Shah
6ab9af51d6 fix bazel 2026-02-17 17:27:54 +04:00
Aarsh Shah
73c5d2648c Tests for partial data column 2026-02-17 17:15:51 +04:00
Aarsh Shah
9b315166de implement OnIncomingRPC 2026-02-17 16:29:00 +04:00
Aarsh Shah
938dda3025 handle gossip for peer 2026-02-17 09:36:21 +04:00
Aarsh Shah
55f5973dad for peer implementation 2026-02-16 20:16:12 +04:00
Aarsh Shah
867978e79b update gossipsub 2026-02-16 16:13:37 +04:00
Aarsh Shah
c8af4b74b5 rename to getBlobsCalled 2026-02-16 15:03:15 +04:00
Aarsh Shah
ab9366ed5f event loop should never block 2026-02-16 14:27:09 +04:00
Aarsh Shah
fd07e59c0a bound number of go-routines to handle header 2026-02-11 12:26:49 +04:00
Aarsh Shah
d66d25e7ad fix lint 2026-02-11 11:49:25 +04:00
Aarsh Shah
89ab513183 don't block the event loop on handling headers 2026-02-11 11:43:20 +04:00
Aarsh Shah
1c6110b6e8 move initialisation of validators and handlers for partial data columns to the Start method 2026-02-11 10:37:06 +04:00
Aarsh Shah
a82edc7fbc fix lint 2026-02-10 12:11:14 +04:00
Aarsh Shah
39bb8d2929 Merge branch 'rebased-partial-columns' into feat/process-eager-partial-header 2026-02-10 12:10:14 +04:00
Aarsh Shah
387cfcd442 only complete column if it has been extended 2026-02-10 11:50:11 +04:00
Aarsh Shah
c70e51b445 fix test 2026-02-10 10:12:18 +04:00
Aarsh Shah
a1a8a86341 extend existing cells 2026-02-10 09:41:55 +04:00
Aarsh Shah
71b1610331 publish headers 2026-02-09 20:37:12 +04:00
Aarsh Shah
814abab4b5 wait for header to be processed 2026-02-09 18:47:47 +04:00
Aarsh Shah
d0d6bab8a0 handle header 2026-02-06 19:21:43 +04:00
Aarsh Shah
3f6c01fc3b changes as per review 2026-02-06 18:44:58 +04:00
Aarsh Shah
c5914ea4d9 handle partial data column header 2026-02-04 19:05:03 +04:00
Aarsh Shah
08d143bd2c add type for partial header reconstruction 2026-02-04 18:27:54 +04:00
22 changed files with 1861 additions and 381 deletions

View File

@@ -24,11 +24,13 @@ var (
var (
_ ConstructionPopulator = (*BlockReconstructionSource)(nil)
_ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
_ ConstructionPopulator = (*PartialDataColumnHeaderReconstructionSource)(nil)
)
const (
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
PartialDataColumnHeaderType = "PartialDataColumnHeader"
)
type (
@@ -55,6 +57,10 @@ type (
blocks.VerifiedRODataColumn
}
PartialDataColumnHeaderReconstructionSource struct {
*ethpb.PartialDataColumnHeader
}
blockInfo struct {
signedBlockHeader *ethpb.SignedBeaconBlockHeader
kzgCommitments [][]byte
@@ -72,6 +78,11 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
return &SidecarReconstructionSource{VerifiedRODataColumn: sidecar}
}
// PopulateFromPartialHeader creates a PartialDataColumnHeaderReconstructionSource from a partial header
func PopulateFromPartialHeader(header *ethpb.PartialDataColumnHeader) *PartialDataColumnHeaderReconstructionSource {
return &PartialDataColumnHeaderReconstructionSource{PartialDataColumnHeader: header}
}
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
@@ -167,7 +178,7 @@ func PartialColumns(included bitfield.Bitlist, cellsPerBlob [][]kzg.Cell, proofs
if !included.BitAt(uint64(i)) {
continue
}
dc.ExtendFromVerfifiedCell(uint64(i), cells[idx][0], proofs[idx][0])
dc.ExtendFromVerifiedCell(uint64(i), cells[idx][0], proofs[idx][0])
cells[idx] = cells[idx][1:]
proofs[idx] = proofs[idx][1:]
}
@@ -289,3 +300,43 @@ func (s *SidecarReconstructionSource) extract() (*blockInfo, error) {
return info, nil
}
// Slot returns the slot from the partial data column header
func (p *PartialDataColumnHeaderReconstructionSource) Slot() primitives.Slot {
return p.SignedBlockHeader.Header.Slot
}
// Root returns the block root computed from the header
func (p *PartialDataColumnHeaderReconstructionSource) Root() [fieldparams.RootLength]byte {
root, err := p.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
return [fieldparams.RootLength]byte{}
}
return root
}
// ProposerIndex returns the proposer index from the header
func (p *PartialDataColumnHeaderReconstructionSource) ProposerIndex() primitives.ValidatorIndex {
return p.SignedBlockHeader.Header.ProposerIndex
}
// Commitments returns the KZG commitments from the header
func (p *PartialDataColumnHeaderReconstructionSource) Commitments() ([][]byte, error) {
return p.KzgCommitments, nil
}
// Type returns the type of the source
func (p *PartialDataColumnHeaderReconstructionSource) Type() string {
return PartialDataColumnHeaderType
}
// extract extracts the block information from the partial header
func (p *PartialDataColumnHeaderReconstructionSource) extract() (*blockInfo, error) {
info := &blockInfo{
signedBlockHeader: p.SignedBlockHeader,
kzgCommitments: p.KzgCommitments,
kzgInclusionProof: p.KzgCommitmentsInclusionProof,
}
return info, nil
}

View File

@@ -267,4 +267,31 @@ func TestReconstructionSource(t *testing.T) {
require.Equal(t, peerdas.SidecarType, src.Type())
})
t.Run("from partial header", func(t *testing.T) {
referenceSidecar := sidecars[0]
partialHeader := &ethpb.PartialDataColumnHeader{
SignedBlockHeader: referenceSidecar.SignedBlockHeader,
KzgCommitments: referenceSidecar.KzgCommitments,
KzgCommitmentsInclusionProof: referenceSidecar.KzgCommitmentsInclusionProof,
}
src := peerdas.PopulateFromPartialHeader(partialHeader)
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.Slot, src.Slot())
// Compute expected root
expectedRoot, err := referenceSidecar.SignedBlockHeader.Header.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, expectedRoot, src.Root())
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.ProposerIndex, src.ProposerIndex())
commitments, err := src.Commitments()
require.NoError(t, err)
require.Equal(t, 2, len(commitments))
require.DeepEqual(t, commitment1, commitments[0])
require.DeepEqual(t, commitment2, commitments[1])
require.Equal(t, peerdas.PartialDataColumnHeaderType, src.Type())
})
}

View File

@@ -479,7 +479,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
}
fullTopicStr := topic + s.Encoding().ProtocolSuffix()
if err := s.partialColumnBroadcaster.Publish(fullTopicStr, partialColumn); err != nil {
if err := s.partialColumnBroadcaster.Publish(fullTopicStr, partialColumn, true); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot partial broadcast data column sidecar")
}

View File

@@ -17,12 +17,10 @@ go_library(
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages/bitmap:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -77,8 +77,6 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
ps2, err := pubsub.NewGossipSub(ctx, h2, opts2...)
require.NoError(t, err)
go broadcaster1.Start()
go broadcaster2.Start()
defer func() {
broadcaster1.Stop()
broadcaster2.Stop()
@@ -120,9 +118,9 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
// Split data
for i := range numCells {
if i%2 == 0 {
pc1.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
pc1.ExtendFromVerifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
} else {
pc2.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
pc2.ExtendFromVerifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
}
}
@@ -140,7 +138,7 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
topic2, err := ps2.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
// Header validator that verifies the inclusion proof
// Header validator
headerValidator := func(header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil {
return false, fmt.Errorf("nil header")
@@ -151,10 +149,7 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
if len(header.KzgCommitments) == 0 {
return true, fmt.Errorf("empty kzg commitments")
}
// Verify inclusion proof
if err := peerdas.VerifyPartialDataColumnHeaderInclusionProof(header); err != nil {
return true, fmt.Errorf("invalid inclusion proof: %w", err)
}
t.Log("Header validation passed")
return false, nil
}
@@ -193,9 +188,17 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
require.NoError(t, err)
defer sub2.Cancel()
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
noopHeaderHandler := func(header *ethpb.PartialDataColumnHeader, groupID string) {}
err = broadcaster1.Start(headerValidator, cellValidator, handler1, noopHeaderHandler)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
err = broadcaster2.Start(headerValidator, cellValidator, handler2, noopHeaderHandler)
require.NoError(t, err)
err = broadcaster1.Subscribe(topic1)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2)
require.NoError(t, err)
// Wait for mesh to form
@@ -203,13 +206,13 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
// Publish
t.Log("Publishing from Node 1")
err = broadcaster1.Publish(topicStr, pc1)
err = broadcaster1.Publish(topicStr, pc1, true)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
t.Log("Publishing from Node 2")
err = broadcaster2.Publish(topicStr, pc2)
err = broadcaster2.Publish(topicStr, pc2, true)
require.NoError(t, err)
// Wait for Completion

View File

@@ -4,17 +4,16 @@ import (
"bytes"
"log/slog"
"regexp"
"slices"
"strconv"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p-pubsub/partialmessages/bitmap"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
@@ -30,6 +29,7 @@ import (
const TTLInSlots = 3
const maxConcurrentValidators = 128
const maxConcurrentHeaderHandlers = 128
var dataColumnTopicRegex = regexp.MustCompile(`data_column_sidecar_(\d+)`)
@@ -47,6 +47,7 @@ func extractColumnIndexFromTopic(topic string) (uint64, error) {
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader, groupID string)
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
@@ -55,18 +56,19 @@ type PartialColumnBroadcaster struct {
ps *pubsub.PubSub
stop chan struct{}
// map topic -> headerValidators
headerValidators map[string]HeaderValidator
// map topic -> Validator
validators map[string]ColumnValidator
validateHeader HeaderValidator
validateColumn ColumnValidator
handleColumn SubHandler
handleHeader HeaderHandler
// map topic -> handler
handlers map[string]SubHandler
// map groupID -> bool to signal when getBlobs has been called
getBlobsCalled map[string]bool
// map topic -> *pubsub.Topic
topics map[string]*pubsub.Topic
concurrentValidatorSemaphore chan struct{}
concurrentValidatorSemaphore chan struct{}
concurrentHeaderHandlerSemaphore chan struct{}
// map topic -> map[groupID]PartialColumn
partialMsgStore map[string]map[string]*blocks.PartialDataColumn
@@ -85,18 +87,24 @@ const (
requestKindPublish requestKind = iota
requestKindSubscribe
requestKindUnsubscribe
requestKindGossipForPeer
requestKindHandleIncomingRPC
requestKindCellsValidated
requestKindGetBlobsCalled
)
type request struct {
kind requestKind
response chan error
sub subscribe
unsub unsubscribe
publish publish
incomingRPC rpcWithFrom
cellsValidated *cellsValidated
getBlobsCalled bool
kind requestKind
cellsValidated *cellsValidated
response chan error
getBlobsCalledGroup string
unsub unsubscribe
incomingRPC rpcWithFrom
gossipForPeer gossipForPeer
gossipForPeerResp chan gossipForPeerResponse
sub subscribe
publish publish
}
type publish struct {
@@ -105,10 +113,7 @@ type publish struct {
}
type subscribe struct {
t *pubsub.Topic
headerValidator HeaderValidator
validator ColumnValidator
handler SubHandler
t *pubsub.Topic
}
type unsubscribe struct {
@@ -128,21 +133,34 @@ type cellsValidated struct {
cells []blocks.CellProofBundle
}
type gossipForPeer struct {
topic string
groupID string
remote peer.ID
peerState partialmessages.PeerState
}
type gossipForPeerResponse struct {
nextPeerState partialmessages.PeerState
encodedMsg []byte
partsMetadataToSend partialmessages.PartsMetadata
err error
}
func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
return &PartialColumnBroadcaster{
validators: make(map[string]ColumnValidator),
headerValidators: make(map[string]HeaderValidator),
handlers: make(map[string]SubHandler),
topics: make(map[string]*pubsub.Topic),
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
groupTTL: make(map[string]int8),
validHeaderCache: make(map[string]*ethpb.PartialDataColumnHeader),
getBlobsCalled: make(map[string]bool),
// GossipSub sends the messages to this channel. The buffer should be
// big enough to avoid dropping messages. We don't want to block the gossipsub event loop for this.
incomingReq: make(chan request, 128*16),
logger: logger,
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
concurrentHeaderHandlerSemaphore: make(chan struct{}, maxConcurrentHeaderHandlers),
}
}
@@ -152,22 +170,29 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
opts = append(opts,
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
Logger: slogger,
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
if len(left) == 0 {
return right
GossipForPeer: func(topic string, groupID string, remote peer.ID, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
respCh := make(chan gossipForPeerResponse, 1)
p.incomingReq <- request{
kind: requestKindGossipForPeer,
gossipForPeer: gossipForPeer{
topic: topic,
groupID: groupID,
remote: remote,
peerState: peerState,
},
gossipForPeerResp: respCh,
}
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
resp := <-respCh
return resp.nextPeerState, resp.encodedMsg, resp.partsMetadataToSend, resp.err
},
OnIncomingRPC: func(from peer.ID, peerState partialmessages.PeerState, rpc *pubsub_pb.PartialMessagesExtension) (partialmessages.PeerState, error) {
if rpc == nil {
return peerState, errors.New("rpc is nil")
}
nextPeerState, err := updatePeerStateFromIncomingRPC(peerState, rpc)
if err != nil {
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
return left
return peerState, err
}
return partialmessages.PartsMetadata(merged)
},
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
// TODO. Add some basic and fast sanity checks
return nil
},
OnIncomingRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
select {
case p.incomingReq <- request{
kind: requestKindHandleIncomingRPC,
@@ -175,8 +200,9 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
}:
default:
p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
return nextPeerState, nil
}
return nil
return nextPeerState, nil
},
}),
func(ps *pubsub.PubSub) error {
@@ -187,11 +213,34 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
return opts
}
// Start starts the event loop of the PartialColumnBroadcaster. Should be called
// within a goroutine (go p.Start())
func (p *PartialColumnBroadcaster) Start() {
// Start starts the event loop of the PartialColumnBroadcaster.
// It accepts the required validator and handler functions, returning an error if any is nil.
// The event loop is launched in a goroutine.
func (p *PartialColumnBroadcaster) Start(
validateHeader HeaderValidator,
validateColumn ColumnValidator,
handleColumn SubHandler,
handleHeader HeaderHandler,
) error {
if validateHeader == nil {
return errors.New("no header validator provided")
}
if handleHeader == nil {
return errors.New("no header handler provided")
}
if validateColumn == nil {
return errors.New("no column validator provided")
}
if handleColumn == nil {
return errors.New("no column handler provided")
}
p.validateHeader = validateHeader
p.validateColumn = validateColumn
p.handleColumn = handleColumn
p.handleHeader = handleHeader
p.stop = make(chan struct{})
p.loop()
go p.loop()
return nil
}
func (p *PartialColumnBroadcaster) loop() {
@@ -210,7 +259,11 @@ func (p *PartialColumnBroadcaster) loop() {
delete(p.groupTTL, groupID)
delete(p.validHeaderCache, groupID)
delete(p.getBlobsCalled, groupID)
for topic, msgStore := range p.partialMsgStore {
if col, ok := msgStore[groupID]; ok {
col.ClearEagerPushSent()
}
delete(msgStore, groupID)
if len(msgStore) == 0 {
delete(p.partialMsgStore, topic)
@@ -220,11 +273,19 @@ func (p *PartialColumnBroadcaster) loop() {
case req := <-p.incomingReq:
switch req.kind {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c)
req.response <- p.publish(req.publish.topic, req.publish.c, req.getBlobsCalled)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
req.response <- p.subscribe(req.sub.t)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindGossipForPeer:
nextPeerState, encodedMsg, partsMetadataToSend, err := p.handleGossipForPeer(req.gossipForPeer)
req.gossipForPeerResp <- gossipForPeerResponse{
nextPeerState: nextPeerState,
encodedMsg: encodedMsg,
partsMetadataToSend: partsMetadataToSend,
err: err,
}
case requestKindHandleIncomingRPC:
err := p.handleIncomingRPC(req.incomingRPC)
if err != nil {
@@ -235,6 +296,8 @@ func (p *PartialColumnBroadcaster) loop() {
if err != nil {
p.logger.Error("Failed to handle cells validated", "err", err)
}
case requestKindGetBlobsCalled:
p.handleGetBlobsCalled(req.getBlobsCalledGroup)
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
@@ -254,6 +317,75 @@ func (p *PartialColumnBroadcaster) getDataColumn(topic string, group []byte) *bl
return msg
}
func (p *PartialColumnBroadcaster) handleGossipForPeer(req gossipForPeer) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
topicStore, ok := p.partialMsgStore[req.topic]
if !ok {
return req.peerState, nil, nil, errors.New("not tracking topic for group")
}
partialColumn, ok := topicStore[req.groupID]
if !ok || partialColumn == nil {
return req.peerState, nil, nil, errors.New("not tracking topic for group")
}
return partialColumn.ForPeer(req.remote, false, req.peerState)
}
func parsePartsMetadataFromPeerState(state any, expectedLength uint64, stateKind string) (*ethpb.PartialDataColumnPartsMetadata, error) {
if state == nil {
return blocks.NewPartsMetaWithNoAvailableAndAllRequests(expectedLength), nil
}
pb, ok := state.(partialmessages.PartsMetadata)
if !ok {
return nil, errors.Errorf("%s state is not PartsMetadata", stateKind)
}
return blocks.ParsePartsMetadata(pb, expectedLength)
}
func updatePeerStateFromIncomingRPC(peerState partialmessages.PeerState, rpc *pubsub_pb.PartialMessagesExtension) (partialmessages.PeerState, error) {
nextPeerState := peerState
// if the peer has sent us a partsMetadata, simply overwrite our existing view of that peer with the parts Metadata they have sent.
if len(rpc.PartsMetadata) > 0 {
nextPeerState.RecvdState = partialmessages.PartsMetadata(slices.Clone(rpc.PartsMetadata))
}
// we can not update anything else unless the peer has also sent us cells.
if len(rpc.PartialMessage) == 0 {
return nextPeerState, nil
}
var message ethpb.PartialDataColumnSidecar
if err := message.UnmarshalSSZ(rpc.PartialMessage); err != nil {
return peerState, errors.Wrap(err, "failed to unmarshal partial message data")
}
if message.CellsPresentBitmap == nil {
return nextPeerState, nil
}
nKzgCommitments := message.CellsPresentBitmap.Len()
if nKzgCommitments == 0 {
return nextPeerState, errors.New("length of cells present bitmap is 0")
}
recievedMeta, err := parsePartsMetadataFromPeerState(nextPeerState.RecvdState, nKzgCommitments, "received")
if err != nil {
return peerState, err
}
sentMeta, err := parsePartsMetadataFromPeerState(nextPeerState.SentState, nKzgCommitments, "sent")
if err != nil {
return peerState, err
}
recvdState, err := blocks.MergeAvailableIntoPartsMetadata(recievedMeta, message.CellsPresentBitmap)
if err != nil {
return peerState, err
}
sentState, err := blocks.MergeAvailableIntoPartsMetadata(sentMeta, message.CellsPresentBitmap)
if err != nil {
return peerState, err
}
nextPeerState.RecvdState = recvdState
nextPeerState.SentState = sentState
return nextPeerState, nil
}
func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
@@ -287,13 +419,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
}
header = message.Header[0]
headerValidator, ok := p.headerValidators[topicID]
if !ok || headerValidator == nil {
p.logger.Debug("No header validator registered for topic")
return nil
}
reject, err := headerValidator(header)
reject, err := p.validateHeader(header)
if err != nil {
p.logger.Debug("Header validation failed", "err", err, "reject", reject)
if reject {
@@ -306,7 +432,13 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
// Cache the valid header
p.validHeaderCache[string(groupID)] = header
// TODO: We now have the information we need to call GetBlobsV3, we should do that to see what we have locally.
p.concurrentHeaderHandlerSemaphore <- struct{}{}
go func() {
defer func() {
<-p.concurrentHeaderHandlerSemaphore
}()
p.handleHeader(header, string(groupID))
}()
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
@@ -354,8 +486,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
"group": groupID,
})
validator, validatorOK := p.validators[topicID]
if len(rpcWithFrom.PartialMessage) > 0 && validatorOK {
if len(rpcWithFrom.PartialMessage) > 0 {
// TODO: is there any penalty we want to consider for giving us data we didn't request?
// Note that we need to be careful around race conditions and eager data.
// Also note that protobufs by design allow extra data that we don't parse.
@@ -376,7 +507,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
<-p.concurrentValidatorSemaphore
}()
start := time.Now()
err := validator(cellsToVerify)
err := p.validateColumn(cellsToVerify)
if err != nil {
logger.Error("failed to validate cells", "err", err)
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
@@ -397,14 +528,23 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
}
}
peerHas := bitmap.Bitmap(rpcWithFrom.PartsMetadata)
iHave := bitmap.Bitmap(ourDataColumn.PartsMetadata())
if !shouldRepublish && len(peerHas) > 0 && !bytes.Equal(peerHas, iHave) {
peerMeta := rpcWithFrom.PartsMetadata
myMeta, err := ourDataColumn.PartsMetadata()
if err != nil {
return err
}
if !shouldRepublish && len(peerMeta) > 0 && !bytes.Equal(peerMeta, myMeta) {
// Either we have something they don't or vice versa
shouldRepublish = true
logger.Debug("republishing due to parts metadata difference")
}
getBlobsCalled := p.getBlobsCalled[string(groupID)]
if !getBlobsCalled {
p.logger.Debug("GetBlobs not called, skipping republish", "topic", topicID, "group", groupID)
return nil
}
if shouldRepublish {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
@@ -419,7 +559,7 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
if ourDataColumn == nil {
return errors.New("data column not found for verified cells")
}
extended := ourDataColumn.ExtendFromVerfifiedCells(cells.cellIndices, cells.cells)
extended := ourDataColumn.ExtendFromVerifiedCells(cells.cellIndices, cells.cells)
p.logger.Debug("Extended partial message", "duration", cells.validationTook, "extended", extended)
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
@@ -433,15 +573,19 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
if col, ok := ourDataColumn.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", cells.topic, "group", cells.group)
handler, handlerOK := p.handlers[cells.topic]
if handlerOK {
go handler(cells.topic, col)
if p.handleColumn != nil {
go p.handleColumn(cells.topic, col)
}
} else {
p.logger.Info("Extended partial column", "topic", cells.topic, "group", cells.group)
}
getBlobsCalled := p.getBlobsCalled[string(ourDataColumn.GroupID())]
if !getBlobsCalled {
p.logger.Debug("GetBlobs not called, skipping republish", "topic", cells.topic, "group", cells.group)
return nil
}
err := p.ps.PublishPartialMessage(cells.topic, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
@@ -450,21 +594,53 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
return nil
}
func (p *PartialColumnBroadcaster) handleGetBlobsCalled(groupID string) {
p.getBlobsCalled[groupID] = true
for topic, topicStore := range p.partialMsgStore {
col, ok := topicStore[groupID]
if !ok {
continue
}
err := p.ps.PublishPartialMessage(topic, col, partialmessages.PublishOptions{})
if err != nil {
p.logger.WithError(err).Error("Failed to republish after getBlobs called", "topic", topic, "groupID", groupID)
}
}
}
func (p *PartialColumnBroadcaster) Stop() {
if p.stop != nil {
close(p.stop)
}
}
// GetBlobsCalled notifies the event loop that getBlobs has been called,
// triggering republishing of all columns in the store for the given groupID.
func (p *PartialColumnBroadcaster) GetBlobsCalled(groupID string) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
select {
case p.incomingReq <- request{
kind: requestKindGetBlobsCalled,
getBlobsCalledGroup: groupID,
}:
default:
return errors.Errorf("dropping getBlobs called message as incomingReq channel is full, groupID: %s", groupID)
}
return nil
}
// Publish publishes the partial column.
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn) error {
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindPublish,
response: respCh,
kind: requestKindPublish,
response: respCh,
getBlobsCalled: getBlobsCalled,
publish: publish{
topic: topic,
c: c,
@@ -473,44 +649,65 @@ func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataCol
return <-respCh
}
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn) error {
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
topicStore[string(c.GroupID())] = &c
var extended bool
existing := topicStore[string(c.GroupID())]
if existing != nil {
// Extend the existing column with cells being published here.
// The existing column may already contain cells received from peers. We must not overwrite it.
for i := range c.Included.Len() {
if c.Included.BitAt(i) {
extended = existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
}
}
if extended {
if col, ok := existing.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", topic, "group", existing.GroupID())
if p.handleColumn != nil {
go p.handleColumn(topic, col)
}
}
}
} else {
topicStore[string(c.GroupID())] = &c
existing = &c
}
p.groupTTL[string(c.GroupID())] = TTLInSlots
return p.ps.PublishPartialMessage(topic, &c, partialmessages.PublishOptions{})
err := p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
if err == nil {
p.getBlobsCalled[string(c.GroupID())] = getBlobsCalled
}
return err
}
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindSubscribe,
sub: subscribe{
t: t,
headerValidator: headerValidator,
validator: validator,
handler: handler,
t: t,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic) error {
topic := t.String()
if _, ok := p.topics[topic]; ok {
return errors.New("already subscribed")
}
p.topics[topic] = t
p.headerValidators[topic] = headerValidator
p.validators[topic] = validator
p.handlers[topic] = handler
return nil
}
@@ -532,9 +729,5 @@ func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
}
delete(p.topics, topic)
delete(p.partialMsgStore, topic)
delete(p.headerValidators, topic)
delete(p.validators, topic)
delete(p.handlers, topic)
return t.Close()
}

View File

@@ -311,10 +311,6 @@ func (s *Service) Start() {
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
}
go s.forkWatcher()
if s.partialColumnBroadcaster != nil {
go s.partialColumnBroadcaster.Start()
}
}
// Stop the p2p service and terminate all peer connections.
@@ -325,9 +321,6 @@ func (s *Service) Stop() error {
s.dv5Listener.Close()
}
if s.partialColumnBroadcaster != nil {
s.partialColumnBroadcaster.Stop()
}
return nil
}

View File

@@ -6,6 +6,8 @@ package sync
import (
"context"
"slices"
"strconv"
"sync"
"time"
@@ -28,6 +30,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee"
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
p2ptypes "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
@@ -41,6 +44,7 @@ import (
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v7/crypto/rand"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime"
prysmTime "github.com/OffchainLabs/prysm/v7/time"
"github.com/OffchainLabs/prysm/v7/time/slots"
@@ -261,6 +265,13 @@ func (s *Service) Start() {
s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)
go s.verifierRoutine()
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
if err := s.startPartialColumnBroadcaster(broadcaster); err != nil {
log.WithError(err).Error("Failed to start partial column broadcaster")
}
}
go s.startDiscoveryAndSubscriptions()
go s.processDataColumnLogs()
@@ -328,6 +339,9 @@ func (s *Service) Stop() error {
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
s.unSubscribeFromTopic(t)
}
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
broadcaster.Stop()
}
return nil
}
@@ -397,6 +411,46 @@ func (s *Service) waitForChainStart() {
s.markForChainStart()
}
func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster) error {
return broadcaster.Start(
func(header *ethpb.PartialDataColumnHeader) (bool, error) {
return s.validatePartialDataColumnHeader(s.ctx, header)
},
func(cellsToVerify []blocks.CellProofBundle) error {
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
},
func(topic string, col blocks.VerifiedRODataColumn) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
slot := col.SignedBlockHeader.Header.Slot
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
// This column was completed from a partial message.
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
}
err := s.verifiedRODataColumnSubscriber(ctx, col)
if err != nil {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
func(header *ethpb.PartialDataColumnHeader, groupID string) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
source := peerdas.PopulateFromPartialHeader(header)
log.WithField("slot", source.Slot()).Info("Received data column header")
err := s.processDataColumnSidecarsFromExecution(ctx, source)
if err != nil {
log.WithError(err).Error("Failed to process partial data column header")
}
if err := broadcaster.GetBlobsCalled(groupID); err != nil {
log.WithError(err).Error("Failed to call getBlobs called on broadcaster")
}
},
)
}
func (s *Service) startDiscoveryAndSubscriptions() {
// Wait for the chain to start.
s.waitForChainStart()

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"reflect"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -22,7 +20,6 @@ import (
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -70,10 +67,7 @@ type subscribeParameters struct {
}
type partialSubscribeParameters struct {
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
validateHeader partialdatacolumnbroadcaster.HeaderValidator
validate partialdatacolumnbroadcaster.ColumnValidator
handle partialdatacolumnbroadcaster.SubHandler
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
}
// shortTopic is a less verbose version of topic strings used for logging.
@@ -334,32 +328,9 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
s.spawn(func() {
var ps *partialSubscribeParameters
broadcaster := s.cfg.p2p.PartialColumnBroadcaster()
if broadcaster != nil {
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
ps = &partialSubscribeParameters{
broadcaster: broadcaster,
validateHeader: func(header *ethpb.PartialDataColumnHeader) (bool, error) {
return s.validatePartialDataColumnHeader(context.TODO(), header)
},
validate: func(cellsToVerify []blocks.CellProofBundle) error {
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
},
handle: func(topic string, col blocks.VerifiedRODataColumn) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
slot := col.SignedBlockHeader.Header.Slot
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
// This column was completed from a partial message.
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
}
err := s.verifiedRODataColumnSubscriber(ctx, col)
if err != nil {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
}
}
s.subscribeWithParameters(subscribeParameters{
@@ -643,7 +614,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
if requestPartial {
log.Info("Subscribing to partial columns on", topicStr)
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle)
err = t.partial.broadcaster.Subscribe(topic)
if err != nil {
log.WithError(err).Error("Failed to subscribe to partial column")

View File

@@ -202,12 +202,6 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
return nil, errors.Wrap(err, "column indices to sample")
}
// TODO: the deadline here was removed in https://github.com/OffchainLabs/prysm/pull/16155/files
// make sure that reintroducing it does not cause issues.
secondsPerHalfSlot := time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
ctx, cancel := context.WithTimeout(ctx, secondsPerHalfSlot)
defer cancel()
digest, err := s.currentForkDigest()
if err != nil {
return nil, err
@@ -259,7 +253,7 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
// Publish the partial column. This is idempotent if we republish the same data twice.
// Note, the "partial column" may indeed be complete. We still
// should publish to help our peers.
err = partialBroadcaster.Publish(topic, partialColumns[i])
err = partialBroadcaster.Publish(topic, partialColumns[i], true)
if err != nil {
log.WithError(err).Warn("Failed to publish partial column")
}

View File

@@ -0,0 +1,3 @@
### Added
- Create SSZ encoded container for parts metadata.

View File

@@ -38,6 +38,7 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/validator-client:go_default_library",
"//runtime/version:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
@@ -54,7 +55,7 @@ go_test(
"factory_test.go",
"getters_test.go",
"kzg_test.go",
"partialdatacolumn_invariants_test.go",
"partialdatacolumn_test.go",
"proofs_test.go",
"proto_test.go",
"roblob_test.go",
@@ -80,10 +81,10 @@ go_test(
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -1,15 +1,22 @@
package blocks
import (
"bytes"
"errors"
"slices"
"github.com/OffchainLabs/go-bitfield"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
)
var _ partialmessages.Message = (*PartialDataColumn)(nil)
// CellProofBundle contains a cell, its proof, and the corresponding
// commitment/index information.
type CellProofBundle struct {
ColumnIndex uint64
Commitment []byte
@@ -17,21 +24,17 @@ type CellProofBundle struct {
Proof []byte
}
// PartialDataColumn is a partially populated DataColumnSidecar used for
// exchanging cells and metadata with peers.
type PartialDataColumn struct {
*ethpb.DataColumnSidecar
root [fieldparams.RootLength]byte
groupID []byte
Included bitfield.Bitlist
// Parts we've received before we have any commitments to validate against.
// Happens when a peer eager pushes to us.
// TODO implement. For now, not bothering to handle the eager pushes.
// quarantine []*ethpb.PartialDataColumnSidecar
Included bitfield.Bitlist
eagerPushSent map[peer.ID]struct{}
}
// const quarantineSize = 3
// NewPartialDataColumn creates a new Partial Data Column for the given block.
// It does not validate the inputs. The caller is responsible for validating the
// block header and KZG Commitment Inclusion proof.
@@ -65,14 +68,8 @@ func NewPartialDataColumn(
root: root,
groupID: groupID,
Included: bitfield.NewBitlist(uint64(len(sidecar.KzgCommitments))),
eagerPushSent: make(map[peer.ID]struct{}),
}
if len(c.Column) != len(c.KzgCommitments) {
return PartialDataColumn{}, errors.New("mismatch between number of cells and commitments")
}
if len(c.KzgProofs) != len(c.KzgCommitments) {
return PartialDataColumn{}, errors.New("mismatch between number of proofs and commitments")
}
for i := range len(c.KzgCommitments) {
if sidecar.Column[i] == nil {
continue
@@ -82,51 +79,117 @@ func NewPartialDataColumn(
return c, nil
}
// GroupID returns the libp2p partial-messages group identifier.
func (p *PartialDataColumn) GroupID() []byte {
return p.groupID
}
func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMetadata) ([]byte, error) {
peerHas := bitfield.Bitlist(metadata)
if peerHas.Len() != p.Included.Len() {
return nil, errors.New("metadata length does not match expected length")
// ClearEagerPushSent resets the set of peers that have received the eager push
// header, allowing the header to be re-sent on the next ForPeer call.
func (p *PartialDataColumn) ClearEagerPushSent() {
p.eagerPushSent = make(map[peer.ID]struct{})
}
func (p *PartialDataColumn) newPartsMetadata() *ethpb.PartialDataColumnPartsMetadata {
n := uint64(len(p.KzgCommitments))
available := bitfield.NewBitlist(n)
for i := range n {
if p.Included.BitAt(i) {
available.SetBitAt(i, true)
}
}
requests := bitfield.NewBitlist(n)
for i := range n {
requests.SetBitAt(i, true)
}
return &ethpb.PartialDataColumnPartsMetadata{
Available: available,
Requests: requests,
}
}
// NewPartsMetaWithNoAvailableAndAllRequests creates metadata for n parts where
// all requests are set and no parts are marked as available.
func NewPartsMetaWithNoAvailableAndAllRequests(n uint64) *ethpb.PartialDataColumnPartsMetadata {
available := bitfield.NewBitlist(n)
requests := bitfield.NewBitlist(n)
for i := range n {
requests.SetBitAt(i, true)
}
return &ethpb.PartialDataColumnPartsMetadata{
Available: available,
Requests: requests,
}
}
func marshalPartsMetadata(meta *ethpb.PartialDataColumnPartsMetadata) (partialmessages.PartsMetadata, error) {
b, err := meta.MarshalSSZ()
if err != nil {
return nil, err
}
return partialmessages.PartsMetadata(b), nil
}
// NKzgCommitments returns the number of commitments in the column.
func (p *PartialDataColumn) NKzgCommitments() uint64 {
return p.Included.Len()
}
// ParsePartsMetadata SSZ-decodes bytes back to PartialDataColumnPartsMetadata.
func ParsePartsMetadata(pm partialmessages.PartsMetadata, expectedLength uint64) (*ethpb.PartialDataColumnPartsMetadata, error) {
meta := &ethpb.PartialDataColumnPartsMetadata{}
if err := meta.UnmarshalSSZ([]byte(pm)); err != nil {
return nil, err
}
if meta.Available.Len() != expectedLength || meta.Requests.Len() != expectedLength {
return nil, errors.New("invalid parts metadata length")
}
return meta, nil
}
func (p *PartialDataColumn) cellsToSendForPeer(peerMeta *ethpb.PartialDataColumnPartsMetadata) (encodedMsg []byte, cellsSent bitfield.Bitlist, err error) {
peerAvailable := bitfield.Bitlist(peerMeta.Available)
peerRequests := bitfield.Bitlist(peerMeta.Requests)
n := p.Included.Len()
if peerAvailable.Len() != n || peerRequests.Len() != n {
return nil, nil, errors.New("peer metadata bitmap length mismatch")
}
var cellsToReturn int
for i := range peerHas.Len() {
if !peerHas.BitAt(i) && p.Included.BitAt(i) {
for i := range n {
if p.Included.BitAt(i) && !peerAvailable.BitAt(i) && peerRequests.BitAt(i) {
cellsToReturn++
}
}
if cellsToReturn == 0 {
return nil, nil
return nil, nil, nil
}
included := bitfield.NewBitlist(p.Included.Len())
included := bitfield.NewBitlist(n)
outMessage := ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: included,
PartialColumn: make([][]byte, 0, cellsToReturn),
KzgProofs: make([][]byte, 0, cellsToReturn),
PartialColumn: make([][]byte, 0, cellsToReturn),
KzgProofs: make([][]byte, 0, cellsToReturn),
}
for i := range peerHas.Len() {
if peerHas.BitAt(i) || !p.Included.BitAt(i) {
for i := range n {
if !p.Included.BitAt(i) || peerAvailable.BitAt(i) || !peerRequests.BitAt(i) {
continue
}
included.SetBitAt(i, true)
outMessage.PartialColumn = append(outMessage.PartialColumn, p.Column[i])
outMessage.KzgProofs = append(outMessage.KzgProofs, p.KzgProofs[i])
}
outMessage.CellsPresentBitmap = included
marshalled, err := outMessage.MarshalSSZ()
if err != nil {
return nil, err
return nil, nil, err
}
return marshalled, nil
return marshalled, included, nil
}
func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.PartsMetadata, error) {
// TODO: do we want to send this once per groupID per peer
// Eagerly push the PartialDataColumnHeader
// eagerPushBytes builds SSZ-encoded PartialDataColumnSidecar with header only (no cells).
func (p *PartialDataColumn) eagerPushBytes() ([]byte, error) {
outHeader := &ethpb.PartialDataColumnHeader{
KzgCommitments: p.KzgCommitments,
SignedBlockHeader: p.SignedBlockHeader,
@@ -136,19 +199,116 @@ func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.
CellsPresentBitmap: bitfield.NewBitlist(uint64(len(p.KzgCommitments))),
Header: []*ethpb.PartialDataColumnHeader{outHeader},
}
marshalled, err := outMessage.MarshalSSZ()
if err != nil {
return nil, nil, err
}
// Empty bitlist since we aren't including any cells here
peersNextParts := partialmessages.PartsMetadata(bitfield.NewBitlist(uint64(len(p.KzgCommitments))))
return marshalled, peersNextParts, nil
return outMessage.MarshalSSZ()
}
func (p *PartialDataColumn) PartsMetadata() partialmessages.PartsMetadata {
return partialmessages.PartsMetadata(p.Included)
// PartsMetadata returns SSZ-encoded PartialDataColumnPartsMetadata.
func (p *PartialDataColumn) PartsMetadata() (partialmessages.PartsMetadata, error) {
meta := p.newPartsMetadata()
return marshalPartsMetadata(meta)
}
// MergeAvailableIntoPartsMetadata merges additional availability into base and
// returns the marshaled metadata.
func MergeAvailableIntoPartsMetadata(base *ethpb.PartialDataColumnPartsMetadata, additionalAvailable bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
if base == nil {
return nil, errors.New("base is nil")
}
if base.Available.Len() != additionalAvailable.Len() {
return nil, errors.New("available length mismatch")
}
if base.Requests.Len() != additionalAvailable.Len() {
return nil, errors.New("requests length mismatch")
}
merged, err := bitfield.Bitlist(base.Available).Or(additionalAvailable)
if err != nil {
return nil, err
}
base.Available = merged
return marshalPartsMetadata(base)
}
// merge available, keep request unchanged, if my parts are different, simply over write with myparts
func (p *PartialDataColumn) updateReceivedStateOutgoing(receivedMeta partialmessages.PartsMetadata, cellsSent bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
if receivedMeta == nil || len(receivedMeta) == 0 {
return nil, errors.New("recievedMeta is nil")
}
peerMeta, err := ParsePartsMetadata(receivedMeta, p.Included.Len())
if err != nil {
return nil, err
}
return MergeAvailableIntoPartsMetadata(peerMeta, cellsSent)
}
// ForPeer implements partialmessages.Message.
func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
// Eager push - peer has never been seen (RecvdState nil) and message requested.
// Only send the header once per peer.
if requestedMessage && peerState.RecvdState == nil {
if _, sent := p.eagerPushSent[remote]; !sent {
p.eagerPushSent[remote] = struct{}{}
encoded, err := p.eagerPushBytes()
if err != nil {
return peerState, nil, nil, err
}
return peerState, encoded, nil, nil
}
return peerState, nil, nil, nil
}
var encodedMsg []byte
var cellsSent bitfield.Bitlist
var sentMeta partialmessages.PartsMetadata
var recvdMeta partialmessages.PartsMetadata
if peerState.SentState != nil {
var ok bool
sentMeta, ok = peerState.SentState.(partialmessages.PartsMetadata)
if !ok {
return peerState, nil, nil, errors.New("SentState is not PartsMetadata")
}
}
if peerState.RecvdState != nil {
var ok bool
recvdMeta, ok = peerState.RecvdState.(partialmessages.PartsMetadata)
if !ok {
return peerState, nil, nil, errors.New("RecvdState is not PartsMetadata")
}
}
// Normal - message requested and we have RecvdState.
if requestedMessage && peerState.RecvdState != nil {
peerMeta, err := ParsePartsMetadata(recvdMeta, p.Included.Len())
if err != nil {
return peerState, nil, nil, err
}
encodedMsg, cellsSent, err = p.cellsToSendForPeer(peerMeta)
if err != nil {
return peerState, nil, nil, err
}
if cellsSent != nil && cellsSent.Count() != 0 {
newRecvd, err := p.updateReceivedStateOutgoing(recvdMeta, cellsSent)
if err != nil {
return peerState, nil, nil, err
}
peerState.RecvdState = newRecvd
}
}
// Check if we need to send partsMetadata.
var partsMetadataToSend partialmessages.PartsMetadata
myPartsMetadata, err := p.PartsMetadata()
if err != nil {
return peerState, nil, nil, err
}
if !bytes.Equal(myPartsMetadata, sentMeta) {
partsMetadataToSend = myPartsMetadata
sentMeta = partialmessages.PartsMetadata(slices.Clone([]byte(myPartsMetadata)))
peerState.SentState = sentMeta
}
return peerState, encodedMsg, partsMetadataToSend, nil
}
// CellsToVerifyFromPartialMessage returns cells from the partial message that need to be verified.
@@ -169,7 +329,7 @@ func (p *PartialDataColumn) CellsToVerifyFromPartialMessage(message *ethpb.Parti
ourIncludedList := p.Included
if included.Len() != ourIncludedList.Len() {
return nil, nil, errors.New("invalid message. Wrong bitmap length.")
return nil, nil, errors.New("invalid message: wrong bitmap length")
}
cellIndices := make([]uint64, 0, includedCells)
@@ -200,8 +360,8 @@ func (p *PartialDataColumn) CellsToVerifyFromPartialMessage(message *ethpb.Parti
return cellIndices, cellsToVerify, nil
}
// ExtendFromVerfifiedCells will extend this partial column with the provided verified cells
func (p *PartialDataColumn) ExtendFromVerfifiedCell(cellIndex uint64, cell, proof []byte) bool {
// ExtendFromVerifiedCell extends this partial column with one verified cell.
func (p *PartialDataColumn) ExtendFromVerifiedCell(cellIndex uint64, cell, proof []byte) bool {
if p.Included.BitAt(cellIndex) {
// We already have this cell
return false
@@ -213,21 +373,22 @@ func (p *PartialDataColumn) ExtendFromVerfifiedCell(cellIndex uint64, cell, proo
return true
}
// ExtendFromVerfifiedCells will extend this partial column with the provided verified cells
func (p *PartialDataColumn) ExtendFromVerfifiedCells(cellIndices []uint64, cells []CellProofBundle) /* extended */ bool {
// ExtendFromVerifiedCells extends this partial column with the provided verified cells.
func (p *PartialDataColumn) ExtendFromVerifiedCells(cellIndices []uint64, cells []CellProofBundle) /* extended */ bool {
var extended bool
for i, bundle := range cells {
if bundle.ColumnIndex != p.Index {
// Invalid column index, shouldn't happen
return false
}
if p.ExtendFromVerfifiedCell(cellIndices[i], bundle.Cell, bundle.Proof) {
if p.ExtendFromVerifiedCell(cellIndices[i], bundle.Cell, bundle.Proof) {
extended = true
}
}
return extended
}
// Complete returns a verified read-only column if all cells are present.
func (p *PartialDataColumn) Complete(logger *logrus.Logger) (VerifiedRODataColumn, bool) {
if uint64(len(p.KzgCommitments)) != p.Included.Count() {
return VerifiedRODataColumn{}, false

View File

@@ -1,160 +0,0 @@
package blocks_test
import (
"bytes"
"fmt"
"testing"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/util"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p/core/peer"
)
type invariantChecker struct {
t *testing.T
}
var _ partialmessages.InvariantChecker[*blocks.PartialDataColumn] = (*invariantChecker)(nil)
func (i *invariantChecker) MergePartsMetadata(left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
return partialmessages.MergeBitmap(left, right)
}
func (i *invariantChecker) SplitIntoParts(in *blocks.PartialDataColumn) ([]*blocks.PartialDataColumn, error) {
var parts []*blocks.PartialDataColumn
for idx := range in.Column {
if !in.Included.BitAt(uint64(idx)) {
continue
}
msg := i.EmptyMessage()
msg.Included.SetBitAt(uint64(idx), true)
msg.KzgCommitments = in.KzgCommitments
msg.Column[idx] = in.Column[idx]
msg.KzgProofs[idx] = in.KzgProofs[idx]
parts = append(parts, msg)
}
return parts, nil
}
func (i *invariantChecker) FullMessage() (*blocks.PartialDataColumn, error) {
blockRoot := []byte("test-block-root")
numCells := 128
commitments := make([][]byte, numCells)
cells := make([][]byte, numCells)
proofs := make([][]byte, numCells)
for i := range numCells {
for j := range commitments[i] {
commitments[i][j] = byte(i)
}
cells[i] = make([]byte, 2048)
cells[i] = fmt.Appendf(cells[i][:0], "cell %d", i)
proofs[i] = make([]byte, 48)
proofs[i] = fmt.Appendf(proofs[i][:0], "proof %d", i)
}
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(i.t, []util.DataColumnParam{
{
BodyRoot: blockRoot,
KzgCommitments: commitments,
Column: cells,
KzgProofs: proofs,
},
})
c, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
return &c, err
}
func (i *invariantChecker) EmptyMessage() *blocks.PartialDataColumn {
blockRoot := []byte("test-block-root")
numCells := 128
commitments := make([][]byte, numCells)
cells := make([][]byte, numCells)
proofs := make([][]byte, numCells)
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(i.t, []util.DataColumnParam{
{
BodyRoot: blockRoot,
KzgCommitments: commitments,
Column: cells,
KzgProofs: proofs,
},
})
for i := range roDC[0].Column {
// Clear these fields since this is an empty message
roDC[0].Column[i] = nil
roDC[0].KzgProofs[i] = nil
}
pc, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
if err != nil {
panic(err)
}
return &pc
}
func (i *invariantChecker) ExtendFromBytes(a *blocks.PartialDataColumn, data []byte) (*blocks.PartialDataColumn, error) {
var message ethpb.PartialDataColumnSidecar
err := message.UnmarshalSSZ(data)
if err != nil {
return nil, err
}
cellIndices, bundle, err := a.CellsToVerifyFromPartialMessage(&message)
if err != nil {
return nil, err
}
// No validation happening here. Copy-pasters beware!
_ = a.ExtendFromVerfifiedCells(cellIndices, bundle)
return a, nil
}
func (i *invariantChecker) ShouldRequest(a *blocks.PartialDataColumn, from peer.ID, partsMetadata []byte) bool {
peerHas := bitfield.Bitlist(partsMetadata)
for i := range peerHas.Len() {
if peerHas.BitAt(i) && !a.Included.BitAt(i) {
return true
}
}
return false
}
func (i *invariantChecker) Equal(a, b *blocks.PartialDataColumn) bool {
if !bytes.Equal(a.GroupID(), b.GroupID()) {
return false
}
if !bytes.Equal(a.Included, b.Included) {
return false
}
if len(a.KzgCommitments) != len(b.KzgCommitments) {
return false
}
for i := range a.KzgCommitments {
if !bytes.Equal(a.KzgCommitments[i], b.KzgCommitments[i]) {
return false
}
}
if len(a.Column) != len(b.Column) {
return false
}
for i := range a.Column {
if !bytes.Equal(a.Column[i], b.Column[i]) {
return false
}
}
if len(a.KzgProofs) != len(b.KzgProofs) {
return false
}
for i := range a.KzgProofs {
if !bytes.Equal(a.KzgProofs[i], b.KzgProofs[i]) {
return false
}
}
return true
}
func TestDataColumnInvariants(t *testing.T) {
partialmessages.TestPartialMessageInvariants(t, &invariantChecker{t})
}

View File

@@ -0,0 +1,988 @@
package blocks
import (
"testing"
"github.com/OffchainLabs/go-bitfield"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
)
func testSignedHeader(validRoots bool, sigLen int) *ethpb.SignedBeaconBlockHeader {
parentRoot := make([]byte, fieldparams.RootLength)
stateRoot := make([]byte, fieldparams.RootLength)
bodyRoot := make([]byte, fieldparams.RootLength)
if !validRoots {
parentRoot = []byte{1}
}
return &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: parentRoot,
StateRoot: stateRoot,
BodyRoot: bodyRoot,
},
Signature: make([]byte, sigLen),
}
}
func sizedSlices(n, size int, start byte) [][]byte {
out := make([][]byte, n)
for i := range n {
b := make([]byte, size)
for j := range b {
b[j] = start + byte(i)
}
out[i] = b
}
return out
}
func testBitlist(n uint64, set ...uint64) bitfield.Bitlist {
bl := bitfield.NewBitlist(n)
for _, idx := range set {
bl.SetBitAt(idx, true)
}
return bl
}
func testPeerMeta(n uint64, available, requests []uint64) *ethpb.PartialDataColumnPartsMetadata {
return &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(n, available...),
Requests: testBitlist(n, requests...),
}
}
func allSet(n uint64) []uint64 {
out := make([]uint64, n)
for i := range n {
out[i] = i
}
return out
}
func mustMarshalMeta(t *testing.T, meta *ethpb.PartialDataColumnPartsMetadata) partialmessages.PartsMetadata {
t.Helper()
out, err := marshalPartsMetadata(meta)
require.NoError(t, err)
return out
}
func mustNewPartialColumnWithSigLen(t *testing.T, n int, sigLen int, included ...uint64) *PartialDataColumn {
t.Helper()
pdc, err := NewPartialDataColumn(
testSignedHeader(true, sigLen),
7,
sizedSlices(n, 48, 1),
sizedSlices(4, 32, 90),
)
require.NoError(t, err)
for _, idx := range included {
pdc.Included.SetBitAt(idx, true)
pdc.Column[idx] = sizedSlices(1, 2048, byte(idx+1))[0]
pdc.KzgProofs[idx] = sizedSlices(1, 48, byte(idx+11))[0]
}
return &pdc
}
func mustNewPartialColumn(t *testing.T, n int, included ...uint64) *PartialDataColumn {
t.Helper()
return mustNewPartialColumnWithSigLen(t, n, fieldparams.BLSSignatureLength, included...)
}
func mustDecodeSidecar(t *testing.T, encoded []byte) *ethpb.PartialDataColumnSidecar {
t.Helper()
var msg ethpb.PartialDataColumnSidecar
require.NoError(t, msg.UnmarshalSSZ(encoded))
return &msg
}
func TestNewPartialDataColumn(t *testing.T) {
tests := []struct {
name string
header *ethpb.SignedBeaconBlockHeader
commitments [][]byte
inclusion [][]byte
wantErr string
}{
{
name: "nominal empty commitments",
header: testSignedHeader(true, fieldparams.BLSSignatureLength),
commitments: nil,
inclusion: sizedSlices(4, 32, 10),
},
{
name: "nominal with commitments",
header: testSignedHeader(true, fieldparams.BLSSignatureLength),
commitments: sizedSlices(3, 48, 10),
inclusion: sizedSlices(4, 32, 10),
},
{
name: "header hash tree root error",
header: testSignedHeader(false, fieldparams.BLSSignatureLength),
commitments: sizedSlices(2, 48, 10),
inclusion: sizedSlices(4, 32, 10),
wantErr: "ParentRoot",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pdc, err := NewPartialDataColumn(tt.header, 11, tt.commitments, tt.inclusion)
if tt.wantErr != "" {
require.ErrorContains(t, tt.wantErr, err)
return
}
require.NoError(t, err)
require.Equal(t, uint64(11), pdc.Index)
require.Equal(t, len(tt.commitments), len(pdc.Column))
require.Equal(t, len(tt.commitments), len(pdc.KzgProofs))
require.Equal(t, uint64(len(tt.commitments)), pdc.Included.Len())
require.Equal(t, uint64(0), pdc.Included.Count())
require.Equal(t, fieldparams.RootLength+1, len(pdc.groupID))
require.Equal(t, byte(0), pdc.groupID[0])
root, rootErr := tt.header.Header.HashTreeRoot()
require.NoError(t, rootErr)
require.DeepEqual(t, root[:], pdc.groupID[1:])
})
}
}
func TestPartialDataColumn_ClearEagerPushSent(t *testing.T) {
tests := []struct {
name string
m map[peer.ID]struct{}
}{
{name: "already nil", m: nil},
{name: "non-empty map", m: map[peer.ID]struct{}{peer.ID("p1"): {}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &PartialDataColumn{eagerPushSent: tt.m}
p.ClearEagerPushSent()
require.NotNil(t, p.eagerPushSent)
require.Equal(t, 0, len(p.eagerPushSent))
})
}
}
func TestPartialDataColumn_newPartsMetadata(t *testing.T) {
tests := []struct {
name string
n int
includedBits []uint64
expectedAvail []uint64
}{
{name: "none included", n: 4, includedBits: nil, expectedAvail: nil},
{name: "sparse included", n: 5, includedBits: []uint64{1, 4}, expectedAvail: []uint64{1, 4}},
{name: "all included", n: 3, includedBits: []uint64{0, 1, 2}, expectedAvail: []uint64{0, 1, 2}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := mustNewPartialColumn(t, tt.n, tt.includedBits...)
meta := p.newPartsMetadata()
require.Equal(t, uint64(tt.n), bitfield.Bitlist(meta.Available).Len())
require.Equal(t, uint64(tt.n), bitfield.Bitlist(meta.Requests).Len())
expected := testBitlist(uint64(tt.n), tt.expectedAvail...)
require.DeepEqual(t, []byte(expected), []byte(meta.Available))
for i := uint64(0); i < uint64(tt.n); i++ {
require.Equal(t, true, bitfield.Bitlist(meta.Requests).BitAt(i))
}
})
}
}
func TestNewPartsMetaWithNoAvailableAndAllRequests(t *testing.T) {
tests := []struct {
name string
n uint64
}{
{name: "zero", n: 0},
{name: "non-zero", n: 6},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
meta := NewPartsMetaWithNoAvailableAndAllRequests(tt.n)
require.Equal(t, tt.n, bitfield.Bitlist(meta.Available).Len())
require.Equal(t, uint64(0), bitfield.Bitlist(meta.Available).Count())
require.Equal(t, tt.n, bitfield.Bitlist(meta.Requests).Len())
for i := uint64(0); i < tt.n; i++ {
require.Equal(t, true, bitfield.Bitlist(meta.Requests).BitAt(i))
}
})
}
}
func TestMarshalPartsMetadata(t *testing.T) {
tests := []struct {
name string
meta *ethpb.PartialDataColumnPartsMetadata
wantErr string
}{
{
name: "valid",
meta: &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(4, 1),
Requests: testBitlist(4, allSet(4)...),
},
},
{
name: "available too large",
meta: &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(4096),
Requests: bitfield.NewBitlist(1),
},
wantErr: "Available",
},
{
name: "requests too large",
meta: &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(1),
Requests: bitfield.NewBitlist(4096),
},
wantErr: "Requests",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, err := marshalPartsMetadata(tt.meta)
if tt.wantErr != "" {
require.ErrorContains(t, tt.wantErr, err)
return
}
require.NoError(t, err)
parsed, parseErr := ParsePartsMetadata(out, 4)
require.NoError(t, parseErr)
require.Equal(t, uint64(4), bitfield.Bitlist(parsed.Available).Len())
})
}
}
func TestParsePartsMetadata(t *testing.T) {
validMeta := mustMarshalMeta(t, &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(4, 1),
Requests: testBitlist(4, allSet(4)...),
})
requestMismatchMeta := mustMarshalMeta(t, &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(4),
Requests: bitfield.NewBitlist(3),
})
tests := []struct {
name string
pm partialmessages.PartsMetadata
expectedLength uint64
wantErr string
}{
{
name: "valid",
pm: validMeta,
expectedLength: 4,
},
{
name: "invalid ssz",
pm: partialmessages.PartsMetadata{1, 2, 3},
expectedLength: 4,
wantErr: "size",
},
{
name: "available length mismatch",
pm: validMeta,
expectedLength: 3,
wantErr: "invalid parts metadata length",
},
{
name: "requests length mismatch",
pm: requestMismatchMeta,
expectedLength: 4,
wantErr: "invalid parts metadata length",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
meta, err := ParsePartsMetadata(tt.pm, tt.expectedLength)
if tt.wantErr != "" {
require.ErrorContains(t, tt.wantErr, err)
return
}
require.NoError(t, err)
require.Equal(t, tt.expectedLength, bitfield.Bitlist(meta.Available).Len())
require.Equal(t, tt.expectedLength, bitfield.Bitlist(meta.Requests).Len())
})
}
}
func TestPartialDataColumn_PartsMetadata(t *testing.T) {
tests := []struct {
name string
p *PartialDataColumn
expectedN uint64
expectErr string
availCount uint64
}{
{
name: "nominal",
p: mustNewPartialColumn(t, 4, 1, 2),
expectedN: 4,
availCount: 2,
},
{
name: "marshal error due max bitlist size",
p: &PartialDataColumn{
DataColumnSidecar: &ethpb.DataColumnSidecar{
KzgCommitments: make([][]byte, 4096),
},
Included: bitfield.NewBitlist(4096),
},
expectErr: "Available",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
meta, err := tt.p.PartsMetadata()
if tt.expectErr != "" {
require.ErrorContains(t, tt.expectErr, err)
return
}
require.NoError(t, err)
parsed, parseErr := ParsePartsMetadata(meta, tt.expectedN)
require.NoError(t, parseErr)
require.Equal(t, tt.availCount, bitfield.Bitlist(parsed.Available).Count())
require.Equal(t, tt.expectedN, bitfield.Bitlist(parsed.Requests).Count())
})
}
}
func TestPartialDataColumn_cellsToSendForPeer(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "metadata length mismatch",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0)
_, _, err := p.cellsToSendForPeer(testPeerMeta(3, nil, allSet(3)))
require.ErrorContains(t, "peer metadata bitmap length mismatch", err)
},
},
{
name: "no cells to send",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 1)
encoded, sent, err := p.cellsToSendForPeer(testPeerMeta(3, []uint64{1}, allSet(3)))
require.NoError(t, err)
require.IsNil(t, encoded)
require.IsNil(t, sent)
},
},
{
name: "sends only requested and missing cells",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0, 1, 3)
encoded, sent, err := p.cellsToSendForPeer(testPeerMeta(4, []uint64{1}, allSet(4)))
require.NoError(t, err)
require.NotNil(t, encoded)
require.NotNil(t, sent)
require.Equal(t, true, sent.BitAt(0))
require.Equal(t, false, sent.BitAt(1))
require.Equal(t, true, sent.BitAt(3))
msg := mustDecodeSidecar(t, encoded)
require.Equal(t, 2, len(msg.PartialColumn))
require.Equal(t, 2, len(msg.KzgProofs))
require.Equal(t, true, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(0))
require.Equal(t, false, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(1))
require.Equal(t, true, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(3))
},
},
{
name: "marshal fails with invalid cell length",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 1, 0)
p.Column[0] = []byte{1}
_, _, err := p.cellsToSendForPeer(testPeerMeta(1, nil, []uint64{0}))
require.ErrorContains(t, "PartialColumn", err)
},
},
{
name: "marshal fails with invalid proof length",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 1, 0)
p.KzgProofs[0] = []byte{1}
_, _, err := p.cellsToSendForPeer(testPeerMeta(1, nil, []uint64{0}))
require.ErrorContains(t, "KzgProofs", err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_eagerPushBytes(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "nominal",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
encoded, err := p.eagerPushBytes()
require.NoError(t, err)
msg := mustDecodeSidecar(t, encoded)
require.Equal(t, 1, len(msg.Header))
require.Equal(t, 0, len(msg.PartialColumn))
require.Equal(t, 0, len(msg.KzgProofs))
require.Equal(t, uint64(3), bitfield.Bitlist(msg.CellsPresentBitmap).Len())
require.Equal(t, uint64(0), bitfield.Bitlist(msg.CellsPresentBitmap).Count())
},
},
{
name: "invalid commitment size",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0)
p.KzgCommitments[0] = []byte{1}
_, err := p.eagerPushBytes()
require.ErrorContains(t, "KzgCommitments", err)
},
},
{
name: "invalid inclusion proof vector length",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0)
p.KzgCommitmentsInclusionProof = p.KzgCommitmentsInclusionProof[:3]
_, err := p.eagerPushBytes()
require.ErrorContains(t, "KzgCommitmentsInclusionProof", err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestMergeAvailableIntoPartsMetadata(t *testing.T) {
tests := []struct {
name string
base *ethpb.PartialDataColumnPartsMetadata
add bitfield.Bitlist
expectErr string
}{
{
name: "nil base",
base: nil,
add: bitfield.NewBitlist(2),
expectErr: "base is nil",
},
{
name: "available length mismatch",
base: &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(4),
Requests: bitfield.NewBitlist(4),
},
add: bitfield.NewBitlist(3),
expectErr: "available length mismatch",
},
{
name: "requests length mismatch",
base: &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(4),
Requests: bitfield.NewBitlist(3),
},
add: bitfield.NewBitlist(4),
expectErr: "requests length mismatch",
},
{
name: "successfully merges",
base: &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(4, 1),
Requests: testBitlist(4, allSet(4)...),
},
add: testBitlist(4, 2),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, err := MergeAvailableIntoPartsMetadata(tt.base, tt.add)
if tt.expectErr != "" {
require.ErrorContains(t, tt.expectErr, err)
return
}
require.NoError(t, err)
meta, parseErr := ParsePartsMetadata(out, tt.add.Len())
require.NoError(t, parseErr)
require.Equal(t, false, bitfield.Bitlist(meta.Available).BitAt(0))
require.Equal(t, true, bitfield.Bitlist(meta.Available).BitAt(1))
require.Equal(t, true, bitfield.Bitlist(meta.Available).BitAt(2))
require.Equal(t, false, bitfield.Bitlist(meta.Available).BitAt(3))
// Verify that MergeAvailableIntoPartsMetadata mutates its base argument.
require.Equal(t, true, bitfield.Bitlist(tt.base.Available).BitAt(2))
})
}
}
func TestPartialDataColumn_updateReceivedStateOutgoing(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T, p *PartialDataColumn)
}{
{
name: "nil receivedMeta",
run: func(t *testing.T, p *PartialDataColumn) {
_, err := p.updateReceivedStateOutgoing(nil, testBitlist(4, 1))
require.ErrorContains(t, "recievedMeta is nil", err)
},
},
{
name: "invalid receivedMeta parse",
run: func(t *testing.T, p *PartialDataColumn) {
_, err := p.updateReceivedStateOutgoing(partialmessages.PartsMetadata{1, 2}, testBitlist(4, 1))
require.NotNil(t, err)
},
},
{
name: "cellsSent length mismatch",
run: func(t *testing.T, p *PartialDataColumn) {
recvd := mustMarshalMeta(t, NewPartsMetaWithNoAvailableAndAllRequests(4))
_, err := p.updateReceivedStateOutgoing(recvd, testBitlist(3, 1))
require.ErrorContains(t, "available length mismatch", err)
},
},
{
name: "success",
run: func(t *testing.T, p *PartialDataColumn) {
recvd := mustMarshalMeta(t, &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(4, 1),
Requests: testBitlist(4, allSet(4)...),
})
out, err := p.updateReceivedStateOutgoing(recvd, testBitlist(4, 3))
require.NoError(t, err)
parsed, parseErr := ParsePartsMetadata(out, 4)
require.NoError(t, parseErr)
require.Equal(t, true, bitfield.Bitlist(parsed.Available).BitAt(1))
require.Equal(t, true, bitfield.Bitlist(parsed.Available).BitAt(3))
require.Equal(t, uint64(4), bitfield.Bitlist(parsed.Requests).Count())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0, 1)
tt.run(t, p)
})
}
}
func TestPartialDataColumn_ForPeer(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "eager push first time for peer",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0)
initial := partialmessages.PeerState{}
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), true, initial)
require.NoError(t, err)
require.NotNil(t, encoded)
require.IsNil(t, meta)
require.IsNil(t, next.RecvdState)
require.IsNil(t, next.SentState)
_, seen := p.eagerPushSent[peer.ID("peer-a")]
require.Equal(t, true, seen)
decoded := mustDecodeSidecar(t, encoded)
require.Equal(t, 1, len(decoded.Header))
require.Equal(t, 0, len(decoded.PartialColumn))
},
},
{
name: "eager push not repeated",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0)
_, _, _, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{})
require.NoError(t, err)
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{})
require.NoError(t, err)
require.IsNil(t, encoded)
require.IsNil(t, meta)
require.IsNil(t, next.RecvdState)
require.IsNil(t, next.SentState)
},
},
{
name: "requested false sends only parts metadata",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), false, partialmessages.PeerState{})
require.NoError(t, err)
require.IsNil(t, encoded)
require.NotNil(t, meta)
require.NotNil(t, next.SentState)
_, ok := next.SentState.(partialmessages.PartsMetadata)
require.Equal(t, true, ok)
},
},
{
name: "invalid SentState type",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, _, err := p.ForPeer(peer.ID("peer-a"), false, partialmessages.PeerState{SentState: "bad"})
require.ErrorContains(t, "SentState is not PartsMetadata", err)
},
},
{
name: "invalid RecvdState type",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, _, err := p.ForPeer(peer.ID("peer-a"), false, partialmessages.PeerState{RecvdState: "bad"})
require.ErrorContains(t, "RecvdState is not PartsMetadata", err)
},
},
{
name: "invalid received metadata parse",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, _, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{
RecvdState: partialmessages.PartsMetadata{1, 2},
})
require.ErrorContains(t, "size", err)
},
},
{
name: "requested true sends missing cells and updates recvd state",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0, 2)
initialMeta := mustMarshalMeta(t, NewPartsMetaWithNoAvailableAndAllRequests(4))
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{
RecvdState: initialMeta,
})
require.NoError(t, err)
require.NotNil(t, encoded)
require.NotNil(t, meta)
sentMeta, parseSentErr := ParsePartsMetadata(meta, 4)
require.NoError(t, parseSentErr)
require.Equal(t, uint64(2), bitfield.Bitlist(sentMeta.Available).Count())
require.Equal(t, true, bitfield.Bitlist(sentMeta.Available).BitAt(0))
require.Equal(t, true, bitfield.Bitlist(sentMeta.Available).BitAt(2))
require.Equal(t, uint64(4), bitfield.Bitlist(sentMeta.Requests).Count())
sentState, ok := next.SentState.(partialmessages.PartsMetadata)
require.Equal(t, true, ok)
require.DeepEqual(t, []byte(meta), []byte(sentState))
msg := mustDecodeSidecar(t, encoded)
require.Equal(t, 2, len(msg.PartialColumn))
require.Equal(t, true, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(0))
require.Equal(t, true, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(2))
recvdOut, parseErr := ParsePartsMetadata(next.RecvdState.(partialmessages.PartsMetadata), 4)
require.NoError(t, parseErr)
require.Equal(t, true, bitfield.Bitlist(recvdOut.Available).BitAt(0))
require.Equal(t, true, bitfield.Bitlist(recvdOut.Available).BitAt(2))
},
},
{
name: "requested true with no missing cells",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 1)
recvd := mustMarshalMeta(t, &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(3, 1),
Requests: testBitlist(3, allSet(3)...),
})
next, encoded, _, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{
RecvdState: recvd,
})
require.NoError(t, err)
require.IsNil(t, encoded)
require.DeepEqual(t, []byte(recvd), []byte(next.RecvdState.(partialmessages.PartsMetadata)))
},
},
{
name: "does not resend unchanged metadata",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 1)
myMeta, err := p.PartsMetadata()
require.NoError(t, err)
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), false, partialmessages.PeerState{
SentState: myMeta,
})
require.NoError(t, err)
require.IsNil(t, encoded)
require.IsNil(t, meta)
require.DeepEqual(t, []byte(myMeta), []byte(next.SentState.(partialmessages.PartsMetadata)))
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_CellsToVerifyFromPartialMessage(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "empty bitmap",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
indices, bundles, err := p.CellsToVerifyFromPartialMessage(&ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: bitfield.NewBitlist(0),
})
require.NoError(t, err)
require.IsNil(t, indices)
require.IsNil(t, bundles)
},
},
{
name: "proofs count mismatch",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, err := p.CellsToVerifyFromPartialMessage(&ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(3, 1),
PartialColumn: [][]byte{{1}},
KzgProofs: nil,
})
require.ErrorContains(t, "Missing KZG proofs", err)
},
},
{
name: "cells count mismatch",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, err := p.CellsToVerifyFromPartialMessage(&ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(3, 1),
PartialColumn: nil,
KzgProofs: [][]byte{{1}},
})
require.ErrorContains(t, "Missing cells", err)
},
},
{
name: "wrong bitmap length",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0)
_, _, err := p.CellsToVerifyFromPartialMessage(&ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(3, 1),
PartialColumn: [][]byte{{1}},
KzgProofs: [][]byte{{2}},
})
require.ErrorContains(t, "wrong bitmap length", err)
},
},
{
name: "returns only unknown cells in bitmap order",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 1)
msg := &ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(4, 0, 1, 3),
PartialColumn: [][]byte{{0xA}, {0xB}, {0xC}},
KzgProofs: [][]byte{{0x1}, {0x2}, {0x3}},
}
indices, bundles, err := p.CellsToVerifyFromPartialMessage(msg)
require.NoError(t, err)
require.DeepEqual(t, []uint64{0, 3}, indices)
require.Equal(t, 2, len(bundles))
require.Equal(t, p.Index, bundles[0].ColumnIndex)
require.DeepEqual(t, []byte{0xA}, bundles[0].Cell)
require.DeepEqual(t, []byte{0xC}, bundles[1].Cell)
require.DeepEqual(t, p.KzgCommitments[0], bundles[0].Commitment)
require.DeepEqual(t, p.KzgCommitments[3], bundles[1].Commitment)
},
},
{
name: "all cells already known",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0, 1)
msg := &ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(2, 0, 1),
PartialColumn: [][]byte{{0xA}, {0xB}},
KzgProofs: [][]byte{{0x1}, {0x2}},
}
indices, bundles, err := p.CellsToVerifyFromPartialMessage(msg)
require.NoError(t, err)
require.Equal(t, 0, len(indices))
require.Equal(t, 0, len(bundles))
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_ExtendFromVerifiedCell(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "already present cell is not overwritten",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 1)
oldCell := p.Column[1]
ok := p.ExtendFromVerifiedCell(1, []byte{9}, []byte{8})
require.Equal(t, false, ok)
require.DeepEqual(t, oldCell, p.Column[1])
},
},
{
name: "new cell extends data",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 1)
ok := p.ExtendFromVerifiedCell(0, []byte{9}, []byte{8})
require.Equal(t, true, ok)
require.Equal(t, true, p.Included.BitAt(0))
require.DeepEqual(t, []byte{9}, p.Column[0])
require.DeepEqual(t, []byte{8}, p.KzgProofs[0])
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_ExtendFromVerifiedCells(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "mismatched cellIndices and cells panics",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2)
defer func() {
require.NotNil(t, recover())
}()
p.ExtendFromVerifiedCells(
[]uint64{0},
[]CellProofBundle{
{ColumnIndex: p.Index, Cell: []byte{1}, Proof: []byte{2}},
{ColumnIndex: p.Index, Cell: []byte{3}, Proof: []byte{4}},
},
)
},
},
{
name: "all new cells",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3)
ok := p.ExtendFromVerifiedCells(
[]uint64{0, 2},
[]CellProofBundle{
{ColumnIndex: p.Index, Cell: []byte{1}, Proof: []byte{2}},
{ColumnIndex: p.Index, Cell: []byte{3}, Proof: []byte{4}},
},
)
require.Equal(t, true, ok)
require.Equal(t, true, p.Included.BitAt(0))
require.Equal(t, true, p.Included.BitAt(2))
},
},
{
name: "all duplicate cells",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 1)
ok := p.ExtendFromVerifiedCells(
[]uint64{1},
[]CellProofBundle{{ColumnIndex: p.Index, Cell: []byte{7}, Proof: []byte{8}}},
)
require.Equal(t, false, ok)
},
},
{
name: "invalid column index first",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2)
ok := p.ExtendFromVerifiedCells(
[]uint64{0},
[]CellProofBundle{{ColumnIndex: p.Index + 1, Cell: []byte{1}, Proof: []byte{2}}},
)
require.Equal(t, false, ok)
require.Equal(t, uint64(0), p.Included.Count())
},
},
{
name: "invalid column index after partial extension",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3)
ok := p.ExtendFromVerifiedCells(
[]uint64{0, 1},
[]CellProofBundle{
{ColumnIndex: p.Index, Cell: []byte{1}, Proof: []byte{2}},
{ColumnIndex: p.Index + 1, Cell: []byte{3}, Proof: []byte{4}},
},
)
require.Equal(t, false, ok)
require.Equal(t, true, p.Included.BitAt(0))
require.Equal(t, false, p.Included.BitAt(1))
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_Complete(t *testing.T) {
tests := []struct {
name string
p *PartialDataColumn
wantOK bool
}{
{
name: "incomplete",
p: mustNewPartialColumn(t, 2, 0),
wantOK: false,
},
{
name: "complete valid data column",
p: mustNewPartialColumn(t, 2, 0, 1),
wantOK: true,
},
{
name: "complete but invalid signed header",
p: mustNewPartialColumnWithSigLen(t, 2, 0, 0, 1),
wantOK: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, ok := tt.p.Complete(logrus.New())
require.Equal(t, tt.wantOK, ok)
if tt.wantOK {
require.NotNil(t, got.DataColumnSidecar)
}
})
}
}

View File

@@ -1929,8 +1929,8 @@ def prysm_deps():
name = "com_github_libp2p_go_libp2p_pubsub",
build_file_proto_mode = "disable_global",
importpath = "github.com/libp2p/go-libp2p-pubsub",
sum = "h1:8BSI2A/w9owQoNPN1tYAZkFvqE6AK+m9XP8z25YLMEQ=",
version = "v0.15.1-0.20260203150236-415b1d0de51e",
sum = "h1:rv9cHqvl7YV38hl637gTwPOfeWO8FjO5XScpJ5cKm1U=",
version = "v0.15.1-0.20260213050207-d8f500417dc9",
)
go_repository(
name = "com_github_libp2p_go_libp2p_testing",

2
go.mod
View File

@@ -43,7 +43,7 @@ require (
github.com/kr/pretty v0.3.1
github.com/libp2p/go-libp2p v0.44.0
github.com/libp2p/go-libp2p-mplex v0.11.0
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260203150236-415b1d0de51e
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260213050207-d8f500417dc9
github.com/libp2p/go-mplex v0.7.0
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/manifoldco/promptui v0.7.0

4
go.sum
View File

@@ -562,8 +562,8 @@ github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl9
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-mplex v0.11.0 h1:0vwpLXRSfkTzshEjETIEgJaVxXvg+orbxYoIb3Ty5qM=
github.com/libp2p/go-libp2p-mplex v0.11.0/go.mod h1:QrsdNY3lzjpdo9V1goJfPb0O65Nms0sUR8CDAO18f6k=
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260203150236-415b1d0de51e h1:8BSI2A/w9owQoNPN1tYAZkFvqE6AK+m9XP8z25YLMEQ=
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260203150236-415b1d0de51e/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260213050207-d8f500417dc9 h1:rv9cHqvl7YV38hl637gTwPOfeWO8FjO5XScpJ5cKm1U=
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260213050207-d8f500417dc9/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY=

View File

@@ -188,6 +188,7 @@ ssz_fulu_objs = [
"DataColumnIdentifier",
"DataColumnsByRootIdentifier",
"DataColumnSidecar",
"PartialDataColumnPartsMetadata",
"PartialDataColumnSidecar",
"StatusV2",
"SignedBeaconBlockContentsFulu",

View File

@@ -1,4 +1,5 @@
// Code generated by fastssz. DO NOT EDIT.
// Hash: a04fb7ec74508f383f3502e1bf0e7c1c25c3825016e8dbb5a8a98e71615026a6
package eth
import (
@@ -2946,3 +2947,129 @@ func (p *PartialDataColumnHeader) HashTreeRootWith(hh *ssz.Hasher) (err error) {
hh.Merkleize(indx)
return
}
// MarshalSSZ ssz marshals the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) MarshalSSZ() ([]byte, error) {
return ssz.MarshalSSZ(p)
}
// MarshalSSZTo ssz marshals the PartialDataColumnPartsMetadata object to a target array
func (p *PartialDataColumnPartsMetadata) MarshalSSZTo(buf []byte) (dst []byte, err error) {
dst = buf
offset := int(8)
// Offset (0) 'Available'
dst = ssz.WriteOffset(dst, offset)
offset += len(p.Available)
// Offset (1) 'Requests'
dst = ssz.WriteOffset(dst, offset)
offset += len(p.Requests)
// Field (0) 'Available'
if size := len(p.Available); size > 512 {
err = ssz.ErrBytesLengthFn("--.Available", size, 512)
return
}
dst = append(dst, p.Available...)
// Field (1) 'Requests'
if size := len(p.Requests); size > 512 {
err = ssz.ErrBytesLengthFn("--.Requests", size, 512)
return
}
dst = append(dst, p.Requests...)
return
}
// UnmarshalSSZ ssz unmarshals the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) UnmarshalSSZ(buf []byte) error {
var err error
size := uint64(len(buf))
if size < 8 {
return ssz.ErrSize
}
tail := buf
var o0, o1 uint64
// Offset (0) 'Available'
if o0 = ssz.ReadOffset(buf[0:4]); o0 > size {
return ssz.ErrOffset
}
if o0 != 8 {
return ssz.ErrInvalidVariableOffset
}
// Offset (1) 'Requests'
if o1 = ssz.ReadOffset(buf[4:8]); o1 > size || o0 > o1 {
return ssz.ErrOffset
}
// Field (0) 'Available'
{
buf = tail[o0:o1]
if err = ssz.ValidateBitlist(buf, 512); err != nil {
return err
}
if cap(p.Available) == 0 {
p.Available = make([]byte, 0, len(buf))
}
p.Available = append(p.Available, buf...)
}
// Field (1) 'Requests'
{
buf = tail[o1:]
if err = ssz.ValidateBitlist(buf, 512); err != nil {
return err
}
if cap(p.Requests) == 0 {
p.Requests = make([]byte, 0, len(buf))
}
p.Requests = append(p.Requests, buf...)
}
return err
}
// SizeSSZ returns the ssz encoded size in bytes for the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) SizeSSZ() (size int) {
size = 8
// Field (0) 'Available'
size += len(p.Available)
// Field (1) 'Requests'
size += len(p.Requests)
return
}
// HashTreeRoot ssz hashes the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) HashTreeRoot() ([32]byte, error) {
return ssz.HashWithDefaultHasher(p)
}
// HashTreeRootWith ssz hashes the PartialDataColumnPartsMetadata object with a hasher
func (p *PartialDataColumnPartsMetadata) HashTreeRootWith(hh *ssz.Hasher) (err error) {
indx := hh.Index()
// Field (0) 'Available'
if len(p.Available) == 0 {
err = ssz.ErrEmptyBitlist
return
}
hh.PutBitlist(p.Available, 512)
// Field (1) 'Requests'
if len(p.Requests) == 0 {
err = ssz.ErrEmptyBitlist
return
}
hh.PutBitlist(p.Requests, 512)
hh.Merkleize(indx)
return
}

View File

@@ -7,13 +7,12 @@
package eth
import (
reflect "reflect"
sync "sync"
github_com_OffchainLabs_go_bitfield "github.com/OffchainLabs/go-bitfield"
_ "github.com/OffchainLabs/prysm/v7/proto/eth/ext"
github_com_OffchainLabs_go_bitfield "github.com/OffchainLabs/go-bitfield"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -151,6 +150,58 @@ func (x *PartialDataColumnHeader) GetKzgCommitmentsInclusionProof() [][]byte {
return nil
}
type PartialDataColumnPartsMetadata struct {
state protoimpl.MessageState `protogen:"open.v1"`
Available github_com_OffchainLabs_go_bitfield.Bitlist `protobuf:"bytes,1,opt,name=available,proto3" json:"available,omitempty" cast-type:"github.com/OffchainLabs/go-bitfield.Bitlist" ssz-max:"512"`
Requests github_com_OffchainLabs_go_bitfield.Bitlist `protobuf:"bytes,2,opt,name=requests,proto3" json:"requests,omitempty" cast-type:"github.com/OffchainLabs/go-bitfield.Bitlist" ssz-max:"512"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *PartialDataColumnPartsMetadata) Reset() {
*x = PartialDataColumnPartsMetadata{}
mi := &file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PartialDataColumnPartsMetadata) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PartialDataColumnPartsMetadata) ProtoMessage() {}
func (x *PartialDataColumnPartsMetadata) ProtoReflect() protoreflect.Message {
mi := &file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PartialDataColumnPartsMetadata.ProtoReflect.Descriptor instead.
func (*PartialDataColumnPartsMetadata) Descriptor() ([]byte, []int) {
return file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescGZIP(), []int{2}
}
func (x *PartialDataColumnPartsMetadata) GetAvailable() github_com_OffchainLabs_go_bitfield.Bitlist {
if x != nil {
return x.Available
}
return github_com_OffchainLabs_go_bitfield.Bitlist(nil)
}
func (x *PartialDataColumnPartsMetadata) GetRequests() github_com_OffchainLabs_go_bitfield.Bitlist {
if x != nil {
return x.Requests
}
return github_com_OffchainLabs_go_bitfield.Bitlist(nil)
}
var File_proto_prysm_v1alpha1_partial_data_columns_proto protoreflect.FileDescriptor
var file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc = []byte{
@@ -199,12 +250,24 @@ var file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc = []byte{
0x69, 0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18,
0x03, 0x20, 0x03, 0x28, 0x0c, 0x42, 0x08, 0x8a, 0xb5, 0x18, 0x04, 0x34, 0x2c, 0x33, 0x32, 0x52,
0x1c, 0x6b, 0x7a, 0x67, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x49,
0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x42, 0x3b, 0x5a,
0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63,
0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76,
0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31,
0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x22, 0xca, 0x01,
0x0a, 0x1e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x44, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6c,
0x75, 0x6d, 0x6e, 0x50, 0x61, 0x72, 0x74, 0x73, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
0x12, 0x54, 0x0a, 0x09, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0c, 0x42, 0x36, 0x82, 0xb5, 0x18, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73,
0x2f, 0x67, 0x6f, 0x2d, 0x62, 0x69, 0x74, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x2e, 0x42, 0x69, 0x74,
0x6c, 0x69, 0x73, 0x74, 0x92, 0xb5, 0x18, 0x03, 0x35, 0x31, 0x32, 0x52, 0x09, 0x61, 0x76, 0x61,
0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x52, 0x0a, 0x08, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x36, 0x82, 0xb5, 0x18, 0x2b, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69,
0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x67, 0x6f, 0x2d, 0x62, 0x69, 0x74, 0x66, 0x69, 0x65, 0x6c,
0x64, 0x2e, 0x42, 0x69, 0x74, 0x6c, 0x69, 0x73, 0x74, 0x92, 0xb5, 0x18, 0x03, 0x35, 0x31, 0x32,
0x52, 0x08, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x42, 0x3b, 0x5a, 0x39, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69,
0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x36, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70,
0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -219,15 +282,16 @@ func file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescGZIP() []byte {
return file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescData
}
var file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_proto_prysm_v1alpha1_partial_data_columns_proto_goTypes = []any{
(*PartialDataColumnSidecar)(nil), // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar
(*PartialDataColumnHeader)(nil), // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader
(*SignedBeaconBlockHeader)(nil), // 2: ethereum.eth.v1alpha1.SignedBeaconBlockHeader
(*PartialDataColumnSidecar)(nil), // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar
(*PartialDataColumnHeader)(nil), // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader
(*PartialDataColumnPartsMetadata)(nil), // 2: ethereum.eth.v1alpha1.PartialDataColumnPartsMetadata
(*SignedBeaconBlockHeader)(nil), // 3: ethereum.eth.v1alpha1.SignedBeaconBlockHeader
}
var file_proto_prysm_v1alpha1_partial_data_columns_proto_depIdxs = []int32{
1, // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar.header:type_name -> ethereum.eth.v1alpha1.PartialDataColumnHeader
2, // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader.signed_block_header:type_name -> ethereum.eth.v1alpha1.SignedBeaconBlockHeader
3, // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader.signed_block_header:type_name -> ethereum.eth.v1alpha1.SignedBeaconBlockHeader
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
@@ -247,7 +311,7 @@ func file_proto_prysm_v1alpha1_partial_data_columns_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumMessages: 3,
NumExtensions: 0,
NumServices: 0,
},

View File

@@ -44,3 +44,14 @@ message PartialDataColumnHeader {
SignedBeaconBlockHeader signed_block_header = 2;
repeated bytes kzg_commitments_inclusion_proof = 3 [(ethereum.eth.ext.ssz_size) = "kzg_commitments_inclusion_proof_depth.size,32"];
}
message PartialDataColumnPartsMetadata {
bytes available = 1 [
(ethereum.eth.ext.ssz_max) = "max_blob_commitments_bitmap.size",
(ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/go-bitfield.Bitlist"
];
bytes requests = 2 [
(ethereum.eth.ext.ssz_max) = "max_blob_commitments_bitmap.size",
(ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/go-bitfield.Bitlist"
];
}