Compare commits

..

2 Commits

Author SHA1 Message Date
Aarsh Shah
c5914ea4d9 handle partial data column header 2026-02-04 19:05:03 +04:00
Aarsh Shah
08d143bd2c add type for partial header reconstruction 2026-02-04 18:27:54 +04:00
5 changed files with 126 additions and 8 deletions

View File

@@ -24,11 +24,13 @@ var (
var (
_ ConstructionPopulator = (*BlockReconstructionSource)(nil)
_ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
_ ConstructionPopulator = (*PartialDataColumnHeaderReconstructionSource)(nil)
)
const (
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
BlockType = "BeaconBlock"
SidecarType = "DataColumnSidecar"
PartialDataColumnHeaderType = "PartialDataColumnHeader"
)
type (
@@ -55,6 +57,10 @@ type (
blocks.VerifiedRODataColumn
}
PartialDataColumnHeaderReconstructionSource struct {
*ethpb.PartialDataColumnHeader
}
blockInfo struct {
signedBlockHeader *ethpb.SignedBeaconBlockHeader
kzgCommitments [][]byte
@@ -72,6 +78,11 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
return &SidecarReconstructionSource{VerifiedRODataColumn: sidecar}
}
// PopulateFromPartialHeader creates a PartialDataColumnHeaderReconstructionSource from a partial header
func PopulateFromPartialHeader(header *ethpb.PartialDataColumnHeader) *PartialDataColumnHeaderReconstructionSource {
return &PartialDataColumnHeaderReconstructionSource{PartialDataColumnHeader: header}
}
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
@@ -289,3 +300,43 @@ func (s *SidecarReconstructionSource) extract() (*blockInfo, error) {
return info, nil
}
// Slot returns the slot from the partial data column header
func (p *PartialDataColumnHeaderReconstructionSource) Slot() primitives.Slot {
return p.SignedBlockHeader.Header.Slot
}
// Root returns the block root computed from the header
func (p *PartialDataColumnHeaderReconstructionSource) Root() [fieldparams.RootLength]byte {
root, err := p.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
return [fieldparams.RootLength]byte{}
}
return root
}
// ProposerIndex returns the proposer index from the header
func (p *PartialDataColumnHeaderReconstructionSource) ProposerIndex() primitives.ValidatorIndex {
return p.SignedBlockHeader.Header.ProposerIndex
}
// Commitments returns the KZG commitments from the header
func (p *PartialDataColumnHeaderReconstructionSource) Commitments() ([][]byte, error) {
return p.KzgCommitments, nil
}
// Type returns the type of the source
func (p *PartialDataColumnHeaderReconstructionSource) Type() string {
return PartialDataColumnHeaderType
}
// extract extracts the block information from the partial header
func (p *PartialDataColumnHeaderReconstructionSource) extract() (*blockInfo, error) {
info := &blockInfo{
signedBlockHeader: p.SignedBlockHeader,
kzgCommitments: p.KzgCommitments,
kzgInclusionProof: p.KzgCommitmentsInclusionProof,
}
return info, nil
}

View File

@@ -267,4 +267,31 @@ func TestReconstructionSource(t *testing.T) {
require.Equal(t, peerdas.SidecarType, src.Type())
})
t.Run("from partial header", func(t *testing.T) {
referenceSidecar := sidecars[0]
partialHeader := &ethpb.PartialDataColumnHeader{
SignedBlockHeader: referenceSidecar.SignedBlockHeader,
KzgCommitments: referenceSidecar.KzgCommitments,
KzgCommitmentsInclusionProof: referenceSidecar.KzgCommitmentsInclusionProof,
}
src := peerdas.PopulateFromPartialHeader(partialHeader)
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.Slot, src.Slot())
// Compute expected root
expectedRoot, err := referenceSidecar.SignedBlockHeader.Header.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, expectedRoot, src.Root())
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.ProposerIndex, src.ProposerIndex())
commitments, err := src.Commitments()
require.NoError(t, err)
require.Equal(t, 2, len(commitments))
require.DeepEqual(t, commitment1, commitments[0])
require.DeepEqual(t, commitment2, commitments[1])
require.Equal(t, peerdas.PartialDataColumnHeaderType, src.Type())
})
}

View File

@@ -47,6 +47,7 @@ func extractColumnIndexFromTopic(topic string) (uint64, error) {
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
// - reject=false, err=nil: valid header
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
type HeaderHandler func(header *ethpb.PartialDataColumnHeader)
type ColumnValidator func(cells []blocks.CellProofBundle) error
type PartialColumnBroadcaster struct {
@@ -63,6 +64,9 @@ type PartialColumnBroadcaster struct {
// map topic -> handler
handlers map[string]SubHandler
// map topic -> headerHandler
headerHandlers map[string]HeaderHandler
// map topic -> *pubsub.Topic
topics map[string]*pubsub.Topic
@@ -109,6 +113,7 @@ type subscribe struct {
headerValidator HeaderValidator
validator ColumnValidator
handler SubHandler
headerHandler HeaderHandler
}
type unsubscribe struct {
@@ -133,6 +138,7 @@ func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
validators: make(map[string]ColumnValidator),
headerValidators: make(map[string]HeaderValidator),
handlers: make(map[string]SubHandler),
headerHandlers: make(map[string]HeaderHandler),
topics: make(map[string]*pubsub.Topic),
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
groupTTL: make(map[string]int8),
@@ -225,7 +231,7 @@ func (p *PartialColumnBroadcaster) loop() {
case requestKindPublish:
req.response <- p.publish(req.publish.topic, req.publish.c)
case requestKindSubscribe:
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler, req.sub.headerHandler)
case requestKindUnsubscribe:
req.response <- p.unsubscribe(req.unsub.topic)
case requestKindHandleIncomingRPC:
@@ -309,7 +315,12 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
// Cache the valid header
p.validHeaderCache[string(groupID)] = header
// TODO: We now have the information we need to call GetBlobsV3, we should do that to see what we have locally.
headerHandler, ok := p.headerHandlers[topicID]
if !ok || headerHandler == nil {
p.logger.Debug("No header handler registered for topic")
return nil
}
headerHandler(header)
}
columnIndex, err := extractColumnIndexFromTopic(topicID)
@@ -491,7 +502,7 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler, headerHandler HeaderHandler) error {
respCh := make(chan error)
p.incomingReq <- request{
kind: requestKindSubscribe,
@@ -500,12 +511,13 @@ func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator He
headerValidator: headerValidator,
validator: validator,
handler: handler,
headerHandler: headerHandler,
},
response: respCh,
}
return <-respCh
}
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler, headerHandler HeaderHandler) error {
topic := t.String()
if _, ok := p.topics[topic]; ok {
return errors.New("already subscribed")
@@ -515,6 +527,7 @@ func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator He
p.headerValidators[topic] = headerValidator
p.validators[topic] = validator
p.handlers[topic] = handler
p.headerHandlers[topic] = headerHandler
return nil
}
@@ -539,6 +552,6 @@ func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
delete(p.headerValidators, topic)
delete(p.validators, topic)
delete(p.handlers, topic)
delete(p.headerHandlers, topic)
return t.Close()
}

View File

@@ -74,6 +74,7 @@ type partialSubscribeParameters struct {
validateHeader partialdatacolumnbroadcaster.HeaderValidator
validate partialdatacolumnbroadcaster.ColumnValidator
handle partialdatacolumnbroadcaster.SubHandler
handleHeader partialdatacolumnbroadcaster.HeaderHandler
}
// shortTopic is a less verbose version of topic strings used for logging.
@@ -360,6 +361,14 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
}
},
handleHeader: func(header *ethpb.PartialDataColumnHeader) {
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
defer cancel()
err := s.partialDataColumnHeaderSubscriber(ctx, header)
if err != nil {
log.WithError(err).Error("Failed to handle partial data column header")
}
},
}
}
s.subscribeWithParameters(subscribeParameters{
@@ -643,7 +652,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
if requestPartial {
log.Info("Subscribing to partial columns on", topicStr)
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle)
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle, t.partial.handleHeader)
if err != nil {
log.WithError(err).Error("Failed to subscribe to partial column")

View File

@@ -11,7 +11,9 @@ import (
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
@@ -65,6 +67,22 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
return nil
}
func (s *Service) partialDataColumnHeaderSubscriber(ctx context.Context, header *ethpb.PartialDataColumnHeader) error {
source := peerdas.PopulateFromPartialHeader(header)
log.WithField("slot", source.Slot()).Info("Received data column header")
go func() {
if err := s.processDataColumnSidecarsFromExecution(ctx, source); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", source.Root()),
"slot": source.Slot(),
}).Error("Failed to process sidecars from execution for partial data column header")
}
}()
return nil
}
func (s *Service) verifiedRODataColumnSubscriber(ctx context.Context, sidecar blocks.VerifiedRODataColumn) error {
log.WithField("slot", sidecar.Slot()).WithField("column", sidecar.Index).Info("Received data column sidecar")