Compare commits

..

2 Commits

Author SHA1 Message Date
Aarsh Shah
d0d6bab8a0 handle header 2026-02-06 19:21:43 +04:00
Aarsh Shah
3f6c01fc3b changes as per review 2026-02-06 18:44:58 +04:00
7 changed files with 35 additions and 143 deletions

View File

@@ -22,6 +22,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -193,9 +193,9 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
require.NoError(t, err)
defer sub2.Cancel()
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1, nil)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2, nil)
require.NoError(t, err)
// Wait for mesh to form

View File

@@ -7,6 +7,7 @@ import (
"strconv"
"time"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
@@ -46,7 +47,7 @@ func extractColumnIndexFromTopic(topic string) (uint64, error) {
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader) error
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
@@ -158,12 +159,15 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
Logger: slogger,
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
merged, err := blocks.MergePartsMetadata(left, right)
if len(left) == 0 {
return right
}
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
if err != nil {
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
return left
}
return merged
return partialmessages.PartsMetadata(merged)
},
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
// TODO. Add some basic and fast sanity checks
@@ -278,6 +282,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
groupID := rpcWithFrom.GroupID
ourDataColumn := p.getDataColumn(topicID, groupID)
var shouldRepublish bool
var handledHeader bool
if ourDataColumn == nil && hasMessage {
var header *ethpb.PartialDataColumnHeader
@@ -316,7 +321,12 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
p.logger.Debug("No header handler registered for topic")
return nil
}
headerHandler(header)
err = headerHandler(header)
if err != nil {
p.logger.Error("Failed to handle header", "err", err)
} else {
handledHeader = true
}
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
@@ -415,7 +425,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
logger.Debug("republishing due to parts metadata difference")
}
if shouldRepublish {
if shouldRepublish && !handledHeader {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err

View File

@@ -361,13 +361,12 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
handleHeader: func(header *ethpb.PartialDataColumnHeader) {
handleHeader: func(header *ethpb.PartialDataColumnHeader) error {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
err := s.partialDataColumnHeaderSubscriber(ctx, header)
if err != nil {
log.WithError(err).Error("Failed to handle partial data column header")
}
source := peerdas.PopulateFromPartialHeader(header)
log.WithField("slot", source.Slot()).Info("Received data column header")
return s.processDataColumnSidecarsFromExecution(ctx, source)
},
}
}

View File

@@ -11,9 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
@@ -67,22 +65,6 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return nil
}
func (s *Service) partialDataColumnHeaderSubscriber(ctx context.Context, header *ethpb.PartialDataColumnHeader) error {
source := peerdas.PopulateFromPartialHeader(header)
log.WithField("slot", source.Slot()).Info("Received data column header")
go func() {
if err := s.processDataColumnSidecarsFromExecution(ctx, source); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
}).Error("Failed to process sidecars from execution for partial data column header")
}
}()
return nil
}
func (s *Service) verifiedRODataColumnSubscriber(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
log.WithField("slot", sidecar.Slot()).WithField("column", sidecar.Index).Info("Received data column sidecar")

View File

@@ -86,32 +86,14 @@ func (p *PartialDataColumn) GroupID() []byte {
return p.groupID
}
func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMetadata) ([]byte, error) {
numCommitments := p.Included.Len()
peerAvailable, peerRequests, isNewFormat := ParseMetadata(metadata, numCommitments)
if peerAvailable.Len() != numCommitments {
peerHas := bitfield.Bitlist(metadata)
if peerHas.Len() != p.Included.Len() {
return nil, errors.New("metadata length does not match expected length")
}
if isNewFormat && peerRequests.Len() != numCommitments {
return nil, errors.New("metadata length does not match expected length")
}
// shouldSend returns true if we should send cell i to this peer.
shouldSend := func(i uint64) bool {
if !p.Included.BitAt(i) {
return false
}
if peerAvailable.BitAt(i) {
return false
}
if isNewFormat && !peerRequests.BitAt(i) {
return false
}
return true
}
var cellsToReturn int
for i := range numCommitments {
if shouldSend(i) {
for i := range peerHas.Len() {
if !peerHas.BitAt(i) && p.Included.BitAt(i) {
cellsToReturn++
}
}
@@ -119,14 +101,14 @@ func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMe
return nil, nil
}
included := bitfield.NewBitlist(numCommitments)
included := bitfield.NewBitlist(p.Included.Len())
outMessage := ethpb.PartialDataColumnSidecar{
CellsPresentBitmap: included,
PartialColumn: make([][]byte, 0, cellsToReturn),
KzgProofs: make([][]byte, 0, cellsToReturn),
}
for i := range numCommitments {
if !shouldSend(i) {
for i := range peerHas.Len() {
if peerHas.BitAt(i) || !p.Included.BitAt(i) {
continue
}
included.SetBitAt(i, true)
@@ -142,7 +124,6 @@ func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMe
return marshalled, nil
}
// TODO: This method will be removed after upgrading to the latest Gossipsub.
func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.PartsMetadata, error) {
// TODO: do we want to send this once per groupID per peer
// Eagerly push the PartialDataColumnHeader
@@ -167,83 +148,7 @@ func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.
}
func (p *PartialDataColumn) PartsMetadata() partialmessages.PartsMetadata {
n := p.Included.Len()
requests := bitfield.NewBitlist(n)
for i := range n {
requests.SetBitAt(i, true)
}
return combinedMetadata(p.Included, requests)
}
// ParseMetadata splits PartsMetadata into available and request bitlists.
// Old format (len==N): returns (metadata, nil, false)
// New format (len==2N): returns (first N bits, next N bits, true)
func ParseMetadata(metadata partialmessages.PartsMetadata, numCommitments uint64) (available bitfield.Bitlist, requests bitfield.Bitlist, isNewFormat bool) {
bl := bitfield.Bitlist(metadata)
if bl.Len() == 2*numCommitments {
available = bitfield.NewBitlist(numCommitments)
requests = bitfield.NewBitlist(numCommitments)
for i := range numCommitments {
available.SetBitAt(i, bl.BitAt(i))
requests.SetBitAt(i, bl.BitAt(i+numCommitments))
}
return available, requests, true
}
return bl, nil, false
}
func combinedMetadata(available, requests bitfield.Bitlist) partialmessages.PartsMetadata {
n := available.Len()
combined := bitfield.NewBitlist(2 * n)
for i := range n {
combined.SetBitAt(i, available.BitAt(i))
combined.SetBitAt(i+n, requests.BitAt(i))
}
return partialmessages.PartsMetadata(combined)
}
// MergePartsMetadata merges two PartsMetadata values, handling old (N) and new (2N) formats.
// If lengths differ, the old-format (N) is extended to new-format (2N) with all request bits set to 1.
// TODO: This method will be removed after upgrading to the latest Gossipsub.
func MergePartsMetadata(left, right partialmessages.PartsMetadata) (partialmessages.PartsMetadata, error) {
if len(left) == 0 {
return right, nil
}
if len(right) == 0 {
return left, nil
}
leftBl := bitfield.Bitlist(left)
rightBl := bitfield.Bitlist(right)
if leftBl.Len() != rightBl.Len() {
leftBl, rightBl = normalizeMetadataLengths(leftBl, rightBl)
}
merged, err := leftBl.Or(rightBl)
if err != nil {
return nil, err
}
return partialmessages.PartsMetadata(merged), nil
}
// normalizeMetadataLengths extends old-format (N) bitlists to new-format (2N)
// by adding all-1 request bits. Returns both bitlists at the same length.
func normalizeMetadataLengths(left, right bitfield.Bitlist) (bitfield.Bitlist, bitfield.Bitlist) {
if left.Len() < right.Len() {
left = extendToNewFormat(left)
} else {
right = extendToNewFormat(right)
}
return left, right
}
// extendToNewFormat converts old-format (N bits) to new-format (2N bits) with all request bits set to 1.
func extendToNewFormat(bl bitfield.Bitlist) bitfield.Bitlist {
n := bl.Len()
extended := bitfield.NewBitlist(2 * n)
for i := range n {
extended.SetBitAt(i, bl.BitAt(i))
extended.SetBitAt(i+n, true)
}
return extended
return partialmessages.PartsMetadata(p.Included)
}
// CellsToVerifyFromPartialMessage returns cells from the partial message that need to be verified.

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"testing"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/util"
@@ -19,11 +20,7 @@ type invariantChecker struct {
var _ partialmessages.InvariantChecker[*blocks.PartialDataColumn] = (*invariantChecker)(nil)
func (i *invariantChecker) MergePartsMetadata(left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
merged, err := blocks.MergePartsMetadata(left, right)
if err != nil {
i.t.Fatal(err)
}
return merged
return partialmessages.MergeBitmap(left, right)
}
func (i *invariantChecker) SplitIntoParts(in *blocks.PartialDataColumn) ([]*blocks.PartialDataColumn, error) {
@@ -115,11 +112,9 @@ func (i *invariantChecker) ExtendFromBytes(a *blocks.PartialDataColumn, data []b
}
func (i *invariantChecker) ShouldRequest(a *blocks.PartialDataColumn, from peer.ID, partsMetadata []byte) bool {
numCommitments := uint64(len(a.KzgCommitments))
available, requests, isNew := blocks.ParseMetadata(partsMetadata, numCommitments)
for idx := range available.Len() {
peerHasAndWilling := available.BitAt(idx) && (!isNew || requests.BitAt(idx))
if peerHasAndWilling && !a.Included.BitAt(idx) {
peerHas := bitfield.Bitlist(partsMetadata)
for i := range peerHas.Len() {
if peerHas.BitAt(i) && !a.Included.BitAt(i) {
return true
}
}