mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-02-06 11:04:57 -05:00
Compare commits
2 Commits
feat/proce
...
feat/reque
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
71d385aa0c | ||
|
|
4836bf6dd2 |
@@ -22,7 +22,6 @@ go_library(
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/go-bitfield"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v7/internal/logrusadapter"
|
||||
@@ -159,15 +158,12 @@ func (p *PartialColumnBroadcaster) AppendPubSubOpts(opts []pubsub.Option) []pubs
|
||||
pubsub.WithPartialMessagesExtension(&partialmessages.PartialMessagesExtension{
|
||||
Logger: slogger,
|
||||
MergePartsMetadata: func(topic string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
|
||||
if len(left) == 0 {
|
||||
return right
|
||||
}
|
||||
merged, err := bitfield.Bitlist(left).Or(bitfield.Bitlist(right))
|
||||
merged, err := blocks.MergePartsMetadata(left, right)
|
||||
if err != nil {
|
||||
p.logger.Warn("Failed to merge bitfields", "err", err, "left", left, "right", right)
|
||||
return left
|
||||
}
|
||||
return partialmessages.PartsMetadata(merged)
|
||||
return merged
|
||||
},
|
||||
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
|
||||
// TODO. Add some basic and fast sanity checks
|
||||
|
||||
@@ -86,14 +86,32 @@ func (p *PartialDataColumn) GroupID() []byte {
|
||||
return p.groupID
|
||||
}
|
||||
func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMetadata) ([]byte, error) {
|
||||
peerHas := bitfield.Bitlist(metadata)
|
||||
if peerHas.Len() != p.Included.Len() {
|
||||
numCommitments := p.Included.Len()
|
||||
peerAvailable, peerRequests, isNewFormat := ParseMetadata(metadata, numCommitments)
|
||||
if peerAvailable.Len() != numCommitments {
|
||||
return nil, errors.New("metadata length does not match expected length")
|
||||
}
|
||||
if isNewFormat && peerRequests.Len() != numCommitments {
|
||||
return nil, errors.New("metadata length does not match expected length")
|
||||
}
|
||||
|
||||
// shouldSend returns true if we should send cell i to this peer.
|
||||
shouldSend := func(i uint64) bool {
|
||||
if !p.Included.BitAt(i) {
|
||||
return false
|
||||
}
|
||||
if peerAvailable.BitAt(i) {
|
||||
return false
|
||||
}
|
||||
if isNewFormat && !peerRequests.BitAt(i) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
var cellsToReturn int
|
||||
for i := range peerHas.Len() {
|
||||
if !peerHas.BitAt(i) && p.Included.BitAt(i) {
|
||||
for i := range numCommitments {
|
||||
if shouldSend(i) {
|
||||
cellsToReturn++
|
||||
}
|
||||
}
|
||||
@@ -101,14 +119,14 @@ func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMe
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
included := bitfield.NewBitlist(p.Included.Len())
|
||||
included := bitfield.NewBitlist(numCommitments)
|
||||
outMessage := ethpb.PartialDataColumnSidecar{
|
||||
CellsPresentBitmap: included,
|
||||
PartialColumn: make([][]byte, 0, cellsToReturn),
|
||||
KzgProofs: make([][]byte, 0, cellsToReturn),
|
||||
}
|
||||
for i := range peerHas.Len() {
|
||||
if peerHas.BitAt(i) || !p.Included.BitAt(i) {
|
||||
for i := range numCommitments {
|
||||
if !shouldSend(i) {
|
||||
continue
|
||||
}
|
||||
included.SetBitAt(i, true)
|
||||
@@ -124,6 +142,7 @@ func (p *PartialDataColumn) PartialMessageBytes(metadata partialmessages.PartsMe
|
||||
return marshalled, nil
|
||||
}
|
||||
|
||||
// TODO: This method will be removed after upgrading to the latest Gossipsub.
|
||||
func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.PartsMetadata, error) {
|
||||
// TODO: do we want to send this once per groupID per peer
|
||||
// Eagerly push the PartialDataColumnHeader
|
||||
@@ -148,7 +167,83 @@ func (p *PartialDataColumn) EagerPartialMessageBytes() ([]byte, partialmessages.
|
||||
}
|
||||
|
||||
func (p *PartialDataColumn) PartsMetadata() partialmessages.PartsMetadata {
|
||||
return partialmessages.PartsMetadata(p.Included)
|
||||
n := p.Included.Len()
|
||||
requests := bitfield.NewBitlist(n)
|
||||
for i := range n {
|
||||
requests.SetBitAt(i, true)
|
||||
}
|
||||
return combinedMetadata(p.Included, requests)
|
||||
}
|
||||
|
||||
// ParseMetadata splits PartsMetadata into available and request bitlists.
|
||||
// Old format (len==N): returns (metadata, nil, false)
|
||||
// New format (len==2N): returns (first N bits, next N bits, true)
|
||||
func ParseMetadata(metadata partialmessages.PartsMetadata, numCommitments uint64) (available bitfield.Bitlist, requests bitfield.Bitlist, isNewFormat bool) {
|
||||
bl := bitfield.Bitlist(metadata)
|
||||
if bl.Len() == 2*numCommitments {
|
||||
available = bitfield.NewBitlist(numCommitments)
|
||||
requests = bitfield.NewBitlist(numCommitments)
|
||||
for i := range numCommitments {
|
||||
available.SetBitAt(i, bl.BitAt(i))
|
||||
requests.SetBitAt(i, bl.BitAt(i+numCommitments))
|
||||
}
|
||||
return available, requests, true
|
||||
}
|
||||
return bl, nil, false
|
||||
}
|
||||
|
||||
func combinedMetadata(available, requests bitfield.Bitlist) partialmessages.PartsMetadata {
|
||||
n := available.Len()
|
||||
combined := bitfield.NewBitlist(2 * n)
|
||||
for i := range n {
|
||||
combined.SetBitAt(i, available.BitAt(i))
|
||||
combined.SetBitAt(i+n, requests.BitAt(i))
|
||||
}
|
||||
return partialmessages.PartsMetadata(combined)
|
||||
}
|
||||
|
||||
// MergePartsMetadata merges two PartsMetadata values, handling old (N) and new (2N) formats.
|
||||
// If lengths differ, the old-format (N) is extended to new-format (2N) with all request bits set to 1.
|
||||
// TODO: This method will be removed after upgrading to the latest Gossipsub.
|
||||
func MergePartsMetadata(left, right partialmessages.PartsMetadata) (partialmessages.PartsMetadata, error) {
|
||||
if len(left) == 0 {
|
||||
return right, nil
|
||||
}
|
||||
if len(right) == 0 {
|
||||
return left, nil
|
||||
}
|
||||
leftBl := bitfield.Bitlist(left)
|
||||
rightBl := bitfield.Bitlist(right)
|
||||
if leftBl.Len() != rightBl.Len() {
|
||||
leftBl, rightBl = normalizeMetadataLengths(leftBl, rightBl)
|
||||
}
|
||||
merged, err := leftBl.Or(rightBl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return partialmessages.PartsMetadata(merged), nil
|
||||
}
|
||||
|
||||
// normalizeMetadataLengths extends old-format (N) bitlists to new-format (2N)
|
||||
// by adding all-1 request bits. Returns both bitlists at the same length.
|
||||
func normalizeMetadataLengths(left, right bitfield.Bitlist) (bitfield.Bitlist, bitfield.Bitlist) {
|
||||
if left.Len() < right.Len() {
|
||||
left = extendToNewFormat(left)
|
||||
} else {
|
||||
right = extendToNewFormat(right)
|
||||
}
|
||||
return left, right
|
||||
}
|
||||
|
||||
// extendToNewFormat converts old-format (N bits) to new-format (2N bits) with all request bits set to 1.
|
||||
func extendToNewFormat(bl bitfield.Bitlist) bitfield.Bitlist {
|
||||
n := bl.Len()
|
||||
extended := bitfield.NewBitlist(2 * n)
|
||||
for i := range n {
|
||||
extended.SetBitAt(i, bl.BitAt(i))
|
||||
extended.SetBitAt(i+n, true)
|
||||
}
|
||||
return extended
|
||||
}
|
||||
|
||||
// CellsToVerifyFromPartialMessage returns cells from the partial message that need to be verified.
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/go-bitfield"
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/util"
|
||||
@@ -20,7 +19,11 @@ type invariantChecker struct {
|
||||
var _ partialmessages.InvariantChecker[*blocks.PartialDataColumn] = (*invariantChecker)(nil)
|
||||
|
||||
func (i *invariantChecker) MergePartsMetadata(left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
|
||||
return partialmessages.MergeBitmap(left, right)
|
||||
merged, err := blocks.MergePartsMetadata(left, right)
|
||||
if err != nil {
|
||||
i.t.Fatal(err)
|
||||
}
|
||||
return merged
|
||||
}
|
||||
|
||||
func (i *invariantChecker) SplitIntoParts(in *blocks.PartialDataColumn) ([]*blocks.PartialDataColumn, error) {
|
||||
@@ -112,9 +115,11 @@ func (i *invariantChecker) ExtendFromBytes(a *blocks.PartialDataColumn, data []b
|
||||
}
|
||||
|
||||
func (i *invariantChecker) ShouldRequest(a *blocks.PartialDataColumn, from peer.ID, partsMetadata []byte) bool {
|
||||
peerHas := bitfield.Bitlist(partsMetadata)
|
||||
for i := range peerHas.Len() {
|
||||
if peerHas.BitAt(i) && !a.Included.BitAt(i) {
|
||||
numCommitments := uint64(len(a.KzgCommitments))
|
||||
available, requests, isNew := blocks.ParseMetadata(partsMetadata, numCommitments)
|
||||
for idx := range available.Len() {
|
||||
peerHasAndWilling := available.BitAt(idx) && (!isNew || requests.BitAt(idx))
|
||||
if peerHasAndWilling && !a.Included.BitAt(idx) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user