mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-02-13 06:25:06 -05:00
Compare commits
16 Commits
rebased-pa
...
feat/updat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b8a6baad3 | ||
|
|
fd07e59c0a | ||
|
|
d66d25e7ad | ||
|
|
89ab513183 | ||
|
|
1c6110b6e8 | ||
|
|
a82edc7fbc | ||
|
|
39bb8d2929 | ||
|
|
387cfcd442 | ||
|
|
c70e51b445 | ||
|
|
a1a8a86341 | ||
|
|
71b1610331 | ||
|
|
814abab4b5 | ||
|
|
d0d6bab8a0 | ||
|
|
3f6c01fc3b | ||
|
|
c5914ea4d9 | ||
|
|
08d143bd2c |
@@ -24,11 +24,13 @@ var (
|
||||
var (
|
||||
_ ConstructionPopulator = (*BlockReconstructionSource)(nil)
|
||||
_ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
|
||||
_ ConstructionPopulator = (*PartialDataColumnHeaderReconstructionSource)(nil)
|
||||
)
|
||||
|
||||
const (
|
||||
BlockType = "BeaconBlock"
|
||||
SidecarType = "DataColumnSidecar"
|
||||
BlockType = "BeaconBlock"
|
||||
SidecarType = "DataColumnSidecar"
|
||||
PartialDataColumnHeaderType = "PartialDataColumnHeader"
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -55,6 +57,10 @@ type (
|
||||
blocks.VerifiedRODataColumn
|
||||
}
|
||||
|
||||
PartialDataColumnHeaderReconstructionSource struct {
|
||||
*ethpb.PartialDataColumnHeader
|
||||
}
|
||||
|
||||
blockInfo struct {
|
||||
signedBlockHeader *ethpb.SignedBeaconBlockHeader
|
||||
kzgCommitments [][]byte
|
||||
@@ -72,6 +78,11 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
|
||||
return &SidecarReconstructionSource{VerifiedRODataColumn: sidecar}
|
||||
}
|
||||
|
||||
// PopulateFromPartialHeader creates a PartialDataColumnHeaderReconstructionSource from a partial header
|
||||
func PopulateFromPartialHeader(header *ethpb.PartialDataColumnHeader) *PartialDataColumnHeaderReconstructionSource {
|
||||
return &PartialDataColumnHeaderReconstructionSource{PartialDataColumnHeader: header}
|
||||
}
|
||||
|
||||
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
|
||||
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
|
||||
@@ -289,3 +300,43 @@ func (s *SidecarReconstructionSource) extract() (*blockInfo, error) {
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// Slot returns the slot from the partial data column header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) Slot() primitives.Slot {
|
||||
return p.SignedBlockHeader.Header.Slot
|
||||
}
|
||||
|
||||
// Root returns the block root computed from the header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) Root() [fieldparams.RootLength]byte {
|
||||
root, err := p.SignedBlockHeader.Header.HashTreeRoot()
|
||||
if err != nil {
|
||||
return [fieldparams.RootLength]byte{}
|
||||
}
|
||||
return root
|
||||
}
|
||||
|
||||
// ProposerIndex returns the proposer index from the header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) ProposerIndex() primitives.ValidatorIndex {
|
||||
return p.SignedBlockHeader.Header.ProposerIndex
|
||||
}
|
||||
|
||||
// Commitments returns the KZG commitments from the header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) Commitments() ([][]byte, error) {
|
||||
return p.KzgCommitments, nil
|
||||
}
|
||||
|
||||
// Type returns the type of the source
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) Type() string {
|
||||
return PartialDataColumnHeaderType
|
||||
}
|
||||
|
||||
// extract extracts the block information from the partial header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) extract() (*blockInfo, error) {
|
||||
info := &blockInfo{
|
||||
signedBlockHeader: p.SignedBlockHeader,
|
||||
kzgCommitments: p.KzgCommitments,
|
||||
kzgInclusionProof: p.KzgCommitmentsInclusionProof,
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
@@ -267,4 +267,31 @@ func TestReconstructionSource(t *testing.T) {
|
||||
|
||||
require.Equal(t, peerdas.SidecarType, src.Type())
|
||||
})
|
||||
|
||||
t.Run("from partial header", func(t *testing.T) {
|
||||
referenceSidecar := sidecars[0]
|
||||
partialHeader := ðpb.PartialDataColumnHeader{
|
||||
SignedBlockHeader: referenceSidecar.SignedBlockHeader,
|
||||
KzgCommitments: referenceSidecar.KzgCommitments,
|
||||
KzgCommitmentsInclusionProof: referenceSidecar.KzgCommitmentsInclusionProof,
|
||||
}
|
||||
|
||||
src := peerdas.PopulateFromPartialHeader(partialHeader)
|
||||
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.Slot, src.Slot())
|
||||
|
||||
// Compute expected root
|
||||
expectedRoot, err := referenceSidecar.SignedBlockHeader.Header.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedRoot, src.Root())
|
||||
|
||||
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.ProposerIndex, src.ProposerIndex())
|
||||
|
||||
commitments, err := src.Commitments()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(commitments))
|
||||
require.DeepEqual(t, commitment1, commitments[0])
|
||||
require.DeepEqual(t, commitment2, commitments[1])
|
||||
|
||||
require.Equal(t, peerdas.PartialDataColumnHeaderType, src.Type())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -77,8 +77,6 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
|
||||
ps2, err := pubsub.NewGossipSub(ctx, h2, opts2...)
|
||||
require.NoError(t, err)
|
||||
|
||||
go broadcaster1.Start()
|
||||
go broadcaster2.Start()
|
||||
defer func() {
|
||||
broadcaster1.Stop()
|
||||
broadcaster2.Stop()
|
||||
@@ -140,7 +138,7 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
|
||||
topic2, err := ps2.Join(topicStr, pubsub.RequestPartialMessages())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Header validator that verifies the inclusion proof
|
||||
// Header validator
|
||||
headerValidator := func(header *ethpb.PartialDataColumnHeader) (reject bool, err error) {
|
||||
if header == nil {
|
||||
return false, fmt.Errorf("nil header")
|
||||
@@ -193,9 +191,17 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer sub2.Cancel()
|
||||
|
||||
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
|
||||
noopHeaderHandler := func(header *ethpb.PartialDataColumnHeader, groupID string) {}
|
||||
|
||||
err = broadcaster1.Start(headerValidator, cellValidator, handler1, noopHeaderHandler)
|
||||
require.NoError(t, err)
|
||||
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
|
||||
|
||||
err = broadcaster2.Start(headerValidator, cellValidator, handler2, noopHeaderHandler)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = broadcaster1.Subscribe(topic1)
|
||||
require.NoError(t, err)
|
||||
err = broadcaster2.Subscribe(topic2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for mesh to form
|
||||
|
||||
@@ -7,14 +7,12 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/go-bitfield"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
|
||||
"github.com/libp2p/go-libp2p-pubsub/partialmessages/bitmap"
|
||||
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
@@ -30,6 +28,8 @@ import (
|
||||
|
||||
const TTLInSlots = 3
|
||||
const maxConcurrentValidators = 128
|
||||
const headerHandledTimeout = time.Second * 1
|
||||
const maxConcurrentHeaderHandlers = 128
|
||||
|
||||
var dataColumnTopicRegex = regexp.MustCompile(`data_column_sidecar_(\d+)`)
|
||||
|
||||
@@ -47,6 +47,7 @@ func extractColumnIndexFromTopic(topic string) (uint64, error) {
|
||||
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
|
||||
// - reject=false, err=nil: valid header
|
||||
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
|
||||
type HeaderHandler func(header *ethpb.PartialDataColumnHeader, groupID string)
|
||||
type ColumnValidator func(cells []blocks.CellProofBundle) error
|
||||
|
||||
type PartialColumnBroadcaster struct {
|
||||
@@ -55,18 +56,19 @@ type PartialColumnBroadcaster struct {
|
||||
ps *pubsub.PubSub
|
||||
stop chan struct{}
|
||||
|
||||
// map topic -> headerValidators
|
||||
headerValidators map[string]HeaderValidator
|
||||
// map topic -> Validator
|
||||
validators map[string]ColumnValidator
|
||||
validateHeader HeaderValidator
|
||||
validateColumn ColumnValidator
|
||||
handleColumn SubHandler
|
||||
handleHeader HeaderHandler
|
||||
|
||||
// map topic -> handler
|
||||
handlers map[string]SubHandler
|
||||
// map groupID -> bool to signal when header has been handled
|
||||
headerHandled map[string]bool
|
||||
|
||||
// map topic -> *pubsub.Topic
|
||||
topics map[string]*pubsub.Topic
|
||||
|
||||
concurrentValidatorSemaphore chan struct{}
|
||||
concurrentValidatorSemaphore chan struct{}
|
||||
concurrentHeaderHandlerSemaphore chan struct{}
|
||||
|
||||
// map topic -> map[groupID]PartialColumn
|
||||
partialMsgStore map[string]map[string]*blocks.PartialDataColumn
|
||||
@@ -87,16 +89,18 @@ const (
|
||||
requestKindUnsubscribe
|
||||
requestKindHandleIncomingRPC
|
||||
requestKindCellsValidated
|
||||
requestKindHeaderHandled
|
||||
)
|
||||
|
||||
type request struct {
|
||||
kind requestKind
|
||||
response chan error
|
||||
sub subscribe
|
||||
unsub unsubscribe
|
||||
publish publish
|
||||
incomingRPC rpcWithFrom
|
||||
cellsValidated *cellsValidated
|
||||
kind requestKind
|
||||
response chan error
|
||||
sub subscribe
|
||||
unsub unsubscribe
|
||||
publish publish
|
||||
incomingRPC rpcWithFrom
|
||||
cellsValidated *cellsValidated
|
||||
headerHandledGroup string
|
||||
}
|
||||
|
||||
type publish struct {
|
||||
@@ -105,10 +109,7 @@ type publish struct {
|
||||
}
|
||||
|
||||
type subscribe struct {
|
||||
t *pubsub.Topic
|
||||
headerValidator HeaderValidator
|
||||
validator ColumnValidator
|
||||
handler SubHandler
|
||||
t *pubsub.Topic
|
||||
}
|
||||
|
||||
type unsubscribe struct {
|
||||
@@ -130,19 +131,18 @@ type cellsValidated struct {
|
||||
|
||||
func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
|
||||
return &PartialColumnBroadcaster{
|
||||
validators: make(map[string]ColumnValidator),
|
||||
headerValidators: make(map[string]HeaderValidator),
|
||||
handlers: make(map[string]SubHandler),
|
||||
topics: make(map[string]*pubsub.Topic),
|
||||
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
|
||||
groupTTL: make(map[string]int8),
|
||||
validHeaderCache: make(map[string]*ethpb.PartialDataColumnHeader),
|
||||
headerHandled: make(map[string]bool),
|
||||
// GossipSub sends the messages to this channel. The buffer should be
|
||||
// big enough to avoid dropping messages. We don't want to block the gossipsub event loop for this.
|
||||
incomingReq: make(chan request, 128*16),
|
||||
logger: logger,
|
||||
|
||||
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
|
||||
concurrentValidatorSemaphore: make(chan struct{}, maxConcurrentValidators),
|
||||
concurrentHeaderHandlerSemaphore: make(chan struct{}, maxConcurrentHeaderHandlers),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,15 +153,12 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
|
||||
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
|
||||
Logger: slogger,
|
||||
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
|
||||
if len(left) == 0 {
|
||||
return right
|
||||
}
|
||||
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
|
||||
merged, err := blocks.MergePartsMetadata(left, right)
|
||||
if err != nil {
|
||||
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
|
||||
p.logger.Warn("Failed to merge parts metadata", "err", err)
|
||||
return left
|
||||
}
|
||||
return partialmessages.PartsMetadata(merged)
|
||||
return merged
|
||||
},
|
||||
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
|
||||
// TODO. Add some basic and fast sanity checks
|
||||
@@ -187,11 +184,34 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
|
||||
return opts
|
||||
}
|
||||
|
||||
// Start starts the event loop of the PartialColumnBroadcaster. Should be called
|
||||
// within a goroutine (go p.Start())
|
||||
func (p *PartialColumnBroadcaster) Start() {
|
||||
// Start starts the event loop of the PartialColumnBroadcaster.
|
||||
// It accepts the required validator and handler functions, returning an error if any is nil.
|
||||
// The event loop is launched in a goroutine.
|
||||
func (p *PartialColumnBroadcaster) Start(
|
||||
validateHeader HeaderValidator,
|
||||
validateColumn ColumnValidator,
|
||||
handleColumn SubHandler,
|
||||
handleHeader HeaderHandler,
|
||||
) error {
|
||||
if validateHeader == nil {
|
||||
return errors.New("no header validator provided")
|
||||
}
|
||||
if handleHeader == nil {
|
||||
return errors.New("no header handler provided")
|
||||
}
|
||||
if validateColumn == nil {
|
||||
return errors.New("no column validator provided")
|
||||
}
|
||||
if handleColumn == nil {
|
||||
return errors.New("no column handler provided")
|
||||
}
|
||||
p.validateHeader = validateHeader
|
||||
p.validateColumn = validateColumn
|
||||
p.handleColumn = handleColumn
|
||||
p.handleHeader = handleHeader
|
||||
p.stop = make(chan struct{})
|
||||
p.loop()
|
||||
go p.loop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PartialColumnBroadcaster) loop() {
|
||||
@@ -210,6 +230,7 @@ func (p *PartialColumnBroadcaster) loop() {
|
||||
|
||||
delete(p.groupTTL, groupID)
|
||||
delete(p.validHeaderCache, groupID)
|
||||
delete(p.headerHandled, groupID)
|
||||
for topic, msgStore := range p.partialMsgStore {
|
||||
delete(msgStore, groupID)
|
||||
if len(msgStore) == 0 {
|
||||
@@ -222,7 +243,7 @@ func (p *PartialColumnBroadcaster) loop() {
|
||||
case requestKindPublish:
|
||||
req.response <- p.publish(req.publish.topic, req.publish.c)
|
||||
case requestKindSubscribe:
|
||||
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
|
||||
req.response <- p.subscribe(req.sub.t)
|
||||
case requestKindUnsubscribe:
|
||||
req.response <- p.unsubscribe(req.unsub.topic)
|
||||
case requestKindHandleIncomingRPC:
|
||||
@@ -235,6 +256,8 @@ func (p *PartialColumnBroadcaster) loop() {
|
||||
if err != nil {
|
||||
p.logger.Error("Failed to handle cells validated", "err", err)
|
||||
}
|
||||
case requestKindHeaderHandled:
|
||||
p.handleHeaderHandled(req.headerHandledGroup)
|
||||
default:
|
||||
p.logger.Error("Unknown request kind", "kind", req.kind)
|
||||
}
|
||||
@@ -287,13 +310,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
|
||||
}
|
||||
|
||||
header = message.Header[0]
|
||||
headerValidator, ok := p.headerValidators[topicID]
|
||||
if !ok || headerValidator == nil {
|
||||
p.logger.Debug("No header validator registered for topic")
|
||||
return nil
|
||||
}
|
||||
|
||||
reject, err := headerValidator(header)
|
||||
reject, err := p.validateHeader(header)
|
||||
if err != nil {
|
||||
p.logger.Debug("Header validation failed", "err", err, "reject", reject)
|
||||
if reject {
|
||||
@@ -305,8 +322,15 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
|
||||
}
|
||||
// Cache the valid header
|
||||
p.validHeaderCache[string(groupID)] = header
|
||||
p.headerHandled[string(groupID)] = false
|
||||
|
||||
// TODO: We now have the information we need to call GetBlobsV3, we should do that to see what we have locally.
|
||||
p.concurrentHeaderHandlerSemaphore <- struct{}{}
|
||||
go func() {
|
||||
defer func() {
|
||||
<-p.concurrentHeaderHandlerSemaphore
|
||||
}()
|
||||
p.handleHeader(header, string(groupID))
|
||||
}()
|
||||
}
|
||||
|
||||
columnIndex, err := extractColumnIndexFromTopic(topicID)
|
||||
@@ -354,8 +378,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
|
||||
"group": groupID,
|
||||
})
|
||||
|
||||
validator, validatorOK := p.validators[topicID]
|
||||
if len(rpcWithFrom.PartialMessage) > 0 && validatorOK {
|
||||
if len(rpcWithFrom.PartialMessage) > 0 {
|
||||
// TODO: is there any penalty we want to consider for giving us data we didn't request?
|
||||
// Note that we need to be careful around race conditions and eager data.
|
||||
// Also note that protobufs by design allow extra data that we don't parse.
|
||||
@@ -376,7 +399,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
|
||||
<-p.concurrentValidatorSemaphore
|
||||
}()
|
||||
start := time.Now()
|
||||
err := validator(cellsToVerify)
|
||||
err := p.validateColumn(cellsToVerify)
|
||||
if err != nil {
|
||||
logger.Error("failed to validate cells", "err", err)
|
||||
_ = p.ps.PeerFeedback(topicID, rpcWithFrom.from, pubsub.PeerFeedbackInvalidMessage)
|
||||
@@ -397,12 +420,23 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
|
||||
}
|
||||
}
|
||||
|
||||
peerHas := bitmap.Bitmap(rpcWithFrom.PartsMetadata)
|
||||
iHave := bitmap.Bitmap(ourDataColumn.PartsMetadata())
|
||||
if !shouldRepublish && len(peerHas) > 0 && !bytes.Equal(peerHas, iHave) {
|
||||
// Either we have something they don't or vice versa
|
||||
shouldRepublish = true
|
||||
logger.Debug("republishing due to parts metadata difference")
|
||||
if !shouldRepublish && len(rpcWithFrom.PartsMetadata) > 0 {
|
||||
peerMeta, err := blocks.ParsePartsMetadata(rpcWithFrom.PartsMetadata)
|
||||
ourMeta, err2 := blocks.ParsePartsMetadata(ourDataColumn.PartsMetadata())
|
||||
if err == nil && err2 == nil && !bytes.Equal(peerMeta.Available, ourMeta.Available) {
|
||||
// Either we have something they don't or vice versa
|
||||
shouldRepublish = true
|
||||
logger.Debug("republishing due to parts metadata difference")
|
||||
}
|
||||
}
|
||||
|
||||
headerHandled, ok := p.headerHandled[string(groupID)]
|
||||
// we only want to skip republishing if the header is currently being handled but hasn't been handled yet.
|
||||
// If the header is NOT being handled at all, these incoming cells are likely in response to a previous publish we sent
|
||||
// (either when got a data column sidecar, a beacon block body or if we are a block proposer).
|
||||
if ok && !headerHandled {
|
||||
p.logger.Debug("Header not handled, skipping republish", "topic", topicID, "group", groupID)
|
||||
return nil
|
||||
}
|
||||
|
||||
if shouldRepublish {
|
||||
@@ -433,15 +467,19 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
|
||||
|
||||
if col, ok := ourDataColumn.Complete(p.logger); ok {
|
||||
p.logger.Info("Completed partial column", "topic", cells.topic, "group", cells.group)
|
||||
handler, handlerOK := p.handlers[cells.topic]
|
||||
|
||||
if handlerOK {
|
||||
go handler(cells.topic, col)
|
||||
if p.handleColumn != nil {
|
||||
go p.handleColumn(cells.topic, col)
|
||||
}
|
||||
} else {
|
||||
p.logger.Info("Extended partial column", "topic", cells.topic, "group", cells.group)
|
||||
}
|
||||
|
||||
headerHandled, ok := p.headerHandled[string(ourDataColumn.GroupID())]
|
||||
if ok && !headerHandled {
|
||||
p.logger.Debug("Header not handled, skipping republish", "topic", cells.topic, "group", cells.group)
|
||||
return nil
|
||||
}
|
||||
|
||||
err := p.ps.PublishPartialMessage(cells.topic, ourDataColumn, partialmessages.PublishOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -450,12 +488,39 @@ func (p *PartialColumnBroadcaster) handleCellsValidated(cells *cellsValidated) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PartialColumnBroadcaster) handleHeaderHandled(groupID string) {
|
||||
p.headerHandled[groupID] = true
|
||||
for topic, topicStore := range p.partialMsgStore {
|
||||
col, ok := topicStore[groupID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
err := p.ps.PublishPartialMessage(topic, col, partialmessages.PublishOptions{})
|
||||
if err != nil {
|
||||
p.logger.WithError(err).Error("Failed to republish after header handled", "topic", topic, "groupID", groupID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PartialColumnBroadcaster) Stop() {
|
||||
if p.stop != nil {
|
||||
close(p.stop)
|
||||
}
|
||||
}
|
||||
|
||||
// HeaderHandled notifies the event loop that a header has been fully processed,
|
||||
// triggering republishing of all columns in the store for the given groupID.
|
||||
func (p *PartialColumnBroadcaster) HeaderHandled(groupID string) error {
|
||||
if p.ps == nil {
|
||||
return errors.New("pubsub not initialized")
|
||||
}
|
||||
p.incomingReq <- request{
|
||||
kind: requestKindHeaderHandled,
|
||||
headerHandledGroup: groupID,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publish publishes the partial column.
|
||||
func (p *PartialColumnBroadcaster) Publish(topic string, c blocks.PartialDataColumn) error {
|
||||
if p.ps == nil {
|
||||
@@ -479,38 +544,55 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
|
||||
topicStore = make(map[string]*blocks.PartialDataColumn)
|
||||
p.partialMsgStore[topic] = topicStore
|
||||
}
|
||||
topicStore[string(c.GroupID())] = &c
|
||||
|
||||
var extended bool
|
||||
existing := topicStore[string(c.GroupID())]
|
||||
if existing != nil {
|
||||
// Extend the existing column with cells being published here.
|
||||
// The existing column may already contain cells received from peers. We must not overwrite it.
|
||||
for i := range c.Included.Len() {
|
||||
if c.Included.BitAt(i) {
|
||||
extended = existing.ExtendFromVerfifiedCell(uint64(i), c.Column[i], c.KzgProofs[i])
|
||||
}
|
||||
}
|
||||
if extended {
|
||||
if col, ok := existing.Complete(p.logger); ok {
|
||||
p.logger.Info("Completed partial column", "topic", topic, "group", existing.GroupID())
|
||||
if p.handleColumn != nil {
|
||||
go p.handleColumn(topic, col)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
topicStore[string(c.GroupID())] = &c
|
||||
existing = &c
|
||||
}
|
||||
|
||||
p.groupTTL[string(c.GroupID())] = TTLInSlots
|
||||
|
||||
return p.ps.PublishPartialMessage(topic, &c, partialmessages.PublishOptions{})
|
||||
return p.ps.PublishPartialMessage(topic, existing, partialmessages.PublishOptions{})
|
||||
}
|
||||
|
||||
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
|
||||
|
||||
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
|
||||
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic) error {
|
||||
respCh := make(chan error)
|
||||
p.incomingReq <- request{
|
||||
kind: requestKindSubscribe,
|
||||
sub: subscribe{
|
||||
t: t,
|
||||
headerValidator: headerValidator,
|
||||
validator: validator,
|
||||
handler: handler,
|
||||
t: t,
|
||||
},
|
||||
response: respCh,
|
||||
}
|
||||
return <-respCh
|
||||
}
|
||||
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
|
||||
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic) error {
|
||||
topic := t.String()
|
||||
if _, ok := p.topics[topic]; ok {
|
||||
return errors.New("already subscribed")
|
||||
}
|
||||
|
||||
p.topics[topic] = t
|
||||
p.headerValidators[topic] = headerValidator
|
||||
p.validators[topic] = validator
|
||||
p.handlers[topic] = handler
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -532,9 +614,5 @@ func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
|
||||
}
|
||||
delete(p.topics, topic)
|
||||
delete(p.partialMsgStore, topic)
|
||||
delete(p.headerValidators, topic)
|
||||
delete(p.validators, topic)
|
||||
delete(p.handlers, topic)
|
||||
|
||||
return t.Close()
|
||||
}
|
||||
|
||||
@@ -311,10 +311,6 @@ func (s *Service) Start() {
|
||||
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
|
||||
}
|
||||
go s.forkWatcher()
|
||||
|
||||
if s.partialColumnBroadcaster != nil {
|
||||
go s.partialColumnBroadcaster.Start()
|
||||
}
|
||||
}
|
||||
|
||||
// Stop the p2p service and terminate all peer connections.
|
||||
@@ -325,9 +321,6 @@ func (s *Service) Stop() error {
|
||||
s.dv5Listener.Close()
|
||||
}
|
||||
|
||||
if s.partialColumnBroadcaster != nil {
|
||||
s.partialColumnBroadcaster.Stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,8 @@ package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -28,6 +30,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/partialdatacolumnbroadcaster"
|
||||
p2ptypes "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
|
||||
@@ -41,6 +44,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
|
||||
"github.com/OffchainLabs/prysm/v7/crypto/rand"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/runtime"
|
||||
prysmTime "github.com/OffchainLabs/prysm/v7/time"
|
||||
"github.com/OffchainLabs/prysm/v7/time/slots"
|
||||
@@ -261,6 +265,13 @@ func (s *Service) Start() {
|
||||
s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v)
|
||||
|
||||
go s.verifierRoutine()
|
||||
|
||||
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
|
||||
if err := s.startPartialColumnBroadcaster(broadcaster); err != nil {
|
||||
log.WithError(err).Error("Failed to start partial column broadcaster")
|
||||
}
|
||||
}
|
||||
|
||||
go s.startDiscoveryAndSubscriptions()
|
||||
go s.processDataColumnLogs()
|
||||
|
||||
@@ -328,6 +339,9 @@ func (s *Service) Stop() error {
|
||||
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
|
||||
s.unSubscribeFromTopic(t)
|
||||
}
|
||||
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
|
||||
broadcaster.Stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -397,6 +411,46 @@ func (s *Service) waitForChainStart() {
|
||||
s.markForChainStart()
|
||||
}
|
||||
|
||||
func (s *Service) startPartialColumnBroadcaster(broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster) error {
|
||||
return broadcaster.Start(
|
||||
func(header *ethpb.PartialDataColumnHeader) (bool, error) {
|
||||
return s.validatePartialDataColumnHeader(s.ctx, header)
|
||||
},
|
||||
func(cellsToVerify []blocks.CellProofBundle) error {
|
||||
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
|
||||
},
|
||||
func(topic string, col blocks.VerifiedRODataColumn) {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
|
||||
defer cancel()
|
||||
|
||||
slot := col.SignedBlockHeader.Header.Slot
|
||||
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
|
||||
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
|
||||
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
|
||||
// This column was completed from a partial message.
|
||||
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
|
||||
}
|
||||
err := s.verifiedRODataColumnSubscriber(ctx, col)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
|
||||
}
|
||||
},
|
||||
func(header *ethpb.PartialDataColumnHeader, groupID string) {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
|
||||
defer cancel()
|
||||
source := peerdas.PopulateFromPartialHeader(header)
|
||||
log.WithField("slot", source.Slot()).Info("Received data column header")
|
||||
err := s.processDataColumnSidecarsFromExecution(ctx, source)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to process partial data column header")
|
||||
}
|
||||
if err := broadcaster.HeaderHandled(groupID); err != nil {
|
||||
log.WithError(err).Error("Failed to call header handled on broadcaster")
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (s *Service) startDiscoveryAndSubscriptions() {
|
||||
// Wait for the chain to start.
|
||||
s.waitForChainStart()
|
||||
|
||||
@@ -5,8 +5,6 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -22,7 +20,6 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
|
||||
"github.com/OffchainLabs/prysm/v7/config/features"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
|
||||
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
|
||||
@@ -70,10 +67,7 @@ type subscribeParameters struct {
|
||||
}
|
||||
|
||||
type partialSubscribeParameters struct {
|
||||
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
|
||||
validateHeader partialdatacolumnbroadcaster.HeaderValidator
|
||||
validate partialdatacolumnbroadcaster.ColumnValidator
|
||||
handle partialdatacolumnbroadcaster.SubHandler
|
||||
broadcaster *partialdatacolumnbroadcaster.PartialColumnBroadcaster
|
||||
}
|
||||
|
||||
// shortTopic is a less verbose version of topic strings used for logging.
|
||||
@@ -334,32 +328,9 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
|
||||
if params.BeaconConfig().FuluForkEpoch <= nse.Epoch {
|
||||
s.spawn(func() {
|
||||
var ps *partialSubscribeParameters
|
||||
broadcaster := s.cfg.p2p.PartialColumnBroadcaster()
|
||||
if broadcaster != nil {
|
||||
if broadcaster := s.cfg.p2p.PartialColumnBroadcaster(); broadcaster != nil {
|
||||
ps = &partialSubscribeParameters{
|
||||
broadcaster: broadcaster,
|
||||
validateHeader: func(header *ethpb.PartialDataColumnHeader) (bool, error) {
|
||||
return s.validatePartialDataColumnHeader(context.TODO(), header)
|
||||
},
|
||||
validate: func(cellsToVerify []blocks.CellProofBundle) error {
|
||||
return peerdas.VerifyDataColumnsCellsKZGProofs(len(cellsToVerify), slices.Values(cellsToVerify))
|
||||
},
|
||||
handle: func(topic string, col blocks.VerifiedRODataColumn) {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
|
||||
defer cancel()
|
||||
|
||||
slot := col.SignedBlockHeader.Header.Slot
|
||||
proposerIndex := col.SignedBlockHeader.Header.ProposerIndex
|
||||
if !s.hasSeenDataColumnIndex(slot, proposerIndex, col.Index) {
|
||||
s.setSeenDataColumnIndex(slot, proposerIndex, col.Index)
|
||||
// This column was completed from a partial message.
|
||||
partialMessageColumnCompletionsTotal.WithLabelValues(strconv.FormatUint(col.Index, 10)).Inc()
|
||||
}
|
||||
err := s.verifiedRODataColumnSubscriber(ctx, col)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
s.subscribeWithParameters(subscribeParameters{
|
||||
@@ -643,7 +614,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
|
||||
|
||||
if requestPartial {
|
||||
log.Info("Subscribing to partial columns on", topicStr)
|
||||
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle)
|
||||
err = t.partial.broadcaster.Subscribe(topic)
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to subscribe to partial column")
|
||||
|
||||
@@ -85,15 +85,81 @@ func NewPartialDataColumn(
|
||||
func (p *PartialDataColumn) GroupID() []byte {
|
||||
return p.groupID
|
||||
}
|
||||
|
||||
// NewPartsMetadata creates SSZ-encoded PartialDataColumnPartsMetadata from the given bitmaps.
|
||||
func NewPartsMetadata(available, requests bitfield.Bitlist) (partialmessages.PartsMetadata, error) {
|
||||
meta := ðpb.PartialDataColumnPartsMetadata{
|
||||
Available: available,
|
||||
Requests: requests,
|
||||
}
|
||||
marshalled, err := meta.MarshalSSZ()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return partialmessages.PartsMetadata(marshalled), nil
|
||||
}
|
||||
|
||||
// ParsePartsMetadata deserializes SSZ-encoded PartialDataColumnPartsMetadata.
|
||||
func ParsePartsMetadata(data partialmessages.PartsMetadata) (*ethpb.PartialDataColumnPartsMetadata, error) {
|
||||
meta := ðpb.PartialDataColumnPartsMetadata{}
|
||||
if err := meta.UnmarshalSSZ(data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return meta, nil
|
||||
}
|
||||
|
||||
// MergePartsMetadata merges two SSZ-encoded PartialDataColumnPartsMetadata by OR-ing
|
||||
// both available and requests bitmaps.
|
||||
// TODO: How do we handle the request bitmap here ?
|
||||
func MergePartsMetadata(left, right partialmessages.PartsMetadata) (partialmessages.PartsMetadata, error) {
|
||||
if len(left) == 0 {
|
||||
return right, nil
|
||||
}
|
||||
if len(right) == 0 {
|
||||
return left, nil
|
||||
}
|
||||
|
||||
leftMeta, err := ParsePartsMetadata(left)
|
||||
if err != nil {
|
||||
return left, err
|
||||
}
|
||||
rightMeta, err := ParsePartsMetadata(right)
|
||||
if err != nil {
|
||||
return left, err
|
||||
}
|
||||
|
||||
mergedAvailable, err := bitfield.Bitlist(leftMeta.Available).Or(bitfield.Bitlist(rightMeta.Available))
|
||||
if err != nil {
|
||||
return left, err
|
||||
}
|
||||
mergedRequests, err := bitfield.Bitlist(leftMeta.Requests).Or(bitfield.Bitlist(rightMeta.Requests))
|
||||
if err != nil {
|
||||
return left, err
|
||||
}
|
||||
|
||||
return NewPartsMetadata(mergedAvailable, mergedRequests)
|
||||
}
|
||||
|
||||
func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMetadata) ([]byte, error) {
|
||||
peerHas := bitfield.Bitlist(metadata)
|
||||
if peerHas.Len() != p.Included.Len() {
|
||||
return nil, errors.New("metadata length does not match expected length")
|
||||
peerMeta, err := ParsePartsMetadata(metadata)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peerAvailable := bitfield.Bitlist(peerMeta.Available)
|
||||
peerRequests := bitfield.Bitlist(peerMeta.Requests)
|
||||
|
||||
if peerAvailable.Len() != p.Included.Len() {
|
||||
return nil, errors.New("available bitmap length does not match expected length")
|
||||
}
|
||||
if peerRequests.Len() != p.Included.Len() {
|
||||
return nil, errors.New("requests bitmap length does not match expected length")
|
||||
}
|
||||
|
||||
var cellsToReturn int
|
||||
for i := range peerHas.Len() {
|
||||
if !peerHas.BitAt(i) && p.Included.BitAt(i) {
|
||||
for i := range p.Included.Len() {
|
||||
// Send cell if: we have it, peer doesn't have it, and peer wants it.
|
||||
if p.Included.BitAt(i) && !peerAvailable.BitAt(i) && peerRequests.BitAt(i) {
|
||||
cellsToReturn++
|
||||
}
|
||||
}
|
||||
@@ -107,8 +173,8 @@ func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMe
|
||||
PartialColumn: make([][]byte, 0, cellsToReturn),
|
||||
KzgProofs: make([][]byte, 0, cellsToReturn),
|
||||
}
|
||||
for i := range peerHas.Len() {
|
||||
if peerHas.BitAt(i) || !p.Included.BitAt(i) {
|
||||
for i := range p.Included.Len() {
|
||||
if !p.Included.BitAt(i) || peerAvailable.BitAt(i) || !peerRequests.BitAt(i) {
|
||||
continue
|
||||
}
|
||||
included.SetBitAt(i, true)
|
||||
@@ -141,14 +207,27 @@ func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// Empty bitlist since we aren't including any cells here
|
||||
peersNextParts := partialmessages.PartsMetadata(bitfield.NewBitlist(uint64(len(p.KzgCommitments))))
|
||||
// Empty available (no cells sent), empty requests (we don't know what the peer wants)
|
||||
numCommitments := uint64(len(p.KzgCommitments))
|
||||
peersNextParts, err := NewPartsMetadata(
|
||||
bitfield.NewBitlist(numCommitments),
|
||||
bitfield.NewBitlist(numCommitments),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return marshalled, peersNextParts, nil
|
||||
}
|
||||
|
||||
func (p *PartialDataColumn) PartsMetadata() partialmessages.PartsMetadata {
|
||||
return partialmessages.PartsMetadata(p.Included)
|
||||
numCommitments := uint64(len(p.KzgCommitments))
|
||||
meta, err := NewPartsMetadata(p.Included, allOnesBitlist(numCommitments))
|
||||
if err != nil {
|
||||
logrus.Error("failed to create parts metadata", "err", err)
|
||||
return nil
|
||||
}
|
||||
return meta
|
||||
}
|
||||
|
||||
// CellsToVerifyFromPartialMessage returns cells from the partial message that need to be verified.
|
||||
@@ -243,3 +322,11 @@ func (p *PartialDataColumn) Complete(logger *logrus.Logger) (VerifiedRODataColum
|
||||
|
||||
return NewVerifiedRODataColumn(rodc), true
|
||||
}
|
||||
|
||||
func allOnesBitlist(length uint64) bitfield.Bitlist {
|
||||
bl := bitfield.NewBitlist(length)
|
||||
for i := range length {
|
||||
bl.SetBitAt(i, true)
|
||||
}
|
||||
return bl
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/go-bitfield"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/util"
|
||||
@@ -20,7 +19,11 @@ type invariantChecker struct {
|
||||
var _ partialmessages.InvariantChecker[*blocks.PartialDataColumn] = (*invariantChecker)(nil)
|
||||
|
||||
func (i *invariantChecker) MergePartsMetadata(left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
|
||||
return partialmessages.MergeBitmap(left, right)
|
||||
merged, err := blocks.MergePartsMetadata(left, right)
|
||||
if err != nil {
|
||||
i.t.Fatalf("Failed to merge parts metadata: %v", err)
|
||||
}
|
||||
return merged
|
||||
}
|
||||
|
||||
func (i *invariantChecker) SplitIntoParts(in *blocks.PartialDataColumn) ([]*blocks.PartialDataColumn, error) {
|
||||
@@ -47,13 +50,10 @@ func (i *invariantChecker) FullMessage() (*blocks.PartialDataColumn, error) {
|
||||
proofs := make([][]byte, numCells)
|
||||
|
||||
for i := range numCells {
|
||||
for j := range commitments[i] {
|
||||
commitments[i][j] = byte(i)
|
||||
}
|
||||
cells[i] = make([]byte, 2048)
|
||||
cells[i] = fmt.Appendf(cells[i][:0], "cell %d", i)
|
||||
copy(cells[i], fmt.Sprintf("cell %d", i))
|
||||
proofs[i] = make([]byte, 48)
|
||||
proofs[i] = fmt.Appendf(proofs[i][:0], "proof %d", i)
|
||||
copy(proofs[i], fmt.Sprintf("proof %d", i))
|
||||
}
|
||||
|
||||
roDC, _ := util.CreateTestVerifiedRoDataColumnSidecars(i.t, []util.DataColumnParam{
|
||||
@@ -66,7 +66,14 @@ func (i *invariantChecker) FullMessage() (*blocks.PartialDataColumn, error) {
|
||||
})
|
||||
|
||||
c, err := blocks.NewPartialDataColumn(roDC[0].DataColumnSidecar.SignedBlockHeader, roDC[0].Index, roDC[0].KzgCommitments, roDC[0].KzgCommitmentsInclusionProof)
|
||||
return &c, err
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Populate all cells to make this a full message
|
||||
for idx := range numCells {
|
||||
c.ExtendFromVerfifiedCell(uint64(idx), roDC[0].Column[idx], roDC[0].KzgProofs[idx])
|
||||
}
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
func (i *invariantChecker) EmptyMessage() *blocks.PartialDataColumn {
|
||||
@@ -112,9 +119,13 @@ func (i *invariantChecker) ExtendFromBytes(a *blocks.PartialDataColumn, data []b
|
||||
}
|
||||
|
||||
func (i *invariantChecker) ShouldRequest(a *blocks.PartialDataColumn, from peer.ID, partsMetadata []byte) bool {
|
||||
peerHas := bitfield.Bitlist(partsMetadata)
|
||||
for i := range peerHas.Len() {
|
||||
if peerHas.BitAt(i) && !a.Included.BitAt(i) {
|
||||
peerMeta, err := blocks.ParsePartsMetadata(partsMetadata)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
for idx := range peerMeta.Available.Len() {
|
||||
// Request if peer has cell, is willing to provide it, and we don't have it.
|
||||
if peerMeta.Available.BitAt(idx) && peerMeta.Requests.BitAt(idx) && !a.Included.BitAt(idx) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,6 +189,7 @@ ssz_fulu_objs = [
|
||||
"DataColumnsByRootIdentifier",
|
||||
"DataColumnSidecar",
|
||||
"PartialDataColumnSidecar",
|
||||
"PartialDataColumnPartsMetadata",
|
||||
"StatusV2",
|
||||
"SignedBeaconBlockContentsFulu",
|
||||
"SignedBeaconBlockFulu",
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// Code generated by fastssz. DO NOT EDIT.
|
||||
// Hash: 2192552bad81ddd02726341648c9be77395b169121c28ed7ed7a794fc8a5f55d
|
||||
package eth
|
||||
|
||||
import (
|
||||
@@ -2946,3 +2947,129 @@ func (p *PartialDataColumnHeader) HashTreeRootWith(hh *ssz.Hasher) (err error) {
|
||||
hh.Merkleize(indx)
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalSSZ ssz marshals the PartialDataColumnPartsMetadata object
|
||||
func (p *PartialDataColumnPartsMetadata) MarshalSSZ() ([]byte, error) {
|
||||
return ssz.MarshalSSZ(p)
|
||||
}
|
||||
|
||||
// MarshalSSZTo ssz marshals the PartialDataColumnPartsMetadata object to a target array
|
||||
func (p *PartialDataColumnPartsMetadata) MarshalSSZTo(buf []byte) (dst []byte, err error) {
|
||||
dst = buf
|
||||
offset := int(8)
|
||||
|
||||
// Offset (0) 'Available'
|
||||
dst = ssz.WriteOffset(dst, offset)
|
||||
offset += len(p.Available)
|
||||
|
||||
// Offset (1) 'Requests'
|
||||
dst = ssz.WriteOffset(dst, offset)
|
||||
offset += len(p.Requests)
|
||||
|
||||
// Field (0) 'Available'
|
||||
if size := len(p.Available); size > 512 {
|
||||
err = ssz.ErrBytesLengthFn("--.Available", size, 512)
|
||||
return
|
||||
}
|
||||
dst = append(dst, p.Available...)
|
||||
|
||||
// Field (1) 'Requests'
|
||||
if size := len(p.Requests); size > 512 {
|
||||
err = ssz.ErrBytesLengthFn("--.Requests", size, 512)
|
||||
return
|
||||
}
|
||||
dst = append(dst, p.Requests...)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// UnmarshalSSZ ssz unmarshals the PartialDataColumnPartsMetadata object
|
||||
func (p *PartialDataColumnPartsMetadata) UnmarshalSSZ(buf []byte) error {
|
||||
var err error
|
||||
size := uint64(len(buf))
|
||||
if size < 8 {
|
||||
return ssz.ErrSize
|
||||
}
|
||||
|
||||
tail := buf
|
||||
var o0, o1 uint64
|
||||
|
||||
// Offset (0) 'Available'
|
||||
if o0 = ssz.ReadOffset(buf[0:4]); o0 > size {
|
||||
return ssz.ErrOffset
|
||||
}
|
||||
|
||||
if o0 != 8 {
|
||||
return ssz.ErrInvalidVariableOffset
|
||||
}
|
||||
|
||||
// Offset (1) 'Requests'
|
||||
if o1 = ssz.ReadOffset(buf[4:8]); o1 > size || o0 > o1 {
|
||||
return ssz.ErrOffset
|
||||
}
|
||||
|
||||
// Field (0) 'Available'
|
||||
{
|
||||
buf = tail[o0:o1]
|
||||
if err = ssz.ValidateBitlist(buf, 512); err != nil {
|
||||
return err
|
||||
}
|
||||
if cap(p.Available) == 0 {
|
||||
p.Available = make([]byte, 0, len(buf))
|
||||
}
|
||||
p.Available = append(p.Available, buf...)
|
||||
}
|
||||
|
||||
// Field (1) 'Requests'
|
||||
{
|
||||
buf = tail[o1:]
|
||||
if err = ssz.ValidateBitlist(buf, 512); err != nil {
|
||||
return err
|
||||
}
|
||||
if cap(p.Requests) == 0 {
|
||||
p.Requests = make([]byte, 0, len(buf))
|
||||
}
|
||||
p.Requests = append(p.Requests, buf...)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// SizeSSZ returns the ssz encoded size in bytes for the PartialDataColumnPartsMetadata object
|
||||
func (p *PartialDataColumnPartsMetadata) SizeSSZ() (size int) {
|
||||
size = 8
|
||||
|
||||
// Field (0) 'Available'
|
||||
size += len(p.Available)
|
||||
|
||||
// Field (1) 'Requests'
|
||||
size += len(p.Requests)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// HashTreeRoot ssz hashes the PartialDataColumnPartsMetadata object
|
||||
func (p *PartialDataColumnPartsMetadata) HashTreeRoot() ([32]byte, error) {
|
||||
return ssz.HashWithDefaultHasher(p)
|
||||
}
|
||||
|
||||
// HashTreeRootWith ssz hashes the PartialDataColumnPartsMetadata object with a hasher
|
||||
func (p *PartialDataColumnPartsMetadata) HashTreeRootWith(hh *ssz.Hasher) (err error) {
|
||||
indx := hh.Index()
|
||||
|
||||
// Field (0) 'Available'
|
||||
if len(p.Available) == 0 {
|
||||
err = ssz.ErrEmptyBitlist
|
||||
return
|
||||
}
|
||||
hh.PutBitlist(p.Available, 512)
|
||||
|
||||
// Field (1) 'Requests'
|
||||
if len(p.Requests) == 0 {
|
||||
err = ssz.ErrEmptyBitlist
|
||||
return
|
||||
}
|
||||
hh.PutBitlist(p.Requests, 512)
|
||||
|
||||
hh.Merkleize(indx)
|
||||
return
|
||||
}
|
||||
|
||||
89
proto/prysm/v1alpha1/partial_data_columns.pb.go
generated
89
proto/prysm/v1alpha1/partial_data_columns.pb.go
generated
@@ -151,6 +151,58 @@ func (x *PartialDataColumnHeader) GetKzgCommitmentsInclusionProof() [][]byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
type PartialDataColumnPartsMetadata struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Available github_com_OffchainLabs_go_bitfield.Bitlist `protobuf:"bytes,1,opt,name=available,proto3" json:"available,omitempty" cast-type:"github.com/OffchainLabs/go-bitfield.Bitlist" ssz-max:"512"`
|
||||
Requests github_com_OffchainLabs_go_bitfield.Bitlist `protobuf:"bytes,2,opt,name=requests,proto3" json:"requests,omitempty" cast-type:"github.com/OffchainLabs/go-bitfield.Bitlist" ssz-max:"512"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *PartialDataColumnPartsMetadata) Reset() {
|
||||
*x = PartialDataColumnPartsMetadata{}
|
||||
mi := &file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes[2]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
|
||||
func (x *PartialDataColumnPartsMetadata) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*PartialDataColumnPartsMetadata) ProtoMessage() {}
|
||||
|
||||
func (x *PartialDataColumnPartsMetadata) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes[2]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use PartialDataColumnPartsMetadata.ProtoReflect.Descriptor instead.
|
||||
func (*PartialDataColumnPartsMetadata) Descriptor() ([]byte, []int) {
|
||||
return file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescGZIP(), []int{2}
|
||||
}
|
||||
|
||||
func (x *PartialDataColumnPartsMetadata) GetAvailable() github_com_OffchainLabs_go_bitfield.Bitlist {
|
||||
if x != nil {
|
||||
return x.Available
|
||||
}
|
||||
return github_com_OffchainLabs_go_bitfield.Bitlist(nil)
|
||||
}
|
||||
|
||||
func (x *PartialDataColumnPartsMetadata) GetRequests() github_com_OffchainLabs_go_bitfield.Bitlist {
|
||||
if x != nil {
|
||||
return x.Requests
|
||||
}
|
||||
return github_com_OffchainLabs_go_bitfield.Bitlist(nil)
|
||||
}
|
||||
|
||||
var File_proto_prysm_v1alpha1_partial_data_columns_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc = []byte{
|
||||
@@ -199,12 +251,24 @@ var file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc = []byte{
|
||||
0x69, 0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18,
|
||||
0x03, 0x20, 0x03, 0x28, 0x0c, 0x42, 0x08, 0x8a, 0xb5, 0x18, 0x04, 0x34, 0x2c, 0x33, 0x32, 0x52,
|
||||
0x1c, 0x6b, 0x7a, 0x67, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x49,
|
||||
0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x42, 0x3b, 0x5a,
|
||||
0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63,
|
||||
0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76,
|
||||
0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31,
|
||||
0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
|
||||
0x6f, 0x33,
|
||||
0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x6f, 0x6e, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x22, 0xca, 0x01,
|
||||
0x0a, 0x1e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x44, 0x61, 0x74, 0x61, 0x43, 0x6f, 0x6c,
|
||||
0x75, 0x6d, 0x6e, 0x50, 0x61, 0x72, 0x74, 0x73, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
|
||||
0x12, 0x54, 0x0a, 0x09, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x42, 0x36, 0x82, 0xb5, 0x18, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
|
||||
0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73,
|
||||
0x2f, 0x67, 0x6f, 0x2d, 0x62, 0x69, 0x74, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x2e, 0x42, 0x69, 0x74,
|
||||
0x6c, 0x69, 0x73, 0x74, 0x92, 0xb5, 0x18, 0x03, 0x35, 0x31, 0x32, 0x52, 0x09, 0x61, 0x76, 0x61,
|
||||
0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x52, 0x0a, 0x08, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
|
||||
0x74, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x36, 0x82, 0xb5, 0x18, 0x2b, 0x67, 0x69,
|
||||
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69,
|
||||
0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x67, 0x6f, 0x2d, 0x62, 0x69, 0x74, 0x66, 0x69, 0x65, 0x6c,
|
||||
0x64, 0x2e, 0x42, 0x69, 0x74, 0x6c, 0x69, 0x73, 0x74, 0x92, 0xb5, 0x18, 0x03, 0x35, 0x31, 0x32,
|
||||
0x52, 0x08, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x42, 0x3b, 0x5a, 0x39, 0x67, 0x69,
|
||||
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69,
|
||||
0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x36, 0x2f, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70,
|
||||
0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -219,15 +283,16 @@ func file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescGZIP() []byte {
|
||||
return file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
|
||||
var file_proto_prysm_v1alpha1_partial_data_columns_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
|
||||
var file_proto_prysm_v1alpha1_partial_data_columns_proto_goTypes = []any{
|
||||
(*PartialDataColumnSidecar)(nil), // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar
|
||||
(*PartialDataColumnHeader)(nil), // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader
|
||||
(*SignedBeaconBlockHeader)(nil), // 2: ethereum.eth.v1alpha1.SignedBeaconBlockHeader
|
||||
(*PartialDataColumnSidecar)(nil), // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar
|
||||
(*PartialDataColumnHeader)(nil), // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader
|
||||
(*PartialDataColumnPartsMetadata)(nil), // 2: ethereum.eth.v1alpha1.PartialDataColumnPartsMetadata
|
||||
(*SignedBeaconBlockHeader)(nil), // 3: ethereum.eth.v1alpha1.SignedBeaconBlockHeader
|
||||
}
|
||||
var file_proto_prysm_v1alpha1_partial_data_columns_proto_depIdxs = []int32{
|
||||
1, // 0: ethereum.eth.v1alpha1.PartialDataColumnSidecar.header:type_name -> ethereum.eth.v1alpha1.PartialDataColumnHeader
|
||||
2, // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader.signed_block_header:type_name -> ethereum.eth.v1alpha1.SignedBeaconBlockHeader
|
||||
3, // 1: ethereum.eth.v1alpha1.PartialDataColumnHeader.signed_block_header:type_name -> ethereum.eth.v1alpha1.SignedBeaconBlockHeader
|
||||
2, // [2:2] is the sub-list for method output_type
|
||||
2, // [2:2] is the sub-list for method input_type
|
||||
2, // [2:2] is the sub-list for extension type_name
|
||||
@@ -247,7 +312,7 @@ func file_proto_prysm_v1alpha1_partial_data_columns_proto_init() {
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_proto_prysm_v1alpha1_partial_data_columns_proto_rawDesc,
|
||||
NumEnums: 0,
|
||||
NumMessages: 2,
|
||||
NumMessages: 3,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
|
||||
@@ -44,3 +44,14 @@ message PartialDataColumnHeader {
|
||||
SignedBeaconBlockHeader signed_block_header = 2;
|
||||
repeated bytes kzg_commitments_inclusion_proof = 3 [(ethereum.eth.ext.ssz_size) = "kzg_commitments_inclusion_proof_depth.size,32"];
|
||||
}
|
||||
|
||||
message PartialDataColumnPartsMetadata {
|
||||
bytes available = 1 [
|
||||
(ethereum.eth.ext.ssz_max) = "max_blob_commitments_bitmap.size",
|
||||
(ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/go-bitfield.Bitlist"
|
||||
];
|
||||
bytes requests = 2 [
|
||||
(ethereum.eth.ext.ssz_max) = "max_blob_commitments_bitmap.size",
|
||||
(ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/go-bitfield.Bitlist"
|
||||
];
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user