Compare commits

...

2 Commits

Author SHA1 Message Date
Aarsh Shah
d0d6bab8a0 handle header 2026-02-06 19:21:43 +04:00
Aarsh Shah
3f6c01fc3b changes as per review 2026-02-06 18:44:58 +04:00
4 changed files with 15 additions and 28 deletions

View File

@@ -193,9 +193,9 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
require.NoError(t, err)
defer sub2.Cancel()
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1, nil)
require.NoError(t, err)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2, nil)
require.NoError(t, err)
// Wait for mesh to form

View File

@@ -47,7 +47,7 @@ func extractColumnIndexFromTopic(topic string) (uint64, error) {
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader) error
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
@@ -282,6 +282,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
groupID := rpcWithFrom.GroupID
ourDataColumn := p.getDataColumn(topicID, groupID)
var shouldRepublish bool
var handledHeader bool
if ourDataColumn == nil && hasMessage {
var header *ethpb.PartialDataColumnHeader
@@ -320,7 +321,12 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
p.logger.Debug("No header handler registered for topic")
return nil
}
headerHandler(header)
err = headerHandler(header)
if err != nil {
p.logger.Error("Failed to handle header", "err", err)
} else {
handledHeader = true
}
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
@@ -419,7 +425,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
logger.Debug("republishing due to parts metadata difference")
}
if shouldRepublish {
if shouldRepublish && !handledHeader {
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
if err != nil {
return err

View File

@@ -361,13 +361,12 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
handleHeader: func(header *ethpb.PartialDataColumnHeader) {
handleHeader: func(header *ethpb.PartialDataColumnHeader) error {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
err := s.partialDataColumnHeaderSubscriber(ctx, header)
if err != nil {
log.WithError(err).Error("Failed to handle partial data column header")
}
source := peerdas.PopulateFromPartialHeader(header)
log.WithField("slot", source.Slot()).Info("Received data column header")
return s.processDataColumnSidecarsFromExecution(ctx, source)
},
}
}

View File

@@ -11,9 +11,7 @@ import (
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
@@ -67,22 +65,6 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return nil
}
func (s *Service) partialDataColumnHeaderSubscriber(ctx context.Context, header *ethpb.PartialDataColumnHeader) error {
source := peerdas.PopulateFromPartialHeader(header)
log.WithField("slot", source.Slot()).Info("Received data column header")
go func() {
if err := s.processDataColumnSidecarsFromExecution(ctx, source); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
}).Error("Failed to process sidecars from execution for partial data column header")
}
}()
return nil
}
func (s *Service) verifiedRODataColumnSubscriber(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
log.WithField("slot", sidecar.Slot()).WithField("column", sidecar.Index).Info("Received data column sidecar")