mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-02-06 11:04:57 -05:00
Compare commits
4 Commits
feat/add-r
...
feat/proce
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0d6bab8a0 | ||
|
|
3f6c01fc3b | ||
|
|
c5914ea4d9 | ||
|
|
08d143bd2c |
@@ -24,11 +24,13 @@ var (
|
||||
var (
|
||||
_ ConstructionPopulator = (*BlockReconstructionSource)(nil)
|
||||
_ ConstructionPopulator = (*SidecarReconstructionSource)(nil)
|
||||
_ ConstructionPopulator = (*PartialDataColumnHeaderReconstructionSource)(nil)
|
||||
)
|
||||
|
||||
const (
|
||||
BlockType = "BeaconBlock"
|
||||
SidecarType = "DataColumnSidecar"
|
||||
BlockType = "BeaconBlock"
|
||||
SidecarType = "DataColumnSidecar"
|
||||
PartialDataColumnHeaderType = "PartialDataColumnHeader"
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -55,6 +57,10 @@ type (
|
||||
blocks.VerifiedRODataColumn
|
||||
}
|
||||
|
||||
PartialDataColumnHeaderReconstructionSource struct {
|
||||
*ethpb.PartialDataColumnHeader
|
||||
}
|
||||
|
||||
blockInfo struct {
|
||||
signedBlockHeader *ethpb.SignedBeaconBlockHeader
|
||||
kzgCommitments [][]byte
|
||||
@@ -72,6 +78,11 @@ func PopulateFromSidecar(sidecar blocks.VerifiedRODataColumn) *SidecarReconstruc
|
||||
return &SidecarReconstructionSource{VerifiedRODataColumn: sidecar}
|
||||
}
|
||||
|
||||
// PopulateFromPartialHeader creates a PartialDataColumnHeaderReconstructionSource from a partial header
|
||||
func PopulateFromPartialHeader(header *ethpb.PartialDataColumnHeader) *PartialDataColumnHeaderReconstructionSource {
|
||||
return &PartialDataColumnHeaderReconstructionSource{PartialDataColumnHeader: header}
|
||||
}
|
||||
|
||||
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/validator.md#validator-custody
|
||||
func ValidatorsCustodyRequirement(state beaconState.ReadOnlyBeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
|
||||
@@ -289,3 +300,43 @@ func (s *SidecarReconstructionSource) extract() (*blockInfo, error) {
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// Slot returns the slot from the partial data column header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) Slot() primitives.Slot {
|
||||
return p.SignedBlockHeader.Header.Slot
|
||||
}
|
||||
|
||||
// Root returns the block root computed from the header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) Root() [fieldparams.RootLength]byte {
|
||||
root, err := p.SignedBlockHeader.Header.HashTreeRoot()
|
||||
if err != nil {
|
||||
return [fieldparams.RootLength]byte{}
|
||||
}
|
||||
return root
|
||||
}
|
||||
|
||||
// ProposerIndex returns the proposer index from the header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) ProposerIndex() primitives.ValidatorIndex {
|
||||
return p.SignedBlockHeader.Header.ProposerIndex
|
||||
}
|
||||
|
||||
// Commitments returns the KZG commitments from the header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) Commitments() ([][]byte, error) {
|
||||
return p.KzgCommitments, nil
|
||||
}
|
||||
|
||||
// Type returns the type of the source
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) Type() string {
|
||||
return PartialDataColumnHeaderType
|
||||
}
|
||||
|
||||
// extract extracts the block information from the partial header
|
||||
func (p *PartialDataColumnHeaderReconstructionSource) extract() (*blockInfo, error) {
|
||||
info := &blockInfo{
|
||||
signedBlockHeader: p.SignedBlockHeader,
|
||||
kzgCommitments: p.KzgCommitments,
|
||||
kzgInclusionProof: p.KzgCommitmentsInclusionProof,
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
@@ -267,4 +267,31 @@ func TestReconstructionSource(t *testing.T) {
|
||||
|
||||
require.Equal(t, peerdas.SidecarType, src.Type())
|
||||
})
|
||||
|
||||
t.Run("from partial header", func(t *testing.T) {
|
||||
referenceSidecar := sidecars[0]
|
||||
partialHeader := ðpb.PartialDataColumnHeader{
|
||||
SignedBlockHeader: referenceSidecar.SignedBlockHeader,
|
||||
KzgCommitments: referenceSidecar.KzgCommitments,
|
||||
KzgCommitmentsInclusionProof: referenceSidecar.KzgCommitmentsInclusionProof,
|
||||
}
|
||||
|
||||
src := peerdas.PopulateFromPartialHeader(partialHeader)
|
||||
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.Slot, src.Slot())
|
||||
|
||||
// Compute expected root
|
||||
expectedRoot, err := referenceSidecar.SignedBlockHeader.Header.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedRoot, src.Root())
|
||||
|
||||
require.Equal(t, referenceSidecar.SignedBlockHeader.Header.ProposerIndex, src.ProposerIndex())
|
||||
|
||||
commitments, err := src.Commitments()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(commitments))
|
||||
require.DeepEqual(t, commitment1, commitments[0])
|
||||
require.DeepEqual(t, commitment2, commitments[1])
|
||||
|
||||
require.Equal(t, peerdas.PartialDataColumnHeaderType, src.Type())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ go_library(
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -193,9 +193,9 @@ func TestTwoNodePartialColumnExchange(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer sub2.Cancel()
|
||||
|
||||
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1)
|
||||
err = broadcaster1.Subscribe(topic1, headerValidator, cellValidator, handler1, nil)
|
||||
require.NoError(t, err)
|
||||
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2)
|
||||
err = broadcaster2.Subscribe(topic2, headerValidator, cellValidator, handler2, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for mesh to form
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/go-bitfield"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
|
||||
@@ -46,6 +47,7 @@ func extractColumnIndexFromTopic(topic string) (uint64, error) {
|
||||
// - reject=false, err!=nil: IGNORE - don't penalize, just ignore
|
||||
// - reject=false, err=nil: valid header
|
||||
type HeaderValidator func(header *ethpb.PartialDataColumnHeader) (reject bool, err error)
|
||||
type HeaderHandler func(header *ethpb.PartialDataColumnHeader) error
|
||||
type ColumnValidator func(cells []blocks.CellProofBundle) error
|
||||
|
||||
type PartialColumnBroadcaster struct {
|
||||
@@ -62,6 +64,9 @@ type PartialColumnBroadcaster struct {
|
||||
// map topic -> handler
|
||||
handlers map[string]SubHandler
|
||||
|
||||
// map topic -> headerHandler
|
||||
headerHandlers map[string]HeaderHandler
|
||||
|
||||
// map topic -> *pubsub.Topic
|
||||
topics map[string]*pubsub.Topic
|
||||
|
||||
@@ -108,6 +113,7 @@ type subscribe struct {
|
||||
headerValidator HeaderValidator
|
||||
validator ColumnValidator
|
||||
handler SubHandler
|
||||
headerHandler HeaderHandler
|
||||
}
|
||||
|
||||
type unsubscribe struct {
|
||||
@@ -132,6 +138,7 @@ func NewBroadcaster(logger *logrus.Logger) *PartialColumnBroadcaster {
|
||||
validators: make(map[string]ColumnValidator),
|
||||
headerValidators: make(map[string]HeaderValidator),
|
||||
handlers: make(map[string]SubHandler),
|
||||
headerHandlers: make(map[string]HeaderHandler),
|
||||
topics: make(map[string]*pubsub.Topic),
|
||||
partialMsgStore: make(map[string]map[string]*blocks.PartialDataColumn),
|
||||
groupTTL: make(map[string]int8),
|
||||
@@ -152,12 +159,15 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
|
||||
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
|
||||
Logger: slogger,
|
||||
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
|
||||
merged, err := blocks.MergePartsMetadata(left, right)
|
||||
if len(left) == 0 {
|
||||
return right
|
||||
}
|
||||
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
|
||||
if err != nil {
|
||||
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
|
||||
return left
|
||||
}
|
||||
return merged
|
||||
return partialmessages.PartsMetadata(merged)
|
||||
},
|
||||
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
|
||||
// TODO. Add some basic and fast sanity checks
|
||||
@@ -221,7 +231,7 @@ func (p *PartialColumnBroadcaster) loop() {
|
||||
case requestKindPublish:
|
||||
req.response <- p.publish(req.publish.topic, req.publish.c)
|
||||
case requestKindSubscribe:
|
||||
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler)
|
||||
req.response <- p.subscribe(req.sub.t, req.sub.headerValidator, req.sub.validator, req.sub.handler, req.sub.headerHandler)
|
||||
case requestKindUnsubscribe:
|
||||
req.response <- p.unsubscribe(req.unsub.topic)
|
||||
case requestKindHandleIncomingRPC:
|
||||
@@ -272,6 +282,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
|
||||
groupID := rpcWithFrom.GroupID
|
||||
ourDataColumn := p.getDataColumn(topicID, groupID)
|
||||
var shouldRepublish bool
|
||||
var handledHeader bool
|
||||
|
||||
if ourDataColumn == nil && hasMessage {
|
||||
var header *ethpb.PartialDataColumnHeader
|
||||
@@ -305,7 +316,17 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
|
||||
// Cache the valid header
|
||||
p.validHeaderCache[string(groupID)] = header
|
||||
|
||||
// TODO: We now have the information we need to call GetBlobsV3, we should do that to see what we have locally.
|
||||
headerHandler, ok := p.headerHandlers[topicID]
|
||||
if !ok || headerHandler == nil {
|
||||
p.logger.Debug("No header handler registered for topic")
|
||||
return nil
|
||||
}
|
||||
err = headerHandler(header)
|
||||
if err != nil {
|
||||
p.logger.Error("Failed to handle header", "err", err)
|
||||
} else {
|
||||
handledHeader = true
|
||||
}
|
||||
}
|
||||
|
||||
columnIndex, err := extractColumnIndexFromTopic(topicID)
|
||||
@@ -404,7 +425,7 @@ func (p *PartialColumnBroadcaster) handleIncomingRPC(rpcWithFrom rpcWithFrom) er
|
||||
logger.Debug("republishing due to parts metadata difference")
|
||||
}
|
||||
|
||||
if shouldRepublish {
|
||||
if shouldRepublish && !handledHeader {
|
||||
err := p.ps.PublishPartialMessage(topicID, ourDataColumn, partialmessages.PublishOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -487,7 +508,7 @@ func (p *PartialColumnBroadcaster) publish(topic string, c blocks.PartialDataCol
|
||||
|
||||
type SubHandler func(topic string, col blocks.VerifiedRODataColumn)
|
||||
|
||||
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
|
||||
func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler, headerHandler HeaderHandler) error {
|
||||
respCh := make(chan error)
|
||||
p.incomingReq <- request{
|
||||
kind: requestKindSubscribe,
|
||||
@@ -496,12 +517,13 @@ func (p *PartialColumnBroadcaster) Subscribe(t *pubsub.Topic, headerValidator He
|
||||
headerValidator: headerValidator,
|
||||
validator: validator,
|
||||
handler: handler,
|
||||
headerHandler: headerHandler,
|
||||
},
|
||||
response: respCh,
|
||||
}
|
||||
return <-respCh
|
||||
}
|
||||
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler) error {
|
||||
func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator HeaderValidator, validator ColumnValidator, handler SubHandler, headerHandler HeaderHandler) error {
|
||||
topic := t.String()
|
||||
if _, ok := p.topics[topic]; ok {
|
||||
return errors.New("already subscribed")
|
||||
@@ -511,6 +533,7 @@ func (p *PartialColumnBroadcaster) subscribe(t *pubsub.Topic, headerValidator He
|
||||
p.headerValidators[topic] = headerValidator
|
||||
p.validators[topic] = validator
|
||||
p.handlers[topic] = handler
|
||||
p.headerHandlers[topic] = headerHandler
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -535,6 +558,6 @@ func (p *PartialColumnBroadcaster) unsubscribe(topic string) error {
|
||||
delete(p.headerValidators, topic)
|
||||
delete(p.validators, topic)
|
||||
delete(p.handlers, topic)
|
||||
|
||||
delete(p.headerHandlers, topic)
|
||||
return t.Close()
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ type partialSubscribeParameters struct {
|
||||
validateHeader partialdatacolumnbroadcaster.HeaderValidator
|
||||
validate partialdatacolumnbroadcaster.ColumnValidator
|
||||
handle partialdatacolumnbroadcaster.SubHandler
|
||||
handleHeader partialdatacolumnbroadcaster.HeaderHandler
|
||||
}
|
||||
|
||||
// shortTopic is a less verbose version of topic strings used for logging.
|
||||
@@ -360,6 +361,13 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
|
||||
log.WithError(err).Error("Failed to handle verified RO data column subscriber")
|
||||
}
|
||||
},
|
||||
handleHeader: func(header *ethpb.PartialDataColumnHeader) error {
|
||||
ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout)
|
||||
defer cancel()
|
||||
source := peerdas.PopulateFromPartialHeader(header)
|
||||
log.WithField("slot", source.Slot()).Info("Received data column header")
|
||||
return s.processDataColumnSidecarsFromExecution(ctx, source)
|
||||
},
|
||||
}
|
||||
}
|
||||
s.subscribeWithParameters(subscribeParameters{
|
||||
@@ -643,7 +651,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
|
||||
|
||||
if requestPartial {
|
||||
log.Info("Subscribing to partial columns on", topicStr)
|
||||
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle)
|
||||
err = t.partial.broadcaster.Subscribe(topic, t.partial.validateHeader, t.partial.validate, t.partial.handle, t.partial.handleHeader)
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to subscribe to partial column")
|
||||
|
||||
@@ -86,32 +86,14 @@ func (p *PartialDataColumn) GroupID() []byte {
|
||||
return p.groupID
|
||||
}
|
||||
func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMetadata) ([]byte, error) {
|
||||
numCommitments := p.Included.Len()
|
||||
peerAvailable, peerRequests, isNewFormat := ParseMetadata(metadata, numCommitments)
|
||||
if peerAvailable.Len() != numCommitments {
|
||||
peerHas := bitfield.Bitlist(metadata)
|
||||
if peerHas.Len() != p.Included.Len() {
|
||||
return nil, errors.New("metadata length does not match expected length")
|
||||
}
|
||||
if isNewFormat && peerRequests.Len() != numCommitments {
|
||||
return nil, errors.New("metadata length does not match expected length")
|
||||
}
|
||||
|
||||
// shouldSend returns true if we should send cell i to this peer.
|
||||
shouldSend := func(i uint64) bool {
|
||||
if !p.Included.BitAt(i) {
|
||||
return false
|
||||
}
|
||||
if peerAvailable.BitAt(i) {
|
||||
return false
|
||||
}
|
||||
if isNewFormat && !peerRequests.BitAt(i) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
var cellsToReturn int
|
||||
for i := range numCommitments {
|
||||
if shouldSend(i) {
|
||||
for i := range peerHas.Len() {
|
||||
if !peerHas.BitAt(i) && p.Included.BitAt(i) {
|
||||
cellsToReturn++
|
||||
}
|
||||
}
|
||||
@@ -119,14 +101,14 @@ func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMe
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
included := bitfield.NewBitlist(numCommitments)
|
||||
included := bitfield.NewBitlist(p.Included.Len())
|
||||
outMessage := ethpb.PartialDataColumnSidecar{
|
||||
CellsPresentBitmap: included,
|
||||
PartialColumn: make([][]byte, 0, cellsToReturn),
|
||||
KzgProofs: make([][]byte, 0, cellsToReturn),
|
||||
}
|
||||
for i := range numCommitments {
|
||||
if !shouldSend(i) {
|
||||
for i := range peerHas.Len() {
|
||||
if peerHas.BitAt(i) || !p.Included.BitAt(i) {
|
||||
continue
|
||||
}
|
||||
included.SetBitAt(i, true)
|
||||
@@ -142,7 +124,6 @@ func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMe
|
||||
return marshalled, nil
|
||||
}
|
||||
|
||||
// TODO: This method will be removed after upgrading to the latest Gossipsub.
|
||||
func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.PartsMetadata, error) {
|
||||
// TODO: do we want to send this once per groupID per peer
|
||||
// Eagerly push the PartialDataColumnHeader
|
||||
@@ -167,80 +148,7 @@ func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.
|
||||
}
|
||||
|
||||
func (p *PartialDataColumn) PartsMetadata() partialmessages.PartsMetadata {
|
||||
n := p.Included.Len()
|
||||
requests := bitfield.NewBitlist(n)
|
||||
for i := range n {
|
||||
requests.SetBitAt(i, true)
|
||||
}
|
||||
return combinedMetadata(p.Included, requests)
|
||||
}
|
||||
|
||||
// ParseMetadata splits PartsMetadata into available and request bitlists.
|
||||
// Old format (len==N): returns (metadata, nil, false)
|
||||
// New format (len==2N): returns (first N bits, next N bits, true)
|
||||
func ParseMetadata(metadata partialmessages.PartsMetadata, numCommitments uint64) (available bitfield.Bitlist, requests bitfield.Bitlist, isNewFormat bool) {
|
||||
bl := bitfield.Bitlist(metadata)
|
||||
if bl.Len() == 2*numCommitments {
|
||||
available = bitfield.NewBitlist(numCommitments)
|
||||
requests = bitfield.NewBitlist(numCommitments)
|
||||
for i := range numCommitments {
|
||||
available.SetBitAt(i, bl.BitAt(i))
|
||||
requests.SetBitAt(i, bl.BitAt(i+numCommitments))
|
||||
}
|
||||
return available, requests, true
|
||||
}
|
||||
return bl, nil, false
|
||||
}
|
||||
|
||||
func combinedMetadata(available, requests bitfield.Bitlist) partialmessages.PartsMetadata {
|
||||
n := available.Len()
|
||||
combined := bitfield.NewBitlist(2 * n)
|
||||
for i := range n {
|
||||
combined.SetBitAt(i, available.BitAt(i))
|
||||
combined.SetBitAt(i+n, requests.BitAt(i))
|
||||
}
|
||||
return partialmessages.PartsMetadata(combined)
|
||||
}
|
||||
|
||||
// MergePartsMetadata merges two PartsMetadata values, handling old (N) and new (2N) formats.
|
||||
// If lengths differ, the old-format (N) is extended to new-format (2N) with all request bits set to 1.
|
||||
// TODO: This method will be removed after upgrading to the latest Gossipsub.
|
||||
func MergePartsMetadata(left, right partialmessages.PartsMetadata) (partialmessages.PartsMetadata, error) {
|
||||
if len(left) == 0 {
|
||||
return right, nil
|
||||
}
|
||||
if len(right) == 0 {
|
||||
return left, nil
|
||||
}
|
||||
leftBl := bitfield.Bitlist(left)
|
||||
rightBl := bitfield.Bitlist(right)
|
||||
if leftBl.Len() != rightBl.Len() {
|
||||
leftBl, rightBl = normalizeMetadataLengths(leftBl, rightBl)
|
||||
}
|
||||
merged, err := leftBl.Or(rightBl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return partialmessages.PartsMetadata(merged), nil
|
||||
}
|
||||
|
||||
func normalizeMetadataLengths(left, right bitfield.Bitlist) (bitfield.Bitlist, bitfield.Bitlist) {
|
||||
if left.Len() < right.Len() {
|
||||
left = extendToNewFormat(left)
|
||||
} else {
|
||||
right = extendToNewFormat(right)
|
||||
}
|
||||
return left, right
|
||||
}
|
||||
|
||||
func extendToNewFormat(bl bitfield.Bitlist) bitfield.Bitlist {
|
||||
n := bl.Len()
|
||||
extended := bitfield.NewBitlist(2 * n)
|
||||
for i := range n {
|
||||
extended.SetBitAt(i, bl.BitAt(i))
|
||||
extended.SetBitAt(i+n, true)
|
||||
}
|
||||
return extended
|
||||
return partialmessages.PartsMetadata(p.Included)
|
||||
}
|
||||
|
||||
// CellsToVerifyFromPartialMessage returns cells from the partial message that need to be verified.
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/go-bitfield"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/util"
|
||||
@@ -19,11 +20,7 @@ type invariantChecker struct {
|
||||
var _ partialmessages.InvariantChecker[*blocks.PartialDataColumn] = (*invariantChecker)(nil)
|
||||
|
||||
func (i *invariantChecker) MergePartsMetadata(left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
|
||||
merged, err := blocks.MergePartsMetadata(left, right)
|
||||
if err != nil {
|
||||
i.t.Fatal(err)
|
||||
}
|
||||
return merged
|
||||
return partialmessages.MergeBitmap(left, right)
|
||||
}
|
||||
|
||||
func (i *invariantChecker) SplitIntoParts(in *blocks.PartialDataColumn) ([]*blocks.PartialDataColumn, error) {
|
||||
@@ -115,11 +112,9 @@ func (i *invariantChecker) ExtendFromBytes(a *blocks.PartialDataColumn, data []b
|
||||
}
|
||||
|
||||
func (i *invariantChecker) ShouldRequest(a *blocks.PartialDataColumn, from peer.ID, partsMetadata []byte) bool {
|
||||
numCommitments := uint64(len(a.KzgCommitments))
|
||||
available, requests, isNew := blocks.ParseMetadata(partsMetadata, numCommitments)
|
||||
for idx := range available.Len() {
|
||||
peerHasAndWilling := available.BitAt(idx) && (!isNew || requests.BitAt(idx))
|
||||
if peerHasAndWilling && !a.Included.BitAt(idx) {
|
||||
peerHas := bitfield.Bitlist(partsMetadata)
|
||||
for i := range peerHas.Len() {
|
||||
if peerHas.BitAt(i) && !a.Included.BitAt(i) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user