Compare commits

..

3 Commits

Author SHA1 Message Date
Aarsh Shah
bcc0a89bf9 remove docs 2026-02-06 18:22:50 +04:00
Aarsh Shah
9fa36dc84e fix bazel 2026-02-06 18:11:26 +04:00
Aarsh Shah
0d4f695a48 request bitmask 2026-02-06 18:11:05 +04:00
24 changed files with 463 additions and 1861 deletions

View File

@@ -24,13 +24,11 @@ var (
var (
_ ConstructionPopulator = (*BlockReconstructionSource)(nil)
_ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
_ ConstructionPopulator = (*PartialDataColumnHeaderReconstructionSource)(nil)
)
const (
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
PartialDataColumnHeaderType = "PartialDataColumnHeader"
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
)
type (
@@ -57,10 +55,6 @@ type (
blocks.VerifiedRODataColumn
}
PartialDataColumnHeaderReconstructionSource struct {
*ethpb.PartialDataColumnHeader
}
blockInfo struct {
signedBlockHeader *ethpb.SignedBeaconBlockHeader
kzgCommitments [][]byte
@@ -78,11 +72,6 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
return &SidecarReconstructionSource{VerifiedRODataColumn: sidecar}
}
// PopulateFromPartialHeader creates a PartialDataColumnHeaderReconstructionSource from a partial header
func PopulateFromPartialHeader(header *ethpb.PartialDataColumnHeader) *PartialDataColumnHeaderReconstructionSource {
return &PartialDataColumnHeaderReconstructionSource{PartialDataColumnHeader: header}
}
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
@@ -178,7 +167,7 @@ func PartialColumns(included bitfield.Bitlist, cellsPerBlob [][]kzg.Cell, proofs
if !included.BitAt(uint64(i)) {
continue
}
dc.ExtendFromVerifiedCell(uint64(i), cells[idx][0], proofs[idx][0])
dc.ExtendFromVerfifiedCell(uint64(i), cells[idx][0], proofs[idx][0])
cells[idx] = cells[idx][1:]
proofs[idx] = proofs[idx][1:]
}
@@ -300,43 +289,3 @@ func (s *SidecarReconstructionSource) extract() (*blockInfo, error) {
return info, nil
}
// Slot returns the slot from the partial data column header
func (p *PartialDataColumnHeaderReconstructionSource) Slot() primitives.Slot {
return p.SignedBlockHeader.Header.Slot
}
// Root returns the block root computed from the header
func (p *PartialDataColumnHeaderReconstructionSource) Root() [fieldparams.RootLength]byte {
root, err := p.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
return [fieldparams.RootLength]byte{}
}
return root
}
// ProposerIndex returns the proposer index from the header
func (p *PartialDataColumnHeaderReconstructionSource) ProposerIndex() primitives.ValidatorIndex {
return p.SignedBlockHeader.Header.ProposerIndex
}
// Commitments returns the KZG commitments from the header
func (p *PartialDataColumnHeaderReconstructionSource) Commitments() ([][]byte, error) {
return p.KzgCommitments, nil
}
// Type returns the type of the source
func (p *PartialDataColumnHeaderReconstructionSource) Type() string {
return PartialDataColumnHeaderType
}
// extract extracts the block information from the partial header
func (p *PartialDataColumnHeaderReconstructionSource) extract() (*blockInfo, error) {
info := &blockInfo{
signedBlockHeader: p.SignedBlockHeader,
kzgCommitments: p.KzgCommitments,
kzgInclusionProof: p.KzgCommitmentsInclusionProof,
}
return info, nil
}

View File

@@ -267,31 +267,4 @@ func TestReconstructionSource(t *testing.T) {
require.Equal(t, peerdas.SidecarType, src.Type())
})
t.Run("from partial header", func(t *testing.T) {
referenceSidecar := sidecars[0]
partialHeader := &ethpb.PartialDataColumnHeader{
SignedBlockHeader: referenceSidecar.SignedBlockHeader,
KzgCommitments: referenceSidecar.KzgCommitments,
KzgCommitmentsInclusionProof: referenceSidecar.KzgCommitmentsInclusionProof,
}
src := peerdas.PopulateFromPartialHeader(partialHeader)
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.Slot, src.Slot())
// Compute expected root
expectedRoot, err := referenceSidecar.SignedBlockHeader.Header.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, expectedRoot, src.Root())
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.ProposerIndex, src.ProposerIndex())
commitments, err := src.Commitments()
require.NoError(t, err)
require.Equal(t, 2, len(commitments))
require.DeepEqual(t, commitment1, commitments[0])
require.DeepEqual(t, commitment2, commitments[1])
require.Equal(t, peerdas.PartialDataColumnHeaderType, src.Type())
})
}

View File

@@ -479,7 +479,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
}
fullTopicStr := topic + s.Encoding().ProtocolSuffix()
if err := s.partialColumnBroadcaster.Publish(fullTopicStr, partialColumn, true); err != nil {
if err := s.partialColumnBroadcaster.Publish(fullTopicStr, partialColumn); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot partial broadcast data column sidecar")
}

View File

@@ -867,7 +867,7 @@ func (*rpcOrderTracer) DeliverMessage(*pubsub.Message) {}
func (*rpcOrderTracer) RejectMessage(*pubsub.Message, string) {}
func (*rpcOrderTracer) DuplicateMessage(*pubsub.Message) {}
func (*rpcOrderTracer) ThrottlePeer(peer.ID) {}
func (*rpcOrderTracer) RecvRPC(*pubsub.RPC) {}
func (*rpcOrderTracer) RecvRPC(*pubsub.RPC, peer.ID) {}
func (*rpcOrderTracer) DropRPC(*pubsub.RPC, peer.ID) {}
func (*rpcOrderTracer) UndeliverableMessage(*pubsub.Message) {}

View File

@@ -17,6 +17,7 @@ go_library(
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages/bitmap:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",

View File

@@ -77,6 +77,8 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
ps2, err := pubsub.NewGossipSub(ctx, h2, opts2...)
require.NoError(t, err)
go broadcaster1.Start()
go broadcaster2.Start()
defer func() {
broadcaster1.Stop()
broadcaster2.Stop()
@@ -118,9 +120,9 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
// Split data
for i := range numCells {
if i%2 == 0 {
pc1.ExtendFromVerifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
pc1.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
} else {
pc2.ExtendFromVerifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
pc2.ExtendFromVerfifiedCell(uint64(i), roDC[0].Column[i], roDC[0].KzgProofs[i])
}
}
@@ -138,7 +140,7 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
topic2, err := ps2.Join(topicStr, pubsub.RequestPartialMessages())
require.NoError(t, err)
// Header validator
// Header validator that verifies the inclusion proof
headerValidator := func(header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
if header == nil {
return false, fmt.Errorf("nil header")
@@ -149,7 +151,10 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
if len(header.KzgCommitments) == 0 {
return true, fmt.Errorf("empty kzg commitments")
}
// Verify inclusion proof
if err := peerdas.VerifyPartialDataColumnHeaderInclusionProof(header); err != nil {
return true, fmt.Errorf("invalid inclusion proof: %w", err)
}
t.Log("Header validation passed")
return false, nil
}
@@ -188,17 +193,9 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
require.NoError(t, err)
defer sub2.Cancel()
noopHeaderHandler := func(header *ethpb.PartialDataColumnHeader, groupID string) {}
err = broadcaster1.Start(headerValidator, cellValidator, handler1, noopHeaderHandler)
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
require.NoError(t, err)
err = broadcaster2.Start(headerValidator, cellValidator, handler2, noopHeaderHandler)
require.NoError(t, err)
err = broadcaster1.Subscribe(topic1)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
require.NoError(t, err)
// Wait for mesh to form
@@ -206,13 +203,13 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
// Publish
t.Log("Publishing from Node 1")
err = broadcaster1.Publish(topicStr, pc1, true)
err = broadcaster1.Publish(topicStr, pc1)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
t.Log("Publishing from Node 2")
err = broadcaster2.Publish(topicStr, pc2, true)
err = broadcaster2.Publish(topicStr, pc2)
require.NoError(t, err)
// Wait for Completion

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"log/slog"
"regexp"
"slices"
"strconv"
"time"
@@ -14,6 +13,7 @@ import (
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p-pubsub/partialmessages/bitmap"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
@@ -29,7 +29,6 @@ import (
const TTLInSlots = 3
const maxConcurrentValidators = 128
const maxConcurrentHeaderHandlers = 128
var dataColumnTopicRegex = regexp.MustCompile(`data_column_sidecar_(\d+)`)
@@ -47,7 +46,6 @@ func extractColumnIndexFromTopic(topic string) (uint64, error) {
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader, groupID string)
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
@@ -56,19 +54,18 @@ type PartialColumnBroadcaster struct {
ps *pubsub.PubSub
stop chan struct{}
validateHeader HeaderValidator
validateColumn ColumnValidator
handleColumn SubHandler
handleHeader HeaderHandler
// map topic -> headerValidators
headerValidators map[string]HeaderValidator
// map topic -> Validator
validators map[string]ColumnValidator
// map groupID -> bool to signal when getBlobs has been called
getBlobsCalled map[string]bool
// map topic -> handler
handlers map[string]SubHandler
// map topic -> *pubsub.Topic
topics map[string]*pubsub.Topic
concurrentValidatorSemaphore chan struct{}
concurrentHeaderHandlerSemaphore chan struct{}
concurrentValidatorSemaphore chan struct{}
// map topic -> map[groupID]PartialColumn
partialMsgStore map[string]map[string]*blocks.PartialDataColumn
@@ -87,24 +84,18 @@ const (
requestKindPublish requestKind = iota
requestKindSubscribe
requestKindUnsubscribe
requestKindGossipForPeer
requestKindHandleIncomingRPC
requestKindCellsValidated
requestKindGetBlobsCalled
)
type request struct {
getBlobsCalled bool
kind requestKind
cellsValidated *cellsValidated
response chan error
getBlobsCalledGroup string
unsub unsubscribe
incomingRPC rpcWithFrom
gossipForPeer gossipForPeer
gossipForPeerResp chan gossipForPeerResponse
sub subscribe
publish publish
kind requestKind
response chan error
sub subscribe
unsub unsubscribe
publish publish
incomingRPC rpcWithFrom
cellsValidated *cellsValidated
}
type publish struct {
@@ -113,7 +104,10 @@ type publish struct {
}
type subscribe struct {
t *pubsub.Topic
t *pubsub.Topic
headerValidator HeaderValidator
validator ColumnValidator
handler SubHandler
}
type unsubscribe struct {
@@ -133,34 +127,21 @@ type cellsValidated struct {
cells []blocks.CellProofBundle
}
type gossipForPeer struct {
topic string
groupID string
remote peer.ID
peerState partialmessages.PeerState
}
type gossipForPeerResponse struct {
nextPeerState partialmessages.PeerState
encodedMsg []byte
partsMetadataToSend partialmessages.PartsMetadata
err error
}
func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
return &PartialColumnBroadcaster{
validators: make(map[string]ColumnValidator),
headerValidators: make(map[string]HeaderValidator),
handlers: make(map[string]SubHandler),
topics: make(map[string]*pubsub.Topic),
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
groupTTL: make(map[string]int8),
validHeaderCache: make(map[string]*ethpb.PartialDataColumnHeader),
getBlobsCalled: make(map[string]bool),
// GossipSub sends the messages to this channel. The buffer should be
// big enough to avoid dropping messages. We don't want to block the gossipsub event loop for this.
incomingReq: make(chan request, 128*16),
logger: logger,
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
concurrentHeaderHandlerSemaphore: make(chan struct{}, maxConcurrentHeaderHandlers),
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
}
}
@@ -170,29 +151,19 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
opts = append(opts,
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
Logger: slogger,
GossipForPeer: func(topic string, groupID string, remote peer.ID, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
respCh := make(chan gossipForPeerResponse, 1)
p.incomingReq <- request{
kind: requestKindGossipForPeer,
gossipForPeer: gossipForPeer{
topic: topic,
groupID: groupID,
remote: remote,
peerState: peerState,
},
gossipForPeerResp: respCh,
}
resp := <-respCh
return resp.nextPeerState, resp.encodedMsg, resp.partsMetadataToSend, resp.err
},
OnIncomingRPC: func(from peer.ID, peerState partialmessages.PeerState, rpc *pubsub_pb.PartialMessagesExtension) (partialmessages.PeerState, error) {
if rpc == nil {
return peerState, errors.New("rpc is nil")
}
nextPeerState, err := updatePeerStateFromIncomingRPC(peerState, rpc)
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
merged, err := blocks.MergePartsMetadata(left, right)
if err != nil {
return peerState, err
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
return left
}
return merged
},
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
// TODO. Add some basic and fast sanity checks
return nil
},
OnIncomingRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
select {
case p.incomingReq <- request{
kind: requestKindHandleIncomingRPC,
@@ -200,9 +171,8 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
}:
default:
p.logger.Warn("Dropping incoming partial RPC", "rpc", rpc)
return nextPeerState, nil
}
return nextPeerState, nil
return nil
},
}),
func(ps *pubsub.PubSub) error {
@@ -213,34 +183,14 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
return opts
}
// Start starts the event loop of the PartialColumnBroadcaster.
// It accepts the required validator and handler functions, returning an error if any is nil.
// The event loop is launched in a goroutine.
func (p *PartialColumnBroadcaster) Start(
validateHeader HeaderValidator,
validateColumn ColumnValidator,
handleColumn SubHandler,
handleHeader HeaderHandler,
) error {
if validateHeader == nil {
return errors.New("no header validator provided")
// Start starts the event loop of the PartialColumnBroadcaster. Should be called
// within a goroutine (go p.Start())
func (p *PartialColumnBroadcaster) Start() {
if p.stop != nil {
return
}
if handleHeader == nil {
return errors.New("no header handler provided")
}
if validateColumn == nil {
return errors.New("no column validator provided")
}
if handleColumn == nil {
return errors.New("no column handler provided")
}
p.validateHeader = validateHeader
p.validateColumn = validateColumn
p.handleColumn = handleColumn
p.handleHeader = handleHeader
p.stop = make(chan struct{})
go p.loop()
return nil
p.loop()
}
func (p *PartialColumnBroadcaster) loop() {
@@ -259,11 +209,7 @@ func (p *PartialColumnBroadcaster) loop() {
delete(p.groupTTL, groupID)
delete(p.validHeaderCache, groupID)
delete(p.getBlobsCalled, groupID)
for topic, msgStore := range p.partialMsgStore {
if col, ok := msgStore[groupID]; ok {
col.ClearEagerPushSent()
}
delete(msgStore, groupID)
if len(msgStore) == 0 {
delete(p.partialMsgStore, topic)
@@ -273,19 +219,11 @@ func (p *PartialColumnBroadcaster) loop() {
case req := <-p.incomingReq:
switch req.kind {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c, req.getBlobsCalled)
req.response <- p.publish(req.publish.topic, req.publish.c)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t)
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindGossipForPeer:
nextPeerState, encodedMsg, partsMetadataToSend, err := p.handleGossipForPeer(req.gossipForPeer)
req.gossipForPeerResp <- gossipForPeerResponse{
nextPeerState: nextPeerState,
encodedMsg: encodedMsg,
partsMetadataToSend: partsMetadataToSend,
err: err,
}
case requestKindHandleIncomingRPC:
err := p.handleIncomingRPC(req.incomingRPC)
if err != nil {
@@ -296,8 +234,6 @@ func (p *PartialColumnBroadcaster) loop() {
if err != nil {
p.logger.Error("Failed to handle cells validated", "err", err)
}
case requestKindGetBlobsCalled:
p.handleGetBlobsCalled(req.getBlobsCalledGroup)
default:
p.logger.Error("Unknown request kind", "kind", req.kind)
}
@@ -317,75 +253,6 @@ func (p *PartialColumnBroadcaster) getDataColumn(topic string, group []byte) *bl
return msg
}
func (p *PartialColumnBroadcaster) handleGossipForPeer(req gossipForPeer) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
topicStore, ok := p.partialMsgStore[req.topic]
if !ok {
return req.peerState, nil, nil, errors.New("not tracking topic for group")
}
partialColumn, ok := topicStore[req.groupID]
if !ok || partialColumn == nil {
return req.peerState, nil, nil, errors.New("not tracking topic for group")
}
return partialColumn.ForPeer(req.remote, false, req.peerState)
}
func parsePartsMetadataFromPeerState(state any, expectedLength uint64, stateKind string) (*ethpb.PartialDataColumnPartsMetadata, error) {
if state == nil {
return blocks.NewPartsMetaWithNoAvailableAndAllRequests(expectedLength), nil
}
pb, ok := state.(partialmessages.PartsMetadata)
if !ok {
return nil, errors.Errorf("%s state is not PartsMetadata", stateKind)
}
return blocks.ParsePartsMetadata(pb, expectedLength)
}
func updatePeerStateFromIncomingRPC(peerState partialmessages.PeerState, rpc *pubsub_pb.PartialMessagesExtension) (partialmessages.PeerState, error) {
nextPeerState := peerState
// if the peer has sent us a partsMetadata, simply overwrite our existing view of that peer with the parts Metadata they have sent.
if len(rpc.PartsMetadata) > 0 {
nextPeerState.RecvdState = partialmessages.PartsMetadata(slices.Clone(rpc.PartsMetadata))
}
// we can not update anything else unless the peer has also sent us cells.
if len(rpc.PartialMessage) == 0 {
return nextPeerState, nil
}
var message ethpb.PartialDataColumnSidecar
if err := message.UnmarshalSSZ(rpc.PartialMessage); err != nil {
return peerState, errors.Wrap(err, "failed to unmarshal partial message data")
}
if message.CellsPresentBitmap == nil {
return nextPeerState, nil
}
nKzgCommitments := message.CellsPresentBitmap.Len()
if nKzgCommitments == 0 {
return nextPeerState, errors.New("length of cells present bitmap is 0")
}
recievedMeta, err := parsePartsMetadataFromPeerState(nextPeerState.RecvdState, nKzgCommitments, "received")
if err != nil {
return peerState, err
}
sentMeta, err := parsePartsMetadataFromPeerState(nextPeerState.SentState, nKzgCommitments, "sent")
if err != nil {
return peerState, err
}
recvdState, err := blocks.MergeAvailableIntoPartsMetadata(recievedMeta, message.CellsPresentBitmap)
if err != nil {
return peerState, err
}
sentState, err := blocks.MergeAvailableIntoPartsMetadata(sentMeta, message.CellsPresentBitmap)
if err != nil {
return peerState, err
}
nextPeerState.RecvdState = recvdState
nextPeerState.SentState = sentState
return nextPeerState, nil
}
func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
@@ -419,7 +286,13 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
}
header = message.Header[0]
reject, err := p.validateHeader(header)
headerValidator, ok := p.headerValidators[topicID]
if !ok || headerValidator == nil {
p.logger.Debug("No header validator registered for topic")
return nil
}
reject, err := headerValidator(header)
if err != nil {
p.logger.Debug("Header validation failed", "err", err, "reject", reject)
if reject {
@@ -432,13 +305,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
// Cache the valid header
p.validHeaderCache[string(groupID)] = header
p.concurrentHeaderHandlerSemaphore <- struct{}{}
go func() {
defer func() {
<-p.concurrentHeaderHandlerSemaphore
}()
p.handleHeader(header, string(groupID))
}()
// TODO: We now have the information we need to call GetBlobsV3, we should do that to see what we have locally.
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
@@ -486,7 +353,8 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
"group": groupID,
})
if len(rpcWithFrom.PartialMessage) > 0 {
validator, validatorOK := p.validators[topicID]
if len(rpcWithFrom.PartialMessage) > 0 && validatorOK {
// TODO: is there any penalty we want to consider for giving us data we didn't request?
// Note that we need to be careful around race conditions and eager data.
// Also note that protobufs by design allow extra data that we don't parse.
@@ -507,7 +375,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
<-p.concurrentValidatorSemaphore
}()
start := time.Now()
err := p.validateColumn(cellsToVerify)
err := validator(cellsToVerify)
if err != nil {
logger.Error("failed to validate cells", "err", err)
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
@@ -528,23 +396,14 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
}
}
peerMeta := rpcWithFrom.PartsMetadata
myMeta, err := ourDataColumn.PartsMetadata()
if err != nil {
return err
}
if !shouldRepublish && len(peerMeta) > 0 && !bytes.Equal(peerMeta, myMeta) {
peerHas := bitmap.Bitmap(rpcWithFrom.PartsMetadata)
iHave := bitmap.Bitmap(ourDataColumn.PartsMetadata())
if !shouldRepublish && len(peerHas) > 0 && !bytes.Equal(peerHas, iHave) {
// Either we have something they don't or vice versa
shouldRepublish = true
logger.Debug("republishing due to parts metadata difference")
}
getBlobsCalled := p.getBlobsCalled[string(groupID)]
if !getBlobsCalled {
p.logger.Debug("GetBlobs not called, skipping republish", "topic", topicID, "group", groupID)
return nil
}
if shouldRepublish {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
@@ -559,7 +418,7 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
if ourDataColumn == nil {
return errors.New("data column not found for verified cells")
}
extended := ourDataColumn.ExtendFromVerifiedCells(cells.cellIndices, cells.cells)
extended := ourDataColumn.ExtendFromVerfifiedCells(cells.cellIndices, cells.cells)
p.logger.Debug("Extended partial message", "duration", cells.validationTook, "extended", extended)
columnIndexStr := strconv.FormatUint(ourDataColumn.Index, 10)
@@ -573,19 +432,15 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
if col, ok := ourDataColumn.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", cells.topic, "group", cells.group)
if p.handleColumn != nil {
go p.handleColumn(cells.topic, col)
handler, handlerOK := p.handlers[cells.topic]
if handlerOK {
go handler(cells.topic, col)
}
} else {
p.logger.Info("Extended partial column", "topic", cells.topic, "group", cells.group)
}
getBlobsCalled := p.getBlobsCalled[string(ourDataColumn.GroupID())]
if !getBlobsCalled {
p.logger.Debug("GetBlobs not called, skipping republish", "topic", cells.topic, "group", cells.group)
return nil
}
err := p.ps.PublishPartialMessage(cells.topic, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err
@@ -594,53 +449,22 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
return nil
}
func (p *PartialColumnBroadcaster) handleGetBlobsCalled(groupID string) {
p.getBlobsCalled[groupID] = true
for topic, topicStore := range p.partialMsgStore {
col, ok := topicStore[groupID]
if !ok {
continue
}
err := p.ps.PublishPartialMessage(topic, col, partialmessages.PublishOptions{})
if err != nil {
p.logger.WithError(err).Error("Failed to republish after getBlobs called", "topic", topic, "groupID", groupID)
}
}
}
func (p *PartialColumnBroadcaster) Stop() {
if p.stop != nil {
close(p.stop)
p.stop = nil
}
}
// GetBlobsCalled notifies the event loop that getBlobs has been called,
// triggering republishing of all columns in the store for the given groupID.
func (p *PartialColumnBroadcaster) GetBlobsCalled(groupID string) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
select {
case p.incomingReq <- request{
kind: requestKindGetBlobsCalled,
getBlobsCalledGroup: groupID,
}:
default:
return errors.Errorf("dropping getBlobs called message as incomingReq channel is full, groupID: %s", groupID)
}
return nil
}
// Publish publishes the partial column.
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn) error {
if p.ps == nil {
return errors.New("pubsub not initialized")
}
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindPublish,
response: respCh,
getBlobsCalled: getBlobsCalled,
kind: requestKindPublish,
response: respCh,
publish: publish{
topic: topic,
c: c,
@@ -649,65 +473,44 @@ func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataCol
return <-respCh
}
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn, getBlobsCalled bool) error {
func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataColumn) error {
topicStore, ok := p.partialMsgStore[topic]
if !ok {
topicStore = make(map[string]*blocks.PartialDataColumn)
p.partialMsgStore[topic] = topicStore
}
var extended bool
existing := topicStore[string(c.GroupID())]
if existing != nil {
// Extend the existing column with cells being published here.
// The existing column may already contain cells received from peers. We must not overwrite it.
for i := range c.Included.Len() {
if c.Included.BitAt(i) {
extended = existing.ExtendFromVerifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
}
}
if extended {
if col, ok := existing.Complete(p.logger); ok {
p.logger.Info("Completed partial column", "topic", topic, "group", existing.GroupID())
if p.handleColumn != nil {
go p.handleColumn(topic, col)
}
}
}
} else {
topicStore[string(c.GroupID())] = &c
existing = &c
}
topicStore[string(c.GroupID())] = &c
p.groupTTL[string(c.GroupID())] = TTLInSlots
err := p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
if err == nil {
p.getBlobsCalled[string(c.GroupID())] = getBlobsCalled
}
return err
return p.ps.PublishPartialMessage(topic, &c, partialmessages.PublishOptions{})
}
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic) error {
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindSubscribe,
sub: subscribe{
t: t,
t: t,
headerValidator: headerValidator,
validator: validator,
handler: handler,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic) error {
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
topic := t.String()
if _, ok := p.topics[topic]; ok {
return errors.New("already subscribed")
}
p.topics[topic] = t
p.headerValidators[topic] = headerValidator
p.validators[topic] = validator
p.handlers[topic] = handler
return nil
}
@@ -729,5 +532,9 @@ func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
}
delete(p.topics, topic)
delete(p.partialMsgStore, topic)
delete(p.headerValidators, topic)
delete(p.validators, topic)
delete(p.handlers, topic)
return t.Close()
}

View File

@@ -170,7 +170,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
pubsub.WithPeerScore(peerScoringParams(s.cfg.IPColocationWhitelist)),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
pubsub.WithRawTracer(&gossipTracer{host: s.host, allowedTopics: filt}),
pubsub.WithRawTracer(&gossipTracer{host: s.host}),
}
if len(s.cfg.StaticPeers) > 0 {

View File

@@ -26,8 +26,6 @@ const (
type gossipTracer struct {
host host.Host
allowedTopics pubsub.SubscriptionFilter
mu sync.Mutex
// map topic -> Set(peerID). Peer is in set if it supports partial messages.
partialMessagePeers map[string]map[peer.ID]struct{}
@@ -138,32 +136,20 @@ func (g *gossipTracer) ThrottlePeer(p peer.ID) {
}
// RecvRPC .
func (g *gossipTracer) RecvRPC(rpc *pubsub.RPC) {
from := rpc.From()
func (g *gossipTracer) RecvRPC(rpc *pubsub.RPC, from peer.ID) {
g.setMetricFromRPC(recv, pubsubRPCSubRecv, pubsubRPCPubRecv, pubsubRPCPubRecvSize, pubsubRPCRecv, rpc)
g.mu.Lock()
defer g.mu.Unlock()
for _, sub := range rpc.Subscriptions {
topic := sub.GetTopicid()
if !g.allowedTopics.CanSubscribe(topic) {
continue
}
if g.partialMessagePeers == nil {
g.partialMessagePeers = make(map[string]map[peer.ID]struct{})
}
m, ok := g.partialMessagePeers[topic]
m, ok := g.partialMessagePeers[sub.GetTopicid()]
if !ok {
m = make(map[peer.ID]struct{})
g.partialMessagePeers[topic] = m
continue
}
if sub.GetSubscribe() && sub.GetRequestsPartial() {
m[from] = struct{}{}
} else {
delete(m, from)
if len(m) == 0 {
delete(g.partialMessagePeers, topic)
}
}
}
}

View File

@@ -311,6 +311,10 @@ func (s *Service) Start() {
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
}
go s.forkWatcher()
if s.partialColumnBroadcaster != nil {
go s.partialColumnBroadcaster.Start()
}
}
// Stop the p2p service and terminate all peer connections.
@@ -321,6 +325,9 @@ func (s *Service) Stop() error {
s.dv5Listener.Close()
}
if s.partialColumnBroadcaster != nil {
s.partialColumnBroadcaster.Stop()
}
return nil
}

View File

@@ -6,8 +6,6 @@ package sync
import (
"context"
"slices"
"strconv"
"sync"
"time"
@@ -30,7 +28,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee"
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
p2ptypes "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
@@ -44,7 +41,6 @@ import (
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v7/crypto/rand"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime"
prysmTime "github.com/OffchainLabs/prysm/v7/time"
"github.com/OffchainLabs/prysm/v7/time/slots"
@@ -265,13 +261,6 @@ func (s *Service) Start() {
s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)
go s.verifierRoutine()
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
if err := s.startPartialColumnBroadcaster(broadcaster); err != nil {
log.WithError(err).Error("Failed to start partial column broadcaster")
}
}
go s.startDiscoveryAndSubscriptions()
go s.processDataColumnLogs()
@@ -339,9 +328,6 @@ func (s *Service) Stop() error {
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
s.unSubscribeFromTopic(t)
}
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
broadcaster.Stop()
}
return nil
}
@@ -411,46 +397,6 @@ func (s *Service) waitForChainStart() {
s.markForChainStart()
}
func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster) error {
return broadcaster.Start(
func(header *ethpb.PartialDataColumnHeader) (bool, error) {
return s.validatePartialDataColumnHeader(s.ctx, header)
},
func(cellsToVerify []blocks.CellProofBundle) error {
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
},
func(topic string, col blocks.VerifiedRODataColumn) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
slot := col.SignedBlockHeader.Header.Slot
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
// This column was completed from a partial message.
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
}
err := s.verifiedRODataColumnSubscriber(ctx, col)
if err != nil {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
func(header *ethpb.PartialDataColumnHeader, groupID string) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
source := peerdas.PopulateFromPartialHeader(header)
log.WithField("slot", source.Slot()).Info("Received data column header")
err := s.processDataColumnSidecarsFromExecution(ctx, source)
if err != nil {
log.WithError(err).Error("Failed to process partial data column header")
}
if err := broadcaster.GetBlobsCalled(groupID); err != nil {
log.WithError(err).Error("Failed to call getBlobs called on broadcaster")
}
},
)
}
func (s *Service) startDiscoveryAndSubscriptions() {
// Wait for the chain to start.
s.waitForChainStart()

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"reflect"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -20,6 +22,7 @@ import (
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -67,7 +70,10 @@ type subscribeParameters struct {
}
type partialSubscribeParameters struct {
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
validateHeader partialdatacolumnbroadcaster.HeaderValidator
validate partialdatacolumnbroadcaster.ColumnValidator
handle partialdatacolumnbroadcaster.SubHandler
}
// shortTopic is a less verbose version of topic strings used for logging.
@@ -328,9 +334,32 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
s.spawn(func() {
var ps *partialSubscribeParameters
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
broadcaster := s.cfg.p2p.PartialColumnBroadcaster()
if broadcaster != nil {
ps = &partialSubscribeParameters{
broadcaster: broadcaster,
validateHeader: func(header *ethpb.PartialDataColumnHeader) (bool, error) {
return s.validatePartialDataColumnHeader(context.TODO(), header)
},
validate: func(cellsToVerify []blocks.CellProofBundle) error {
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
},
handle: func(topic string, col blocks.VerifiedRODataColumn) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
slot := col.SignedBlockHeader.Header.Slot
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
// This column was completed from a partial message.
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
}
err := s.verifiedRODataColumnSubscriber(ctx, col)
if err != nil {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
}
}
s.subscribeWithParameters(subscribeParameters{
@@ -614,7 +643,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
if requestPartial {
log.Info("Subscribing to partial columns on", topicStr)
err = t.partial.broadcaster.Subscribe(topic)
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle)
if err != nil {
log.WithError(err).Error("Failed to subscribe to partial column")

View File

@@ -259,7 +259,7 @@ func (s *Service) processDataColumnSidecarsFromExecution(ctx context.Context, so
// Publish the partial column. This is idempotent if we republish the same data twice.
// Note, the "partial column" may indeed be complete. We still
// should publish to help our peers.
err = partialBroadcaster.Publish(topic, partialColumns[i], true)
err = partialBroadcaster.Publish(topic, partialColumns[i])
if err != nil {
log.WithError(err).Warn("Failed to publish partial column")
}

View File

@@ -38,7 +38,6 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/validator-client:go_default_library",
"//runtime/version:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
@@ -55,7 +54,7 @@ go_test(
"factory_test.go",
"getters_test.go",
"kzg_test.go",
"partialdatacolumn_test.go",
"partialdatacolumn_invariants_test.go",
"proofs_test.go",
"proto_test.go",
"roblob_test.go",
@@ -81,6 +80,9 @@ go_test(
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//partialmessages:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],

View File

@@ -1,22 +1,15 @@
package blocks
import (
"bytes"
"errors"
"slices"
"github.com/OffchainLabs/go-bitfield"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
)
var _ partialmessages.Message = (*PartialDataColumn)(nil)
// CellProofBundle contains a cell, its proof, and the corresponding
// commitment/index information.
type CellProofBundle struct {
ColumnIndex uint64
Commitment []byte
@@ -24,17 +17,21 @@ type CellProofBundle struct {
Proof []byte
}
// PartialDataColumn is a partially populated DataColumnSidecar used for
// exchanging cells and metadata with peers.
type PartialDataColumn struct {
*ethpb.DataColumnSidecar
root [fieldparams.RootLength]byte
groupID []byte
Included bitfield.Bitlist
eagerPushSent map[peer.ID]struct{}
Included bitfield.Bitlist
// Parts we've received before we have any commitments to validate against.
// Happens when a peer eager pushes to us.
// TODO implement. For now, not bothering to handle the eager pushes.
// quarantine []*ethpb.PartialDataColumnSidecar
}
// const quarantineSize = 3
// NewPartialDataColumn creates a new Partial Data Column for the given block.
// It does not validate the inputs. The caller is responsible for validating the
// block header and KZG Commitment Inclusion proof.
@@ -68,8 +65,14 @@ func NewPartialDataColumn(
root: root,
groupID: groupID,
Included: bitfield.NewBitlist(uint64(len(sidecar.KzgCommitments))),
eagerPushSent: make(map[peer.ID]struct{}),
}
if len(c.Column) != len(c.KzgCommitments) {
return PartialDataColumn{}, errors.New("mismatch between number of cells and commitments")
}
if len(c.KzgProofs) != len(c.KzgCommitments) {
return PartialDataColumn{}, errors.New("mismatch between number of proofs and commitments")
}
for i := range len(c.KzgCommitments) {
if sidecar.Column[i] == nil {
continue
@@ -79,117 +82,70 @@ func NewPartialDataColumn(
return c, nil
}
// GroupID returns the libp2p partial-messages group identifier.
func (p *PartialDataColumn) GroupID() []byte {
return p.groupID
}
func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMetadata) ([]byte, error) {
numCommitments := p.Included.Len()
peerAvailable, peerRequests, isNewFormat := ParseMetadata(metadata, numCommitments)
if peerAvailable.Len() != numCommitments {
return nil, errors.New("metadata length does not match expected length")
}
if isNewFormat && peerRequests.Len() != numCommitments {
return nil, errors.New("metadata length does not match expected length")
}
// ClearEagerPushSent resets the set of peers that have received the eager push
// header, allowing the header to be re-sent on the next ForPeer call.
func (p *PartialDataColumn) ClearEagerPushSent() {
p.eagerPushSent = make(map[peer.ID]struct{})
}
func (p *PartialDataColumn) newPartsMetadata() *ethpb.PartialDataColumnPartsMetadata {
n := uint64(len(p.KzgCommitments))
available := bitfield.NewBitlist(n)
for i := range n {
if p.Included.BitAt(i) {
available.SetBitAt(i, true)
// shouldSend returns true if we should send cell i to this peer.
shouldSend := func(i uint64) bool {
if !p.Included.BitAt(i) {
return false
}
}
requests := bitfield.NewBitlist(n)
for i := range n {
requests.SetBitAt(i, true)
}
return &ethpb.PartialDataColumnPartsMetadata{
Available: available,
Requests: requests,
}
}
// NewPartsMetaWithNoAvailableAndAllRequests creates metadata for n parts where
// all requests are set and no parts are marked as available.
func NewPartsMetaWithNoAvailableAndAllRequests(n uint64) *ethpb.PartialDataColumnPartsMetadata {
available := bitfield.NewBitlist(n)
requests := bitfield.NewBitlist(n)
for i := range n {
requests.SetBitAt(i, true)
}
return &ethpb.PartialDataColumnPartsMetadata{
Available: available,
Requests: requests,
}
}
func marshalPartsMetadata(meta *ethpb.PartialDataColumnPartsMetadata) (partialmessages.PartsMetadata, error) {
b, err := meta.MarshalSSZ()
if err != nil {
return nil, err
}
return partialmessages.PartsMetadata(b), nil
}
// NKzgCommitments returns the number of commitments in the column.
func (p *PartialDataColumn) NKzgCommitments() uint64 {
return p.Included.Len()
}
// ParsePartsMetadata SSZ-decodes bytes back to PartialDataColumnPartsMetadata.
func ParsePartsMetadata(pm partialmessages.PartsMetadata, expectedLength uint64) (*ethpb.PartialDataColumnPartsMetadata, error) {
meta := &ethpb.PartialDataColumnPartsMetadata{}
if err := meta.UnmarshalSSZ([]byte(pm)); err != nil {
return nil, err
}
if meta.Available.Len() != expectedLength || meta.Requests.Len() != expectedLength {
return nil, errors.New("invalid parts metadata length")
}
return meta, nil
}
func (p *PartialDataColumn) cellsToSendForPeer(peerMeta *ethpb.PartialDataColumnPartsMetadata) (encodedMsg []byte, cellsSent bitfield.Bitlist, err error) {
peerAvailable := bitfield.Bitlist(peerMeta.Available)
peerRequests := bitfield.Bitlist(peerMeta.Requests)
n := p.Included.Len()
if peerAvailable.Len() != n || peerRequests.Len() != n {
return nil, nil, errors.New("peer metadata bitmap length mismatch")
if peerAvailable.BitAt(i) {
return false
}
if isNewFormat && !peerRequests.BitAt(i) {
return false
}
return true
}
var cellsToReturn int
for i := range n {
if p.Included.BitAt(i) && !peerAvailable.BitAt(i) && peerRequests.BitAt(i) {
for i := range numCommitments {
if shouldSend(i) {
cellsToReturn++
}
}
if cellsToReturn == 0 {
return nil, nil, nil
return nil, nil
}
included := bitfield.NewBitlist(n)
included := bitfield.NewBitlist(numCommitments)
outMessage := ethpb.PartialDataColumnSidecar{
PartialColumn: make([][]byte, 0, cellsToReturn),
KzgProofs: make([][]byte, 0, cellsToReturn),
CellsPresentBitmap: included,
PartialColumn: make([][]byte, 0, cellsToReturn),
KzgProofs: make([][]byte, 0, cellsToReturn),
}
for i := range n {
if !p.Included.BitAt(i) || peerAvailable.BitAt(i) || !peerRequests.BitAt(i) {
for i := range numCommitments {
if !shouldSend(i) {
continue
}
included.SetBitAt(i, true)
outMessage.PartialColumn = append(outMessage.PartialColumn, p.Column[i])
outMessage.KzgProofs = append(outMessage.KzgProofs, p.KzgProofs[i])
}
outMessage.CellsPresentBitmap = included
marshalled, err := outMessage.MarshalSSZ()
if err != nil {
return nil, nil, err
return nil, err
}
return marshalled, included, nil
return marshalled, nil
}
// eagerPushBytes builds SSZ-encoded PartialDataColumnSidecar with header only (no cells).
func (p *PartialDataColumn) eagerPushBytes() ([]byte, error) {
// TODO: This method will be removed after upgrading to the latest Gossipsub.
func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.PartsMetadata, error) {
// TODO: do we want to send this once per groupID per peer
// Eagerly push the PartialDataColumnHeader
outHeader := &ethpb.PartialDataColumnHeader{
KzgCommitments: p.KzgCommitments,
SignedBlockHeader: p.SignedBlockHeader,
@@ -199,116 +155,92 @@ func (p *PartialDataColumn) eagerPushBytes() ([]byte, error) {
CellsPresentBitmap: bitfield.NewBitlist(uint64(len(p.KzgCommitments))),
Header: []*ethpb.PartialDataColumnHeader{outHeader},
}
return outMessage.MarshalSSZ()
marshalled, err := outMessage.MarshalSSZ()
if err != nil {
return nil, nil, err
}
// Empty bitlist since we aren't including any cells here
peersNextParts := partialmessages.PartsMetadata(bitfield.NewBitlist(uint64(len(p.KzgCommitments))))
return marshalled, peersNextParts, nil
}
// PartsMetadata returns SSZ-encoded PartialDataColumnPartsMetadata.
func (p *PartialDataColumn) PartsMetadata() (partialmessages.PartsMetadata, error) {
meta := p.newPartsMetadata()
return marshalPartsMetadata(meta)
func (p *PartialDataColumn) PartsMetadata() partialmessages.PartsMetadata {
n := p.Included.Len()
requests := bitfield.NewBitlist(n)
for i := range n {
requests.SetBitAt(i, true)
}
return combinedMetadata(p.Included, requests)
}
// MergeAvailableIntoPartsMetadata merges additional availability into base and
// returns the marshaled metadata.
func MergeAvailableIntoPartsMetadata(base *ethpb.PartialDataColumnPartsMetadata, additionalAvailable bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
if base == nil {
return nil, errors.New("base is nil")
// ParseMetadata splits PartsMetadata into available and request bitlists.
// Old format (len==N): returns (metadata, nil, false)
// New format (len==2N): returns (first N bits, next N bits, true)
func ParseMetadata(metadata partialmessages.PartsMetadata, numCommitments uint64) (available bitfield.Bitlist, requests bitfield.Bitlist, isNewFormat bool) {
bl := bitfield.Bitlist(metadata)
if bl.Len() == 2*numCommitments {
available = bitfield.NewBitlist(numCommitments)
requests = bitfield.NewBitlist(numCommitments)
for i := range numCommitments {
available.SetBitAt(i, bl.BitAt(i))
requests.SetBitAt(i, bl.BitAt(i+numCommitments))
}
return available, requests, true
}
if base.Available.Len() != additionalAvailable.Len() {
return nil, errors.New("available length mismatch")
return bl, nil, false
}
func combinedMetadata(available, requests bitfield.Bitlist) partialmessages.PartsMetadata {
n := available.Len()
combined := bitfield.NewBitlist(2 * n)
for i := range n {
combined.SetBitAt(i, available.BitAt(i))
combined.SetBitAt(i+n, requests.BitAt(i))
}
if base.Requests.Len() != additionalAvailable.Len() {
return nil, errors.New("requests length mismatch")
return partialmessages.PartsMetadata(combined)
}
// MergePartsMetadata merges two PartsMetadata values, handling old (N) and new (2N) formats.
// If lengths differ, the old-format (N) is extended to new-format (2N) with all request bits set to 1.
// TODO: This method will be removed after upgrading to the latest Gossipsub.
func MergePartsMetadata(left, right partialmessages.PartsMetadata) (partialmessages.PartsMetadata, error) {
if len(left) == 0 {
return right, nil
}
merged, err := bitfield.Bitlist(base.Available).Or(additionalAvailable)
if len(right) == 0 {
return left, nil
}
leftBl := bitfield.Bitlist(left)
rightBl := bitfield.Bitlist(right)
if leftBl.Len() != rightBl.Len() {
leftBl, rightBl = normalizeMetadataLengths(leftBl, rightBl)
}
merged, err := leftBl.Or(rightBl)
if err != nil {
return nil, err
}
base.Available = merged
return marshalPartsMetadata(base)
return partialmessages.PartsMetadata(merged), nil
}
// merge available, keep request unchanged, if my parts are different, simply over write with myparts
func (p *PartialDataColumn) updateReceivedStateOutgoing(receivedMeta partialmessages.PartsMetadata, cellsSent bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
if receivedMeta == nil || len(receivedMeta) == 0 {
return nil, errors.New("recievedMeta is nil")
func normalizeMetadataLengths(left, right bitfield.Bitlist) (bitfield.Bitlist, bitfield.Bitlist) {
if left.Len() < right.Len() {
left = extendToNewFormat(left)
} else {
right = extendToNewFormat(right)
}
peerMeta, err := ParsePartsMetadata(receivedMeta, p.Included.Len())
if err != nil {
return nil, err
}
return MergeAvailableIntoPartsMetadata(peerMeta, cellsSent)
return left, right
}
// ForPeer implements partialmessages.Message.
func (p *PartialDataColumn) ForPeer(remote peer.ID, requestedMessage bool, peerState partialmessages.PeerState) (partialmessages.PeerState, []byte, partialmessages.PartsMetadata, error) {
// Eager push - peer has never been seen (RecvdState nil) and message requested.
// Only send the header once per peer.
if requestedMessage && peerState.RecvdState == nil {
if _, sent := p.eagerPushSent[remote]; !sent {
p.eagerPushSent[remote] = struct{}{}
encoded, err := p.eagerPushBytes()
if err != nil {
return peerState, nil, nil, err
}
return peerState, encoded, nil, nil
}
return peerState, nil, nil, nil
func extendToNewFormat(bl bitfield.Bitlist) bitfield.Bitlist {
n := bl.Len()
extended := bitfield.NewBitlist(2 * n)
for i := range n {
extended.SetBitAt(i, bl.BitAt(i))
extended.SetBitAt(i+n, true)
}
var encodedMsg []byte
var cellsSent bitfield.Bitlist
var sentMeta partialmessages.PartsMetadata
var recvdMeta partialmessages.PartsMetadata
if peerState.SentState != nil {
var ok bool
sentMeta, ok = peerState.SentState.(partialmessages.PartsMetadata)
if !ok {
return peerState, nil, nil, errors.New("SentState is not PartsMetadata")
}
}
if peerState.RecvdState != nil {
var ok bool
recvdMeta, ok = peerState.RecvdState.(partialmessages.PartsMetadata)
if !ok {
return peerState, nil, nil, errors.New("RecvdState is not PartsMetadata")
}
}
// Normal - message requested and we have RecvdState.
if requestedMessage && peerState.RecvdState != nil {
peerMeta, err := ParsePartsMetadata(recvdMeta, p.Included.Len())
if err != nil {
return peerState, nil, nil, err
}
encodedMsg, cellsSent, err = p.cellsToSendForPeer(peerMeta)
if err != nil {
return peerState, nil, nil, err
}
if cellsSent != nil && cellsSent.Count() != 0 {
newRecvd, err := p.updateReceivedStateOutgoing(recvdMeta, cellsSent)
if err != nil {
return peerState, nil, nil, err
}
peerState.RecvdState = newRecvd
}
}
// Check if we need to send partsMetadata.
var partsMetadataToSend partialmessages.PartsMetadata
myPartsMetadata, err := p.PartsMetadata()
if err != nil {
return peerState, nil, nil, err
}
if !bytes.Equal(myPartsMetadata, sentMeta) {
partsMetadataToSend = myPartsMetadata
sentMeta = partialmessages.PartsMetadata(slices.Clone([]byte(myPartsMetadata)))
peerState.SentState = sentMeta
}
return peerState, encodedMsg, partsMetadataToSend, nil
return extended
}
// CellsToVerifyFromPartialMessage returns cells from the partial message that need to be verified.
@@ -329,7 +261,7 @@ func (p *PartialDataColumn) CellsToVerifyFromPartialMessage(message *ethpb.Parti
ourIncludedList := p.Included
if included.Len() != ourIncludedList.Len() {
return nil, nil, errors.New("invalid message: wrong bitmap length")
return nil, nil, errors.New("invalid message. Wrong bitmap length.")
}
cellIndices := make([]uint64, 0, includedCells)
@@ -360,8 +292,8 @@ func (p *PartialDataColumn) CellsToVerifyFromPartialMessage(message *ethpb.Parti
return cellIndices, cellsToVerify, nil
}
// ExtendFromVerifiedCell extends this partial column with one verified cell.
func (p *PartialDataColumn) ExtendFromVerifiedCell(cellIndex uint64, cell, proof []byte) bool {
// ExtendFromVerfifiedCells will extend this partial column with the provided verified cells
func (p *PartialDataColumn) ExtendFromVerfifiedCell(cellIndex uint64, cell, proof []byte) bool {
if p.Included.BitAt(cellIndex) {
// We already have this cell
return false
@@ -373,22 +305,21 @@ func (p *PartialDataColumn) ExtendFromVerifiedCell(cellIndex uint64, cell, proof
return true
}
// ExtendFromVerifiedCells extends this partial column with the provided verified cells.
func (p *PartialDataColumn) ExtendFromVerifiedCells(cellIndices []uint64, cells []CellProofBundle) /* extended */ bool {
// ExtendFromVerfifiedCells will extend this partial column with the provided verified cells
func (p *PartialDataColumn) ExtendFromVerfifiedCells(cellIndices []uint64, cells []CellProofBundle) /* extended */ bool {
var extended bool
for i, bundle := range cells {
if bundle.ColumnIndex != p.Index {
// Invalid column index, shouldn't happen
return false
}
if p.ExtendFromVerifiedCell(cellIndices[i], bundle.Cell, bundle.Proof) {
if p.ExtendFromVerfifiedCell(cellIndices[i], bundle.Cell, bundle.Proof) {
extended = true
}
}
return extended
}
// Complete returns a verified read-only column if all cells are present.
func (p *PartialDataColumn) Complete(logger *logrus.Logger) (VerifiedRODataColumn, bool) {
if uint64(len(p.KzgCommitments)) != p.Included.Count() {
return VerifiedRODataColumn{}, false

View File

@@ -0,0 +1,165 @@
package blocks_test
import (
"bytes"
"fmt"
"testing"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/util"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p/core/peer"
)
type invariantChecker struct {
t *testing.T
}
var _ partialmessages.InvariantChecker[*blocks.PartialDataColumn] = (*invariantChecker)(nil)
func (i *invariantChecker) MergePartsMetadata(left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
merged, err := blocks.MergePartsMetadata(left, right)
if err != nil {
i.t.Fatal(err)
}
return merged
}
func (i *invariantChecker) SplitIntoParts(in *blocks.PartialDataColumn) ([]*blocks.PartialDataColumn, error) {
var parts []*blocks.PartialDataColumn
for idx := range in.Column {
if !in.Included.BitAt(uint64(idx)) {
continue
}
msg := i.EmptyMessage()
msg.Included.SetBitAt(uint64(idx), true)
msg.KzgCommitments = in.KzgCommitments
msg.Column[idx] = in.Column[idx]
msg.KzgProofs[idx] = in.KzgProofs[idx]
parts = append(parts, msg)
}
return parts, nil
}
func (i *invariantChecker) FullMessage() (*blocks.PartialDataColumn, error) {
blockRoot := []byte("test-block-root")
numCells := 128
commitments := make([][]byte, numCells)
cells := make([][]byte, numCells)
proofs := make([][]byte, numCells)
for i := range numCells {
for j := range commitments[i] {
commitments[i][j] = byte(i)
}
cells[i] = make([]byte, 2048)
cells[i] = fmt.Appendf(cells[i][:0], "cell %d", i)
proofs[i] = make([]byte, 48)
proofs[i] = fmt.Appendf(proofs[i][:0], "proof %d", i)
}
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(i.t, []util.DataColumnParam{
{
BodyRoot: blockRoot,
KzgCommitments: commitments,
Column: cells,
KzgProofs: proofs,
},
})
c, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
return &c, err
}
func (i *invariantChecker) EmptyMessage() *blocks.PartialDataColumn {
blockRoot := []byte("test-block-root")
numCells := 128
commitments := make([][]byte, numCells)
cells := make([][]byte, numCells)
proofs := make([][]byte, numCells)
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(i.t, []util.DataColumnParam{
{
BodyRoot: blockRoot,
KzgCommitments: commitments,
Column: cells,
KzgProofs: proofs,
},
})
for i := range roDC[0].Column {
// Clear these fields since this is an empty message
roDC[0].Column[i] = nil
roDC[0].KzgProofs[i] = nil
}
pc, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
if err != nil {
panic(err)
}
return &pc
}
func (i *invariantChecker) ExtendFromBytes(a *blocks.PartialDataColumn, data []byte) (*blocks.PartialDataColumn, error) {
var message ethpb.PartialDataColumnSidecar
err := message.UnmarshalSSZ(data)
if err != nil {
return nil, err
}
cellIndices, bundle, err := a.CellsToVerifyFromPartialMessage(&message)
if err != nil {
return nil, err
}
// No validation happening here. Copy-pasters beware!
_ = a.ExtendFromVerfifiedCells(cellIndices, bundle)
return a, nil
}
func (i *invariantChecker) ShouldRequest(a *blocks.PartialDataColumn, from peer.ID, partsMetadata []byte) bool {
numCommitments := uint64(len(a.KzgCommitments))
available, requests, isNew := blocks.ParseMetadata(partsMetadata, numCommitments)
for idx := range available.Len() {
peerHasAndWilling := available.BitAt(idx) && (!isNew || requests.BitAt(idx))
if peerHasAndWilling && !a.Included.BitAt(idx) {
return true
}
}
return false
}
func (i *invariantChecker) Equal(a, b *blocks.PartialDataColumn) bool {
if !bytes.Equal(a.GroupID(), b.GroupID()) {
return false
}
if !bytes.Equal(a.Included, b.Included) {
return false
}
if len(a.KzgCommitments) != len(b.KzgCommitments) {
return false
}
for i := range a.KzgCommitments {
if !bytes.Equal(a.KzgCommitments[i], b.KzgCommitments[i]) {
return false
}
}
if len(a.Column) != len(b.Column) {
return false
}
for i := range a.Column {
if !bytes.Equal(a.Column[i], b.Column[i]) {
return false
}
}
if len(a.KzgProofs) != len(b.KzgProofs) {
return false
}
for i := range a.KzgProofs {
if !bytes.Equal(a.KzgProofs[i], b.KzgProofs[i]) {
return false
}
}
return true
}
func TestDataColumnInvariants(t *testing.T) {
partialmessages.TestPartialMessageInvariants(t, &invariantChecker{t})
}

View File

@@ -1,988 +0,0 @@
package blocks
import (
"testing"
"github.com/OffchainLabs/go-bitfield"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"
)
func testSignedHeader(validRoots bool, sigLen int) *ethpb.SignedBeaconBlockHeader {
parentRoot := make([]byte, fieldparams.RootLength)
stateRoot := make([]byte, fieldparams.RootLength)
bodyRoot := make([]byte, fieldparams.RootLength)
if !validRoots {
parentRoot = []byte{1}
}
return &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
ParentRoot: parentRoot,
StateRoot: stateRoot,
BodyRoot: bodyRoot,
},
Signature: make([]byte, sigLen),
}
}
func sizedSlices(n, size int, start byte) [][]byte {
out := make([][]byte, n)
for i := range n {
b := make([]byte, size)
for j := range b {
b[j] = start + byte(i)
}
out[i] = b
}
return out
}
func testBitlist(n uint64, set ...uint64) bitfield.Bitlist {
bl := bitfield.NewBitlist(n)
for _, idx := range set {
bl.SetBitAt(idx, true)
}
return bl
}
func testPeerMeta(n uint64, available, requests []uint64) *ethpb.PartialDataColumnPartsMetadata {
return &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(n, available...),
Requests: testBitlist(n, requests...),
}
}
func allSet(n uint64) []uint64 {
out := make([]uint64, n)
for i := range n {
out[i] = i
}
return out
}
func mustMarshalMeta(t *testing.T, meta *ethpb.PartialDataColumnPartsMetadata) partialmessages.PartsMetadata {
t.Helper()
out, err := marshalPartsMetadata(meta)
require.NoError(t, err)
return out
}
func mustNewPartialColumnWithSigLen(t *testing.T, n int, sigLen int, included ...uint64) *PartialDataColumn {
t.Helper()
pdc, err := NewPartialDataColumn(
testSignedHeader(true, sigLen),
7,
sizedSlices(n, 48, 1),
sizedSlices(4, 32, 90),
)
require.NoError(t, err)
for _, idx := range included {
pdc.Included.SetBitAt(idx, true)
pdc.Column[idx] = sizedSlices(1, 2048, byte(idx+1))[0]
pdc.KzgProofs[idx] = sizedSlices(1, 48, byte(idx+11))[0]
}
return &pdc
}
func mustNewPartialColumn(t *testing.T, n int, included ...uint64) *PartialDataColumn {
t.Helper()
return mustNewPartialColumnWithSigLen(t, n, fieldparams.BLSSignatureLength, included...)
}
func mustDecodeSidecar(t *testing.T, encoded []byte) *ethpb.PartialDataColumnSidecar {
t.Helper()
var msg ethpb.PartialDataColumnSidecar
require.NoError(t, msg.UnmarshalSSZ(encoded))
return &msg
}
func TestNewPartialDataColumn(t *testing.T) {
tests := []struct {
name string
header *ethpb.SignedBeaconBlockHeader
commitments [][]byte
inclusion [][]byte
wantErr string
}{
{
name: "nominal empty commitments",
header: testSignedHeader(true, fieldparams.BLSSignatureLength),
commitments: nil,
inclusion: sizedSlices(4, 32, 10),
},
{
name: "nominal with commitments",
header: testSignedHeader(true, fieldparams.BLSSignatureLength),
commitments: sizedSlices(3, 48, 10),
inclusion: sizedSlices(4, 32, 10),
},
{
name: "header hash tree root error",
header: testSignedHeader(false, fieldparams.BLSSignatureLength),
commitments: sizedSlices(2, 48, 10),
inclusion: sizedSlices(4, 32, 10),
wantErr: "ParentRoot",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pdc, err := NewPartialDataColumn(tt.header, 11, tt.commitments, tt.inclusion)
if tt.wantErr != "" {
require.ErrorContains(t, tt.wantErr, err)
return
}
require.NoError(t, err)
require.Equal(t, uint64(11), pdc.Index)
require.Equal(t, len(tt.commitments), len(pdc.Column))
require.Equal(t, len(tt.commitments), len(pdc.KzgProofs))
require.Equal(t, uint64(len(tt.commitments)), pdc.Included.Len())
require.Equal(t, uint64(0), pdc.Included.Count())
require.Equal(t, fieldparams.RootLength+1, len(pdc.groupID))
require.Equal(t, byte(0), pdc.groupID[0])
root, rootErr := tt.header.Header.HashTreeRoot()
require.NoError(t, rootErr)
require.DeepEqual(t, root[:], pdc.groupID[1:])
})
}
}
func TestPartialDataColumn_ClearEagerPushSent(t *testing.T) {
tests := []struct {
name string
m map[peer.ID]struct{}
}{
{name: "already nil", m: nil},
{name: "non-empty map", m: map[peer.ID]struct{}{peer.ID("p1"): {}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &PartialDataColumn{eagerPushSent: tt.m}
p.ClearEagerPushSent()
require.NotNil(t, p.eagerPushSent)
require.Equal(t, 0, len(p.eagerPushSent))
})
}
}
func TestPartialDataColumn_newPartsMetadata(t *testing.T) {
tests := []struct {
name string
n int
includedBits []uint64
expectedAvail []uint64
}{
{name: "none included", n: 4, includedBits: nil, expectedAvail: nil},
{name: "sparse included", n: 5, includedBits: []uint64{1, 4}, expectedAvail: []uint64{1, 4}},
{name: "all included", n: 3, includedBits: []uint64{0, 1, 2}, expectedAvail: []uint64{0, 1, 2}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := mustNewPartialColumn(t, tt.n, tt.includedBits...)
meta := p.newPartsMetadata()
require.Equal(t, uint64(tt.n), bitfield.Bitlist(meta.Available).Len())
require.Equal(t, uint64(tt.n), bitfield.Bitlist(meta.Requests).Len())
expected := testBitlist(uint64(tt.n), tt.expectedAvail...)
require.DeepEqual(t, []byte(expected), []byte(meta.Available))
for i := uint64(0); i < uint64(tt.n); i++ {
require.Equal(t, true, bitfield.Bitlist(meta.Requests).BitAt(i))
}
})
}
}
func TestNewPartsMetaWithNoAvailableAndAllRequests(t *testing.T) {
tests := []struct {
name string
n uint64
}{
{name: "zero", n: 0},
{name: "non-zero", n: 6},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
meta := NewPartsMetaWithNoAvailableAndAllRequests(tt.n)
require.Equal(t, tt.n, bitfield.Bitlist(meta.Available).Len())
require.Equal(t, uint64(0), bitfield.Bitlist(meta.Available).Count())
require.Equal(t, tt.n, bitfield.Bitlist(meta.Requests).Len())
for i := uint64(0); i < tt.n; i++ {
require.Equal(t, true, bitfield.Bitlist(meta.Requests).BitAt(i))
}
})
}
}
func TestMarshalPartsMetadata(t *testing.T) {
tests := []struct {
name string
meta *ethpb.PartialDataColumnPartsMetadata
wantErr string
}{
{
name: "valid",
meta: &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(4, 1),
Requests: testBitlist(4, allSet(4)...),
},
},
{
name: "available too large",
meta: &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(4096),
Requests: bitfield.NewBitlist(1),
},
wantErr: "Available",
},
{
name: "requests too large",
meta: &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(1),
Requests: bitfield.NewBitlist(4096),
},
wantErr: "Requests",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, err := marshalPartsMetadata(tt.meta)
if tt.wantErr != "" {
require.ErrorContains(t, tt.wantErr, err)
return
}
require.NoError(t, err)
parsed, parseErr := ParsePartsMetadata(out, 4)
require.NoError(t, parseErr)
require.Equal(t, uint64(4), bitfield.Bitlist(parsed.Available).Len())
})
}
}
func TestParsePartsMetadata(t *testing.T) {
validMeta := mustMarshalMeta(t, &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(4, 1),
Requests: testBitlist(4, allSet(4)...),
})
requestMismatchMeta := mustMarshalMeta(t, &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(4),
Requests: bitfield.NewBitlist(3),
})
tests := []struct {
name string
pm partialmessages.PartsMetadata
expectedLength uint64
wantErr string
}{
{
name: "valid",
pm: validMeta,
expectedLength: 4,
},
{
name: "invalid ssz",
pm: partialmessages.PartsMetadata{1, 2, 3},
expectedLength: 4,
wantErr: "size",
},
{
name: "available length mismatch",
pm: validMeta,
expectedLength: 3,
wantErr: "invalid parts metadata length",
},
{
name: "requests length mismatch",
pm: requestMismatchMeta,
expectedLength: 4,
wantErr: "invalid parts metadata length",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
meta, err := ParsePartsMetadata(tt.pm, tt.expectedLength)
if tt.wantErr != "" {
require.ErrorContains(t, tt.wantErr, err)
return
}
require.NoError(t, err)
require.Equal(t, tt.expectedLength, bitfield.Bitlist(meta.Available).Len())
require.Equal(t, tt.expectedLength, bitfield.Bitlist(meta.Requests).Len())
})
}
}
func TestPartialDataColumn_PartsMetadata(t *testing.T) {
tests := []struct {
name string
p *PartialDataColumn
expectedN uint64
expectErr string
availCount uint64
}{
{
name: "nominal",
p: mustNewPartialColumn(t, 4, 1, 2),
expectedN: 4,
availCount: 2,
},
{
name: "marshal error due max bitlist size",
p: &PartialDataColumn{
DataColumnSidecar: &ethpb.DataColumnSidecar{
KzgCommitments: make([][]byte, 4096),
},
Included: bitfield.NewBitlist(4096),
},
expectErr: "Available",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
meta, err := tt.p.PartsMetadata()
if tt.expectErr != "" {
require.ErrorContains(t, tt.expectErr, err)
return
}
require.NoError(t, err)
parsed, parseErr := ParsePartsMetadata(meta, tt.expectedN)
require.NoError(t, parseErr)
require.Equal(t, tt.availCount, bitfield.Bitlist(parsed.Available).Count())
require.Equal(t, tt.expectedN, bitfield.Bitlist(parsed.Requests).Count())
})
}
}
func TestPartialDataColumn_cellsToSendForPeer(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "metadata length mismatch",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0)
_, _, err := p.cellsToSendForPeer(testPeerMeta(3, nil, allSet(3)))
require.ErrorContains(t, "peer metadata bitmap length mismatch", err)
},
},
{
name: "no cells to send",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 1)
encoded, sent, err := p.cellsToSendForPeer(testPeerMeta(3, []uint64{1}, allSet(3)))
require.NoError(t, err)
require.IsNil(t, encoded)
require.IsNil(t, sent)
},
},
{
name: "sends only requested and missing cells",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0, 1, 3)
encoded, sent, err := p.cellsToSendForPeer(testPeerMeta(4, []uint64{1}, allSet(4)))
require.NoError(t, err)
require.NotNil(t, encoded)
require.NotNil(t, sent)
require.Equal(t, true, sent.BitAt(0))
require.Equal(t, false, sent.BitAt(1))
require.Equal(t, true, sent.BitAt(3))
msg := mustDecodeSidecar(t, encoded)
require.Equal(t, 2, len(msg.PartialColumn))
require.Equal(t, 2, len(msg.KzgProofs))
require.Equal(t, true, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(0))
require.Equal(t, false, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(1))
require.Equal(t, true, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(3))
},
},
{
name: "marshal fails with invalid cell length",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 1, 0)
p.Column[0] = []byte{1}
_, _, err := p.cellsToSendForPeer(testPeerMeta(1, nil, []uint64{0}))
require.ErrorContains(t, "PartialColumn", err)
},
},
{
name: "marshal fails with invalid proof length",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 1, 0)
p.KzgProofs[0] = []byte{1}
_, _, err := p.cellsToSendForPeer(testPeerMeta(1, nil, []uint64{0}))
require.ErrorContains(t, "KzgProofs", err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_eagerPushBytes(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "nominal",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
encoded, err := p.eagerPushBytes()
require.NoError(t, err)
msg := mustDecodeSidecar(t, encoded)
require.Equal(t, 1, len(msg.Header))
require.Equal(t, 0, len(msg.PartialColumn))
require.Equal(t, 0, len(msg.KzgProofs))
require.Equal(t, uint64(3), bitfield.Bitlist(msg.CellsPresentBitmap).Len())
require.Equal(t, uint64(0), bitfield.Bitlist(msg.CellsPresentBitmap).Count())
},
},
{
name: "invalid commitment size",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0)
p.KzgCommitments[0] = []byte{1}
_, err := p.eagerPushBytes()
require.ErrorContains(t, "KzgCommitments", err)
},
},
{
name: "invalid inclusion proof vector length",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0)
p.KzgCommitmentsInclusionProof = p.KzgCommitmentsInclusionProof[:3]
_, err := p.eagerPushBytes()
require.ErrorContains(t, "KzgCommitmentsInclusionProof", err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestMergeAvailableIntoPartsMetadata(t *testing.T) {
tests := []struct {
name string
base *ethpb.PartialDataColumnPartsMetadata
add bitfield.Bitlist
expectErr string
}{
{
name: "nil base",
base: nil,
add: bitfield.NewBitlist(2),
expectErr: "base is nil",
},
{
name: "available length mismatch",
base: &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(4),
Requests: bitfield.NewBitlist(4),
},
add: bitfield.NewBitlist(3),
expectErr: "available length mismatch",
},
{
name: "requests length mismatch",
base: &ethpb.PartialDataColumnPartsMetadata{
Available: bitfield.NewBitlist(4),
Requests: bitfield.NewBitlist(3),
},
add: bitfield.NewBitlist(4),
expectErr: "requests length mismatch",
},
{
name: "successfully merges",
base: &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(4, 1),
Requests: testBitlist(4, allSet(4)...),
},
add: testBitlist(4, 2),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, err := MergeAvailableIntoPartsMetadata(tt.base, tt.add)
if tt.expectErr != "" {
require.ErrorContains(t, tt.expectErr, err)
return
}
require.NoError(t, err)
meta, parseErr := ParsePartsMetadata(out, tt.add.Len())
require.NoError(t, parseErr)
require.Equal(t, false, bitfield.Bitlist(meta.Available).BitAt(0))
require.Equal(t, true, bitfield.Bitlist(meta.Available).BitAt(1))
require.Equal(t, true, bitfield.Bitlist(meta.Available).BitAt(2))
require.Equal(t, false, bitfield.Bitlist(meta.Available).BitAt(3))
// Verify that MergeAvailableIntoPartsMetadata mutates its base argument.
require.Equal(t, true, bitfield.Bitlist(tt.base.Available).BitAt(2))
})
}
}
func TestPartialDataColumn_updateReceivedStateOutgoing(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T, p *PartialDataColumn)
}{
{
name: "nil receivedMeta",
run: func(t *testing.T, p *PartialDataColumn) {
_, err := p.updateReceivedStateOutgoing(nil, testBitlist(4, 1))
require.ErrorContains(t, "recievedMeta is nil", err)
},
},
{
name: "invalid receivedMeta parse",
run: func(t *testing.T, p *PartialDataColumn) {
_, err := p.updateReceivedStateOutgoing(partialmessages.PartsMetadata{1, 2}, testBitlist(4, 1))
require.NotNil(t, err)
},
},
{
name: "cellsSent length mismatch",
run: func(t *testing.T, p *PartialDataColumn) {
recvd := mustMarshalMeta(t, NewPartsMetaWithNoAvailableAndAllRequests(4))
_, err := p.updateReceivedStateOutgoing(recvd, testBitlist(3, 1))
require.ErrorContains(t, "available length mismatch", err)
},
},
{
name: "success",
run: func(t *testing.T, p *PartialDataColumn) {
recvd := mustMarshalMeta(t, &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(4, 1),
Requests: testBitlist(4, allSet(4)...),
})
out, err := p.updateReceivedStateOutgoing(recvd, testBitlist(4, 3))
require.NoError(t, err)
parsed, parseErr := ParsePartsMetadata(out, 4)
require.NoError(t, parseErr)
require.Equal(t, true, bitfield.Bitlist(parsed.Available).BitAt(1))
require.Equal(t, true, bitfield.Bitlist(parsed.Available).BitAt(3))
require.Equal(t, uint64(4), bitfield.Bitlist(parsed.Requests).Count())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0, 1)
tt.run(t, p)
})
}
}
func TestPartialDataColumn_ForPeer(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "eager push first time for peer",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0)
initial := partialmessages.PeerState{}
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), true, initial)
require.NoError(t, err)
require.NotNil(t, encoded)
require.IsNil(t, meta)
require.IsNil(t, next.RecvdState)
require.IsNil(t, next.SentState)
_, seen := p.eagerPushSent[peer.ID("peer-a")]
require.Equal(t, true, seen)
decoded := mustDecodeSidecar(t, encoded)
require.Equal(t, 1, len(decoded.Header))
require.Equal(t, 0, len(decoded.PartialColumn))
},
},
{
name: "eager push not repeated",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0)
_, _, _, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{})
require.NoError(t, err)
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{})
require.NoError(t, err)
require.IsNil(t, encoded)
require.IsNil(t, meta)
require.IsNil(t, next.RecvdState)
require.IsNil(t, next.SentState)
},
},
{
name: "requested false sends only parts metadata",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), false, partialmessages.PeerState{})
require.NoError(t, err)
require.IsNil(t, encoded)
require.NotNil(t, meta)
require.NotNil(t, next.SentState)
_, ok := next.SentState.(partialmessages.PartsMetadata)
require.Equal(t, true, ok)
},
},
{
name: "invalid SentState type",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, _, err := p.ForPeer(peer.ID("peer-a"), false, partialmessages.PeerState{SentState: "bad"})
require.ErrorContains(t, "SentState is not PartsMetadata", err)
},
},
{
name: "invalid RecvdState type",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, _, err := p.ForPeer(peer.ID("peer-a"), false, partialmessages.PeerState{RecvdState: "bad"})
require.ErrorContains(t, "RecvdState is not PartsMetadata", err)
},
},
{
name: "invalid received metadata parse",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, _, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{
RecvdState: partialmessages.PartsMetadata{1, 2},
})
require.ErrorContains(t, "size", err)
},
},
{
name: "requested true sends missing cells and updates recvd state",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0, 2)
initialMeta := mustMarshalMeta(t, NewPartsMetaWithNoAvailableAndAllRequests(4))
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{
RecvdState: initialMeta,
})
require.NoError(t, err)
require.NotNil(t, encoded)
require.NotNil(t, meta)
sentMeta, parseSentErr := ParsePartsMetadata(meta, 4)
require.NoError(t, parseSentErr)
require.Equal(t, uint64(2), bitfield.Bitlist(sentMeta.Available).Count())
require.Equal(t, true, bitfield.Bitlist(sentMeta.Available).BitAt(0))
require.Equal(t, true, bitfield.Bitlist(sentMeta.Available).BitAt(2))
require.Equal(t, uint64(4), bitfield.Bitlist(sentMeta.Requests).Count())
sentState, ok := next.SentState.(partialmessages.PartsMetadata)
require.Equal(t, true, ok)
require.DeepEqual(t, []byte(meta), []byte(sentState))
msg := mustDecodeSidecar(t, encoded)
require.Equal(t, 2, len(msg.PartialColumn))
require.Equal(t, true, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(0))
require.Equal(t, true, bitfield.Bitlist(msg.CellsPresentBitmap).BitAt(2))
recvdOut, parseErr := ParsePartsMetadata(next.RecvdState.(partialmessages.PartsMetadata), 4)
require.NoError(t, parseErr)
require.Equal(t, true, bitfield.Bitlist(recvdOut.Available).BitAt(0))
require.Equal(t, true, bitfield.Bitlist(recvdOut.Available).BitAt(2))
},
},
{
name: "requested true with no missing cells",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 1)
recvd := mustMarshalMeta(t, &ethpb.PartialDataColumnPartsMetadata{
Available: testBitlist(3, 1),
Requests: testBitlist(3, allSet(3)...),
})
next, encoded, _, err := p.ForPeer(peer.ID("peer-a"), true, partialmessages.PeerState{
RecvdState: recvd,
})
require.NoError(t, err)
require.IsNil(t, encoded)
require.DeepEqual(t, []byte(recvd), []byte(next.RecvdState.(partialmessages.PartsMetadata)))
},
},
{
name: "does not resend unchanged metadata",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 1)
myMeta, err := p.PartsMetadata()
require.NoError(t, err)
next, encoded, meta, err := p.ForPeer(peer.ID("peer-a"), false, partialmessages.PeerState{
SentState: myMeta,
})
require.NoError(t, err)
require.IsNil(t, encoded)
require.IsNil(t, meta)
require.DeepEqual(t, []byte(myMeta), []byte(next.SentState.(partialmessages.PartsMetadata)))
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_CellsToVerifyFromPartialMessage(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "empty bitmap",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
indices, bundles, err := p.CellsToVerifyFromPartialMessage(&ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: bitfield.NewBitlist(0),
})
require.NoError(t, err)
require.IsNil(t, indices)
require.IsNil(t, bundles)
},
},
{
name: "proofs count mismatch",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, err := p.CellsToVerifyFromPartialMessage(&ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(3, 1),
PartialColumn: [][]byte{{1}},
KzgProofs: nil,
})
require.ErrorContains(t, "Missing KZG proofs", err)
},
},
{
name: "cells count mismatch",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3, 0)
_, _, err := p.CellsToVerifyFromPartialMessage(&ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(3, 1),
PartialColumn: nil,
KzgProofs: [][]byte{{1}},
})
require.ErrorContains(t, "Missing cells", err)
},
},
{
name: "wrong bitmap length",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 0)
_, _, err := p.CellsToVerifyFromPartialMessage(&ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(3, 1),
PartialColumn: [][]byte{{1}},
KzgProofs: [][]byte{{2}},
})
require.ErrorContains(t, "wrong bitmap length", err)
},
},
{
name: "returns only unknown cells in bitmap order",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 4, 1)
msg := &ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(4, 0, 1, 3),
PartialColumn: [][]byte{{0xA}, {0xB}, {0xC}},
KzgProofs: [][]byte{{0x1}, {0x2}, {0x3}},
}
indices, bundles, err := p.CellsToVerifyFromPartialMessage(msg)
require.NoError(t, err)
require.DeepEqual(t, []uint64{0, 3}, indices)
require.Equal(t, 2, len(bundles))
require.Equal(t, p.Index, bundles[0].ColumnIndex)
require.DeepEqual(t, []byte{0xA}, bundles[0].Cell)
require.DeepEqual(t, []byte{0xC}, bundles[1].Cell)
require.DeepEqual(t, p.KzgCommitments[0], bundles[0].Commitment)
require.DeepEqual(t, p.KzgCommitments[3], bundles[1].Commitment)
},
},
{
name: "all cells already known",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 0, 1)
msg := &ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: testBitlist(2, 0, 1),
PartialColumn: [][]byte{{0xA}, {0xB}},
KzgProofs: [][]byte{{0x1}, {0x2}},
}
indices, bundles, err := p.CellsToVerifyFromPartialMessage(msg)
require.NoError(t, err)
require.Equal(t, 0, len(indices))
require.Equal(t, 0, len(bundles))
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_ExtendFromVerifiedCell(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "already present cell is not overwritten",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 1)
oldCell := p.Column[1]
ok := p.ExtendFromVerifiedCell(1, []byte{9}, []byte{8})
require.Equal(t, false, ok)
require.DeepEqual(t, oldCell, p.Column[1])
},
},
{
name: "new cell extends data",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 1)
ok := p.ExtendFromVerifiedCell(0, []byte{9}, []byte{8})
require.Equal(t, true, ok)
require.Equal(t, true, p.Included.BitAt(0))
require.DeepEqual(t, []byte{9}, p.Column[0])
require.DeepEqual(t, []byte{8}, p.KzgProofs[0])
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_ExtendFromVerifiedCells(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T)
}{
{
name: "mismatched cellIndices and cells panics",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2)
defer func() {
require.NotNil(t, recover())
}()
p.ExtendFromVerifiedCells(
[]uint64{0},
[]CellProofBundle{
{ColumnIndex: p.Index, Cell: []byte{1}, Proof: []byte{2}},
{ColumnIndex: p.Index, Cell: []byte{3}, Proof: []byte{4}},
},
)
},
},
{
name: "all new cells",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3)
ok := p.ExtendFromVerifiedCells(
[]uint64{0, 2},
[]CellProofBundle{
{ColumnIndex: p.Index, Cell: []byte{1}, Proof: []byte{2}},
{ColumnIndex: p.Index, Cell: []byte{3}, Proof: []byte{4}},
},
)
require.Equal(t, true, ok)
require.Equal(t, true, p.Included.BitAt(0))
require.Equal(t, true, p.Included.BitAt(2))
},
},
{
name: "all duplicate cells",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2, 1)
ok := p.ExtendFromVerifiedCells(
[]uint64{1},
[]CellProofBundle{{ColumnIndex: p.Index, Cell: []byte{7}, Proof: []byte{8}}},
)
require.Equal(t, false, ok)
},
},
{
name: "invalid column index first",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 2)
ok := p.ExtendFromVerifiedCells(
[]uint64{0},
[]CellProofBundle{{ColumnIndex: p.Index + 1, Cell: []byte{1}, Proof: []byte{2}}},
)
require.Equal(t, false, ok)
require.Equal(t, uint64(0), p.Included.Count())
},
},
{
name: "invalid column index after partial extension",
run: func(t *testing.T) {
p := mustNewPartialColumn(t, 3)
ok := p.ExtendFromVerifiedCells(
[]uint64{0, 1},
[]CellProofBundle{
{ColumnIndex: p.Index, Cell: []byte{1}, Proof: []byte{2}},
{ColumnIndex: p.Index + 1, Cell: []byte{3}, Proof: []byte{4}},
},
)
require.Equal(t, false, ok)
require.Equal(t, true, p.Included.BitAt(0))
require.Equal(t, false, p.Included.BitAt(1))
},
},
}
for _, tt := range tests {
t.Run(tt.name, tt.run)
}
}
func TestPartialDataColumn_Complete(t *testing.T) {
tests := []struct {
name string
p *PartialDataColumn
wantOK bool
}{
{
name: "incomplete",
p: mustNewPartialColumn(t, 2, 0),
wantOK: false,
},
{
name: "complete valid data column",
p: mustNewPartialColumn(t, 2, 0, 1),
wantOK: true,
},
{
name: "complete but invalid signed header",
p: mustNewPartialColumnWithSigLen(t, 2, 0, 0, 1),
wantOK: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, ok := tt.p.Complete(logrus.New())
require.Equal(t, tt.wantOK, ok)
if tt.wantOK {
require.NotNil(t, got.DataColumnSidecar)
}
})
}
}

View File

@@ -1929,8 +1929,8 @@ def prysm_deps():
name = "com_github_libp2p_go_libp2p_pubsub",
build_file_proto_mode = "disable_global",
importpath = "github.com/libp2p/go-libp2p-pubsub",
sum = "h1:rv9cHqvl7YV38hl637gTwPOfeWO8FjO5XScpJ5cKm1U=",
version = "v0.15.1-0.20260213050207-d8f500417dc9",
sum = "h1:dJcYvoeKRxO2DbwG8E3nlZHbEMaxnEzghBltmGel93U=",
version = "v0.15.1-0.20260127225230-d9e98cd10cf3",
)
go_repository(
name = "com_github_libp2p_go_libp2p_testing",

2
go.mod
View File

@@ -43,7 +43,7 @@ require (
github.com/kr/pretty v0.3.1
github.com/libp2p/go-libp2p v0.44.0
github.com/libp2p/go-libp2p-mplex v0.11.0
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260213050207-d8f500417dc9
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260127225230-d9e98cd10cf3
github.com/libp2p/go-mplex v0.7.0
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/manifoldco/promptui v0.7.0

4
go.sum
View File

@@ -562,8 +562,8 @@ github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl9
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-mplex v0.11.0 h1:0vwpLXRSfkTzshEjETIEgJaVxXvg+orbxYoIb3Ty5qM=
github.com/libp2p/go-libp2p-mplex v0.11.0/go.mod h1:QrsdNY3lzjpdo9V1goJfPb0O65Nms0sUR8CDAO18f6k=
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260213050207-d8f500417dc9 h1:rv9cHqvl7YV38hl637gTwPOfeWO8FjO5XScpJ5cKm1U=
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260213050207-d8f500417dc9/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260127225230-d9e98cd10cf3 h1:dJcYvoeKRxO2DbwG8E3nlZHbEMaxnEzghBltmGel93U=
github.com/libp2p/go-libp2p-pubsub v0.15.1-0.20260127225230-d9e98cd10cf3/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY=

View File

@@ -188,7 +188,6 @@ ssz_fulu_objs = [
"DataColumnIdentifier",
"DataColumnsByRootIdentifier",
"DataColumnSidecar",
"PartialDataColumnPartsMetadata",
"PartialDataColumnSidecar",
"StatusV2",
"SignedBeaconBlockContentsFulu",

View File

@@ -1,5 +1,4 @@
// Code generated by fastssz. DO NOT EDIT.
// Hash: a04fb7ec74508f383f3502e1bf0e7c1c25c3825016e8dbb5a8a98e71615026a6
package eth
import (
@@ -2947,129 +2946,3 @@ func (p *PartialDataColumnHeader) HashTreeRootWith(hh *ssz.Hasher) (err error) {
hh.Merkleize(indx)
return
}
// MarshalSSZ ssz marshals the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) MarshalSSZ() ([]byte, error) {
return ssz.MarshalSSZ(p)
}
// MarshalSSZTo ssz marshals the PartialDataColumnPartsMetadata object to a target array
func (p *PartialDataColumnPartsMetadata) MarshalSSZTo(buf []byte) (dst []byte, err error) {
dst = buf
offset := int(8)
// Offset (0) 'Available'
dst = ssz.WriteOffset(dst, offset)
offset += len(p.Available)
// Offset (1) 'Requests'
dst = ssz.WriteOffset(dst, offset)
offset += len(p.Requests)
// Field (0) 'Available'
if size := len(p.Available); size > 512 {
err = ssz.ErrBytesLengthFn("--.Available", size, 512)
return
}
dst = append(dst, p.Available...)
// Field (1) 'Requests'
if size := len(p.Requests); size > 512 {
err = ssz.ErrBytesLengthFn("--.Requests", size, 512)
return
}
dst = append(dst, p.Requests...)
return
}
// UnmarshalSSZ ssz unmarshals the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) UnmarshalSSZ(buf []byte) error {
var err error
size := uint64(len(buf))
if size < 8 {
return ssz.ErrSize
}
tail := buf
var o0, o1 uint64
// Offset (0) 'Available'
if o0 = ssz.ReadOffset(buf[0:4]); o0 > size {
return ssz.ErrOffset
}
if o0 != 8 {
return ssz.ErrInvalidVariableOffset
}
// Offset (1) 'Requests'
if o1 = ssz.ReadOffset(buf[4:8]); o1 > size || o0 > o1 {
return ssz.ErrOffset
}
// Field (0) 'Available'
{
buf = tail[o0:o1]
if err = ssz.ValidateBitlist(buf, 512); err != nil {
return err
}
if cap(p.Available) == 0 {
p.Available = make([]byte, 0, len(buf))
}
p.Available = append(p.Available, buf...)
}
// Field (1) 'Requests'
{
buf = tail[o1:]
if err = ssz.ValidateBitlist(buf, 512); err != nil {
return err
}
if cap(p.Requests) == 0 {
p.Requests = make([]byte, 0, len(buf))
}
p.Requests = append(p.Requests, buf...)
}
return err
}
// SizeSSZ returns the ssz encoded size in bytes for the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) SizeSSZ() (size int) {
size = 8
// Field (0) 'Available'
size += len(p.Available)
// Field (1) 'Requests'
size += len(p.Requests)
return
}
// HashTreeRoot ssz hashes the PartialDataColumnPartsMetadata object
func (p *PartialDataColumnPartsMetadata) HashTreeRoot() ([32]byte, error) {
return ssz.HashWithDefaultHasher(p)
}
// HashTreeRootWith ssz hashes the PartialDataColumnPartsMetadata object with a hasher
func (p *PartialDataColumnPartsMetadata) HashTreeRootWith(hh *ssz.Hasher) (err error) {
indx := hh.Index()
// Field (0) 'Available'
if len(p.Available) == 0 {
err = ssz.ErrEmptyBitlist
return
}
hh.PutBitlist(p.Available, 512)
// Field (1) 'Requests'
if len(p.Requests) == 0 {
err = ssz.ErrEmptyBitlist
return
}
hh.PutBitlist(p.Requests, 512)
hh.Merkleize(indx)
return
}

View File

@@ -7,12 +7,13 @@
package eth
import (
_ "github.com/OffchainLabs/prysm/v7/proto/eth/ext"
github_com_OffchainLabs_go_bitfield "github.com/OffchainLabs/go-bitfield"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
github_com_OffchainLabs_go_bitfield "github.com/OffchainLabs/go-bitfield"
_ "github.com/OffchainLabs/prysm/v7/proto/eth/ext"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
const (
@@ -150,58 +151,6 @@ func (x *PartialDataColumnHeader) GetKzgCommitmentsInclusionProof() [][]byte {
return nil
}
type PartialDataColumnPartsMetadata struct {
state protoimpl.MessageState `protogen:"open.v1"`
Available github_com_OffchainLabs_go_bitfield.Bitlist `protobuf:"bytes,1,opt,name=available,proto3" json:"available,omitempty" cast-type:"github.com/OffchainLabs/go-bitfield.Bitlist" ssz-max:"512"`
Requests github_com_OffchainLabs_go_bitfield.Bitlist `protobuf:"bytes,2,opt,name=requests,proto3" json:"requests,omitempty" cast-type:"github.com/OffchainLabs/go-bitfield.Bitlist" ssz-max:"512"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *PartialDataColumnPartsMetadata) Reset() {
*x = PartialDataColumnPartsMetadata{}
mi := &file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PartialDataColumnPartsMetadata) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PartialDataColumnPartsMetadata) ProtoMessage() {}
func (x *PartialDataColumnPartsMetadata) ProtoReflect() protoreflect.Message {
mi := &file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PartialDataColumnPartsMetadata.ProtoReflect.Descriptor instead.
func (*PartialDataColumnPartsMetadata) Descriptor() ([]byte, []int) {
return file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescGZIP(), []int{2}
}
func (x *PartialDataColumnPartsMetadata) GetAvailable() github_com_OffchainLabs_go_bitfield.Bitlist {
if x != nil {
return x.Available
}
return github_com_OffchainLabs_go_bitfield.Bitlist(nil)
}
func (x *PartialDataColumnPartsMetadata) GetRequests() github_com_OffchainLabs_go_bitfield.Bitlist {
if x != nil {
return x.Requests
}
return github_com_OffchainLabs_go_bitfield.Bitlist(nil)
}
var File_proto_prysm_v1alpha1_partial_data_columns_proto protoreflect.FileDescriptor
var file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc = []byte{
@@ -250,24 +199,12 @@ var file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc = []byte{
0x69, 0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18,
0x03, 0x20, 0x03, 0x28, 0x0c, 0x42, 0x08, 0x8a, 0xb5, 0x18, 0x04, 0x34, 0x2c, 0x33, 0x32, 0x52,
0x1c, 0x6b, 0x7a, 0x67, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x49,
0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x22, 0xca, 0x01,
0x0a, 0x1e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x44, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6c,
0x75, 0x6d, 0x6e, 0x50, 0x61, 0x72, 0x74, 0x73, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
0x12, 0x54, 0x0a, 0x09, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0c, 0x42, 0x36, 0x82, 0xb5, 0x18, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73,
0x2f, 0x67, 0x6f, 0x2d, 0x62, 0x69, 0x74, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x2e, 0x42, 0x69, 0x74,
0x6c, 0x69, 0x73, 0x74, 0x92, 0xb5, 0x18, 0x03, 0x35, 0x31, 0x32, 0x52, 0x09, 0x61, 0x76, 0x61,
0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x52, 0x0a, 0x08, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x36, 0x82, 0xb5, 0x18, 0x2b, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69,
0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x67, 0x6f, 0x2d, 0x62, 0x69, 0x74, 0x66, 0x69, 0x65, 0x6c,
0x64, 0x2e, 0x42, 0x69, 0x74, 0x6c, 0x69, 0x73, 0x74, 0x92, 0xb5, 0x18, 0x03, 0x35, 0x31, 0x32,
0x52, 0x08, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x42, 0x3b, 0x5a, 0x39, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69,
0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x36, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70,
0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x42, 0x3b, 0x5a,
0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63,
0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76,
0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31,
0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
@@ -282,16 +219,15 @@ func file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescGZIP() []byte {
return file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescData
}
var file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_proto_prysm_v1alpha1_partial_data_columns_proto_goTypes = []any{
(*PartialDataColumnSidecar)(nil), // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar
(*PartialDataColumnHeader)(nil), // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader
(*PartialDataColumnPartsMetadata)(nil), // 2: ethereum.eth.v1alpha1.PartialDataColumnPartsMetadata
(*SignedBeaconBlockHeader)(nil), // 3: ethereum.eth.v1alpha1.SignedBeaconBlockHeader
(*PartialDataColumnSidecar)(nil), // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar
(*PartialDataColumnHeader)(nil), // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader
(*SignedBeaconBlockHeader)(nil), // 2: ethereum.eth.v1alpha1.SignedBeaconBlockHeader
}
var file_proto_prysm_v1alpha1_partial_data_columns_proto_depIdxs = []int32{
1, // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar.header:type_name -> ethereum.eth.v1alpha1.PartialDataColumnHeader
3, // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader.signed_block_header:type_name -> ethereum.eth.v1alpha1.SignedBeaconBlockHeader
2, // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader.signed_block_header:type_name -> ethereum.eth.v1alpha1.SignedBeaconBlockHeader
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
@@ -311,7 +247,7 @@ func file_proto_prysm_v1alpha1_partial_data_columns_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},

View File

@@ -44,14 +44,3 @@ message PartialDataColumnHeader {
SignedBeaconBlockHeader signed_block_header = 2;
repeated bytes kzg_commitments_inclusion_proof = 3 [(ethereum.eth.ext.ssz_size) = "kzg_commitments_inclusion_proof_depth.size,32"];
}
message PartialDataColumnPartsMetadata {
bytes available = 1 [
(ethereum.eth.ext.ssz_max) = "max_blob_commitments_bitmap.size",
(ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/go-bitfield.Bitlist"
];
bytes requests = 2 [
(ethereum.eth.ext.ssz_max) = "max_blob_commitments_bitmap.size",
(ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/go-bitfield.Bitlist"
];
}