Compare commits

...

1 Commit

Author SHA1 Message Date
Manu NALEPA
33e6ff416b PeerDAS: Implement peer sampling. 2025-06-27 14:50:25 +02:00
8 changed files with 1319 additions and 0 deletions

View File

@@ -7,6 +7,7 @@ go_library(
"info.go",
"metrics.go",
"p2p_interface.go",
"peer_sampling.go",
"reconstruction.go",
"validator.go",
"verification.go",
@@ -44,6 +45,7 @@ go_test(
"das_core_test.go",
"info_test.go",
"p2p_interface_test.go",
"peer_sampling_test.go",
"reconstruction_test.go",
"utils_test.go",
"validator_test.go",

View File

@@ -0,0 +1,56 @@
package peerdas
import (
"math/big"
"github.com/OffchainLabs/prysm/v6/config/params"
)
// ExtendedSampleCount computes, for a given number of samples per slot and a given number of allowed
// failures, the number of samples we should actually query from peers.
// https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.5/specs/fulu/peer-sampling.md#get_extended_sample_count
func ExtendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 {
// Retrieve the columns count
columnsCount := params.BeaconConfig().NumberOfColumns
// If half of the columns are missing, we are still able to reconstruct the data.
// If half of the columns + 1 are missing, we are not: this is the smallest number of
// missing columns that makes the data unrecoverable, hence the worst case to sample against.
worstCaseMissing := columnsCount/2 + 1
// Compute the false positive threshold.
falsePositiveThreshold := HypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot)
var sampleCount uint64
// Finally, compute the extended sample count.
for sampleCount = samplesPerSlot; sampleCount < columnsCount+1; sampleCount++ {
if HypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold {
break
}
}
return sampleCount
}
// HypergeomCDF computes the hypergeometric cumulative distribution function.
// https://en.wikipedia.org/wiki/Hypergeometric_distribution
func HypergeomCDF(k, M, n, N uint64) float64 {
denominatorInt := new(big.Int).Binomial(int64(M), int64(N)) // lint:ignore uintcast
denominator := new(big.Float).SetInt(denominatorInt)
rBig := big.NewFloat(0)
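// Accumulate P(X = i) = C(n, i) * C(M-n, N-i) / C(M, N) for i = 0..k.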
for i := uint64(0); i < k+1; i++ {
a := new(big.Int).Binomial(int64(n), int64(i)) // lint:ignore uintcast
b := new(big.Int).Binomial(int64(M-n), int64(N-i))
numeratorInt := new(big.Int).Mul(a, b)
numerator := new(big.Float).SetInt(numeratorInt)
item := new(big.Float).Quo(numerator, denominator)
rBig.Add(rBig, item)
}
r, _ := rBig.Float64()
return r
}
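
For reference, here is a minimal standalone sketch (not part of this change) of the same computation, with the column count hard-coded to 128 instead of read from params.BeaconConfig(); with 16 samples per slot it reproduces the first rows of the table asserted in TestExtendedSampleCount below.

package main

import (
    "fmt"
    "math/big"
)

// hypergeomCDF mirrors HypergeomCDF above: P(X <= k) for a hypergeometric
// distribution with population size M, n successes in the population and sample size N.
func hypergeomCDF(k, M, n, N uint64) float64 {
    denominator := new(big.Float).SetInt(new(big.Int).Binomial(int64(M), int64(N)))
    sum := big.NewFloat(0)
    for i := uint64(0); i <= k; i++ {
        a := new(big.Int).Binomial(int64(n), int64(i))
        b := new(big.Int).Binomial(int64(M-n), int64(N-i))
        numerator := new(big.Float).SetInt(new(big.Int).Mul(a, b))
        sum.Add(sum, new(big.Float).Quo(numerator, denominator))
    }
    result, _ := sum.Float64()
    return result
}

// extendedSampleCount mirrors ExtendedSampleCount above, with the column count fixed to 128.
func extendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 {
    const columnsCount uint64 = 128
    worstCaseMissing := columnsCount/2 + 1
    falsePositiveThreshold := hypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot)
    sampleCount := samplesPerSlot
    for ; sampleCount < columnsCount+1; sampleCount++ {
        if hypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold {
            break
        }
    }
    return sampleCount
}

func main() {
    // Prints 0 16, 1 20 and 2 24, matching the first rows of TestExtendedSampleCount.
    for _, allowedFailures := range []uint64{0, 1, 2} {
        fmt.Println(allowedFailures, extendedSampleCount(16, allowedFailures))
    }
}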

View File

@@ -0,0 +1,60 @@
package peerdas_test
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/testing/require"
)
func TestExtendedSampleCount(t *testing.T) {
const samplesPerSlot = 16
testCases := []struct {
name string
allowedMissings uint64
extendedSampleCount uint64
}{
{name: "allowedMissings=0", allowedMissings: 0, extendedSampleCount: 16},
{name: "allowedMissings=1", allowedMissings: 1, extendedSampleCount: 20},
{name: "allowedMissings=2", allowedMissings: 2, extendedSampleCount: 24},
{name: "allowedMissings=3", allowedMissings: 3, extendedSampleCount: 27},
{name: "allowedMissings=4", allowedMissings: 4, extendedSampleCount: 29},
{name: "allowedMissings=5", allowedMissings: 5, extendedSampleCount: 32},
{name: "allowedMissings=6", allowedMissings: 6, extendedSampleCount: 35},
{name: "allowedMissings=7", allowedMissings: 7, extendedSampleCount: 37},
{name: "allowedMissings=8", allowedMissings: 8, extendedSampleCount: 40},
{name: "allowedMissings=9", allowedMissings: 9, extendedSampleCount: 42},
{name: "allowedMissings=10", allowedMissings: 10, extendedSampleCount: 44},
{name: "allowedMissings=11", allowedMissings: 11, extendedSampleCount: 47},
{name: "allowedMissings=12", allowedMissings: 12, extendedSampleCount: 49},
{name: "allowedMissings=13", allowedMissings: 13, extendedSampleCount: 51},
{name: "allowedMissings=14", allowedMissings: 14, extendedSampleCount: 53},
{name: "allowedMissings=15", allowedMissings: 15, extendedSampleCount: 55},
{name: "allowedMissings=16", allowedMissings: 16, extendedSampleCount: 57},
{name: "allowedMissings=17", allowedMissings: 17, extendedSampleCount: 59},
{name: "allowedMissings=18", allowedMissings: 18, extendedSampleCount: 61},
{name: "allowedMissings=19", allowedMissings: 19, extendedSampleCount: 63},
{name: "allowedMissings=20", allowedMissings: 20, extendedSampleCount: 65},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := peerdas.ExtendedSampleCount(samplesPerSlot, tc.allowedMissings)
require.Equal(t, tc.extendedSampleCount, result)
})
}
}
func TestHypergeomCDF(t *testing.T) {
// Hypergeometric CDF with population size M=128, number of successes in the population n=65,
// sample size N=16, evaluated at k=5.
// See https://en.wikipedia.org/wiki/Hypergeometric_distribution
const (
expected = 0.0796665913283742
margin = 0.000001
)
actual := peerdas.HypergeomCDF(5, 128, 65, 16)
require.Equal(t, true, expected-margin <= actual && actual <= expected+margin)
}

View File

@@ -7,6 +7,7 @@ go_library(
"block_batcher.go",
"broadcast_bls_changes.go",
"context.go",
"data_columns_sampling.go",
"deadlines.go",
"decode_pubsub.go",
"doc.go",
@@ -78,6 +79,7 @@ go_library(
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/light-client:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/core/transition/interop:go_default_library",
@@ -162,6 +164,7 @@ go_test(
"block_batcher_test.go",
"broadcast_bls_changes_test.go",
"context_test.go",
"data_columns_sampling_test.go",
"decode_pubsub_test.go",
"error_test.go",
"fork_watcher_test.go",
@@ -207,13 +210,16 @@ go_test(
deps = [
"//async/abool:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/light-client:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
@@ -265,6 +271,8 @@ go_test(
"//testing/util:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library",
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
"@com_github_d4l3k_messagediff//:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
@@ -272,6 +280,7 @@ go_test(
"@com_github_golang_snappy//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",

View File

@@ -0,0 +1,627 @@
package sync
import (
"context"
"fmt"
"slices"
"sync"
"time"
"github.com/OffchainLabs/prysm/v6/async"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/crypto/rand"
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
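// PeerRefreshInterval is the interval at which the peer custody information is refreshed.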
const PeerRefreshInterval = 1 * time.Minute
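// roundSummary describes a single sampling round: the columns that were requested and those that came back missing.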
type roundSummary struct {
RequestedColumns []uint64
MissingColumns map[uint64]bool
}
// DataColumnSampler defines the interface for sampling data columns from peers for a requested block root and sample count.
type DataColumnSampler interface {
// Run starts the data column sampling service.
Run(ctx context.Context)
}
var _ DataColumnSampler = (*dataColumnSampler1D)(nil)
// dataColumnSampler1D implements the DataColumnSampler interface for PeerDAS 1D.
type dataColumnSampler1D struct {
sync.RWMutex
p2p p2p.P2P
clock *startup.Clock
ctxMap ContextByteVersions
stateNotifier statefeed.Notifier
// nonCustodyGroups is a set of groups that are not custodied by the node.
nonCustodyGroups map[uint64]bool
// groupsByPeer maps a peer to the custody groups it custodies.
groupsByPeer map[peer.ID]map[uint64]bool
// peersByCustodyGroup maps a custody group to the peers that custody it.
peersByCustodyGroup map[uint64]map[peer.ID]bool
// columnVerifier verifies a column according to the specified requirements.
columnVerifier verification.NewDataColumnsVerifier
// custodyInfo contains the custody information of the node.
custodyInfo *peerdas.CustodyInfo
}
// newDataColumnSampler1D creates a new 1D data column sampler.
func newDataColumnSampler1D(
p2p p2p.P2P,
clock *startup.Clock,
ctxMap ContextByteVersions,
stateNotifier statefeed.Notifier,
colVerifier verification.NewDataColumnsVerifier,
custodyInfo *peerdas.CustodyInfo,
) *dataColumnSampler1D {
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups
peersByCustodyGroup := make(map[uint64]map[peer.ID]bool, numberOfCustodyGroups)
for i := range numberOfCustodyGroups {
peersByCustodyGroup[i] = make(map[peer.ID]bool)
}
return &dataColumnSampler1D{
p2p: p2p,
clock: clock,
ctxMap: ctxMap,
stateNotifier: stateNotifier,
groupsByPeer: make(map[peer.ID]map[uint64]bool),
peersByCustodyGroup: peersByCustodyGroup,
columnVerifier: colVerifier,
custodyInfo: custodyInfo,
}
}
// Run implements DataColumnSampler.
func (d *dataColumnSampler1D) Run(ctx context.Context) {
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups
// Get the node ID.
nodeID := d.p2p.NodeID()
// Determine whether we need to run sampling at all; if not, return early.
// TODO: Rework this part to take into account dynamic custody group count with peer sampling.
custodyGroupCount := d.custodyInfo.ActualGroupCount()
// Retrieve our local node info.
localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
log.WithError(err).Error("peer info")
return
}
// TODO: custody group count != data column group count
if custodyGroupCount >= peerdas.MinimumColumnsCountToReconstruct() {
log.WithFields(logrus.Fields{
"custodyGroupCount": custodyGroupCount,
"totalGroups": numberOfCustodyGroups,
}).Debug("The node custodies at least the half of the groups, no need to sample")
return
}
// Initialize non-custody groups.
d.nonCustodyGroups = make(map[uint64]bool)
for i := range numberOfCustodyGroups {
if !localNodeInfo.CustodyGroups[i] {
d.nonCustodyGroups[i] = true
}
}
// Initialize peer info first.
d.refreshPeerInfo()
// Periodically refresh peer info to keep the peer <-> column mapping up to date.
async.RunEvery(ctx, PeerRefreshInterval, d.refreshPeerInfo)
// Start the sampling loop.
d.samplingRoutine(ctx)
}
func (d *dataColumnSampler1D) samplingRoutine(ctx context.Context) {
stateCh := make(chan *feed.Event, 1)
stateSub := d.stateNotifier.StateFeed().Subscribe(stateCh)
defer stateSub.Unsubscribe()
for {
select {
case evt := <-stateCh:
d.handleStateNotification(ctx, evt)
case err := <-stateSub.Err():
log.WithError(err).Error("DataColumnSampler1D subscription to state feed failed")
case <-ctx.Done():
log.Debug("Context canceled, exiting data column sampling loop.")
return
}
}
}
// refreshPeerInfo refreshes the peer <-> custody group mappings for all currently active peers.
func (d *dataColumnSampler1D) refreshPeerInfo() {
d.Lock()
defer d.Unlock()
activePeers := d.p2p.Peers().Active()
d.prunePeerInfo(activePeers)
for _, pid := range activePeers {
// Retrieve the custody group count of the peer.
retrievedCustodyGroupCount := d.p2p.CustodyGroupCountFromPeer(pid)
// Look up the custody groups we have stored for this peer.
storedGroups, ok := d.groupsByPeer[pid]
storedGroupsCount := uint64(len(storedGroups))
if ok && storedGroupsCount == retrievedCustodyGroupCount {
// No change for this peer.
continue
}
nodeID, err := p2p.ConvertPeerIDToNodeID(pid)
if err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to convert peer ID to node ID")
continue
}
// Retrieve the peer info.
peerInfo, _, err := peerdas.Info(nodeID, retrievedCustodyGroupCount)
if err != nil {
log.WithError(err).WithField("peerID", pid.String()).Error("Failed to determine peer info")
continue
}
d.groupsByPeer[pid] = peerInfo.CustodyGroups
for group := range peerInfo.CustodyGroups {
d.peersByCustodyGroup[group][pid] = true
}
}
groupsWithoutPeers := make([]uint64, 0)
for group, peers := range d.peersByCustodyGroup {
if len(peers) == 0 {
groupsWithoutPeers = append(groupsWithoutPeers, group)
}
}
if len(groupsWithoutPeers) > 0 {
slices.Sort[[]uint64](groupsWithoutPeers)
log.WithField("groups", groupsWithoutPeers).Warn("Some groups have no peers responsible for custody")
}
}
// prunePeerInfo prunes inactive peers from peersByCustodyGroup and groupsByPeer.
// It must be called with the lock held and should not be called outside of refreshPeerInfo.
func (d *dataColumnSampler1D) prunePeerInfo(activePeers []peer.ID) {
active := make(map[peer.ID]bool)
for _, pid := range activePeers {
active[pid] = true
}
for pid := range d.groupsByPeer {
if !active[pid] {
d.prunePeer(pid)
}
}
}
// prunePeer removes a peer from the stored peer info maps. It must be called with the lock held.
func (d *dataColumnSampler1D) prunePeer(pid peer.ID) {
delete(d.groupsByPeer, pid)
for _, peers := range d.peersByCustodyGroup {
delete(peers, pid)
}
}
func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event *feed.Event) {
if event.Type != statefeed.BlockProcessed {
return
}
data, ok := event.Data.(*statefeed.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
return
}
if !data.Verified {
// We only process blocks that have been verified.
log.Error("Data is not verified")
return
}
if data.SignedBlock.Version() < version.Fulu {
log.Debug("Pre Fulu block, skipping data column sampling")
return
}
// Determine if we need to sample data columns for this block.
beaconConfig := params.BeaconConfig()
samplesPerSlots := beaconConfig.SamplesPerSlot
halfOfCustodyGroups := beaconConfig.NumberOfCustodyGroups / 2
nonCustodyGroupsCount := uint64(len(d.nonCustodyGroups))
if nonCustodyGroupsCount <= halfOfCustodyGroups {
// Nothing to sample.
return
}
// Get the commitments for this block.
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
return
}
// Skip if there are no commitments.
if len(commitments) == 0 {
log.Debug("No commitments in block, skipping data column sampling")
return
}
// Randomize columns for sample selection.
randomizedColumns, err := randomizeColumns(d.nonCustodyGroups)
if err != nil {
log.WithError(err).Error("Failed to randomize columns")
return
}
samplesCount := min(samplesPerSlots, nonCustodyGroupsCount-halfOfCustodyGroups)
// TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
_, _, err = d.incrementalDAS(ctx, data, randomizedColumns, samplesCount)
if err != nil {
log.WithError(err).Error("Failed to run incremental DAS")
}
}
// incrementalDAS samples data columns from active peers using incremental DAS.
// https://ethresear.ch/t/lossydas-lossy-incremental-and-diagonal-sampling-for-data-availability/18963#incrementaldas-dynamically-increase-the-sample-size-10
// According to https://github.com/ethereum/consensus-specs/issues/3825, we select the samples to query exclusively from the non-custody columns.
func (d *dataColumnSampler1D) incrementalDAS(
ctx context.Context,
blockProcessedData *statefeed.BlockProcessedData,
columns []uint64,
sampleCount uint64,
) (bool, []roundSummary, error) {
allowedFailures := uint64(0)
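// With zero allowed failures, ExtendedSampleCount returns sampleCount itself, so the first round queries exactly sampleCount columns.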
firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
blockRoot := blockProcessedData.BlockRoot
columnsCount := uint64(len(columns))
start := time.Now()
for round := 1; ; /*No exit condition */ round++ {
if extendedSampleCount > columnsCount {
// We already tried to sample all possible columns; this is the unhappy path.
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", blockRoot),
"round": round - 1,
}).Warning("Some columns are still missing after trying to sample all possible columns")
return false, roundSummaries, nil
}
// Get the columns to sample for this round.
columnsToSample := columns[firstColumnToSample:extendedSampleCount]
columnsToSampleCount := extendedSampleCount - firstColumnToSample
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", blockRoot),
"columns": columnsToSample,
"round": round,
}).Debug("Start data columns sampling")
// Sample data columns from peers in parallel.
retrievedSamples, err := d.sampleDataColumns(ctx, blockProcessedData, columnsToSample)
if err != nil {
return false, nil, errors.Wrap(err, "sample data columns")
}
missingSamples := make(map[uint64]bool)
for _, column := range columnsToSample {
if !retrievedSamples[column] {
missingSamples[column] = true
}
}
roundSummaries = append(roundSummaries, roundSummary{
RequestedColumns: columnsToSample,
MissingColumns: missingSamples,
})
retrievedSampleCount := uint64(len(retrievedSamples))
if retrievedSampleCount == columnsToSampleCount {
// All columns were correctly sampled; this is the happy path.
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", blockRoot),
"neededRounds": round,
"duration": time.Since(start),
}).Debug("All columns were successfully sampled")
return true, roundSummaries, nil
}
if retrievedSampleCount > columnsToSampleCount {
// This should never happen.
return false, nil, errors.New("retrieved more columns than requested")
}
// There are still some missing columns; extend the sample set.
allowedFailures += columnsToSampleCount - retrievedSampleCount
oldExtendedSampleCount := extendedSampleCount
firstColumnToSample = extendedSampleCount
extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", blockRoot),
"round": round,
"missingColumnsCount": allowedFailures,
"currentSampleIndex": oldExtendedSampleCount,
"nextSampleIndex": extendedSampleCount,
}).Debug("Some columns are still missing after sampling this round.")
}
}
func (d *dataColumnSampler1D) sampleDataColumns(
ctx context.Context,
blockProcessedData *statefeed.BlockProcessedData,
columns []uint64,
) (map[uint64]bool, error) {
// Distribute samples to peers.
peerToColumns, err := d.distributeSamplesToPeer(columns)
if err != nil {
return nil, errors.Wrap(err, "distribute samples to peer")
}
var (
mu sync.Mutex
wg sync.WaitGroup
)
res := make(map[uint64]bool)
sampleFromPeer := func(pid peer.ID, cols map[uint64]bool) {
defer wg.Done()
retrieved := d.sampleDataColumnsFromPeer(ctx, pid, blockProcessedData, cols)
mu.Lock()
for col := range retrieved {
res[col] = true
}
mu.Unlock()
}
// Sample from peers in parallel.
for pid, cols := range peerToColumns {
wg.Add(1)
go sampleFromPeer(pid, cols)
}
wg.Wait()
return res, nil
}
// distributeSamplesToPeer distributes samples to peers based on the columns they are responsible for.
// Currently it randomizes peer selection per column and does not take the overall load balance across peers into account. It could be improved if needed.
func (d *dataColumnSampler1D) distributeSamplesToPeer(columns []uint64) (map[peer.ID]map[uint64]bool, error) {
dist := make(map[peer.ID]map[uint64]bool)
for _, column := range columns {
custodyGroup, err := peerdas.ComputeCustodyGroupForColumn(column)
if err != nil {
return nil, errors.Wrap(err, "compute custody group for column")
}
peers := d.peersByCustodyGroup[custodyGroup]
if len(peers) == 0 {
log.WithField("column", column).Warning("No peers responsible for custody of column")
continue
}
pid, err := selectRandomPeer(peers)
if err != nil {
return nil, errors.Wrap(err, "select random peer")
}
if _, ok := dist[pid]; !ok {
dist[pid] = make(map[uint64]bool)
}
dist[pid][column] = true
}
return dist, nil
}
func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
ctx context.Context,
pid peer.ID,
blockProcessedData *statefeed.BlockProcessedData,
requestedColumns map[uint64]bool,
) map[uint64]bool {
retrievedColumns := make(map[uint64]bool)
cols := make([]uint64, 0, len(requestedColumns))
for col := range requestedColumns {
cols = append(cols, col)
}
req := &eth.DataColumnsByRootIdentifier{
BlockRoot: blockProcessedData.BlockRoot[:],
Columns: cols,
}
// Send the request to the peer.
roDataColumns, err := SendDataColumnSidecarsByRootRequest(ctx, d.clock, d.p2p, pid, d.ctxMap, types.DataColumnsByRootIdentifiers{req})
if err != nil {
log.WithError(err).Error("Failed to send data column sidecar by root")
return nil
}
// TODO: Once peer sampling is used, we should verify all sampled data columns in a single batch instead of looping over columns.
for _, roDataColumn := range roDataColumns {
if verifyColumn(roDataColumn, blockProcessedData, pid, requestedColumns, d.columnVerifier) {
retrievedColumns[roDataColumn.Index] = true
}
}
if len(retrievedColumns) == len(requestedColumns) {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
"requestedColumns": sliceFromMap(requestedColumns, true /*sorted*/),
}).Debug("Sampled columns from peer successfully")
} else {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
"requestedColumns": sliceFromMap(requestedColumns, true /*sorted*/),
"retrievedColumns": sliceFromMap(retrievedColumns, true /*sorted*/),
}).Debug("Sampled columns from peer with some errors")
}
return retrievedColumns
}
// randomizeColumns returns a slice containing, in random order, the columns belonging to the input custody groups.
func randomizeColumns(custodyGroups map[uint64]bool) ([]uint64, error) {
// Compute the number of columns per group.
numberOfColumns := params.BeaconConfig().NumberOfColumns
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups
columnsPerGroup := numberOfColumns / numberOfCustodyGroups
// Compute the number of columns.
groupCount := uint64(len(custodyGroups))
expectedColumnCount := groupCount * columnsPerGroup
// Compute the columns.
columns := make([]uint64, 0, expectedColumnCount)
for group := range custodyGroups {
columnsGroup, err := peerdas.ComputeColumnsForCustodyGroup(group)
if err != nil {
return nil, errors.Wrap(err, "compute columns for custody group")
}
columns = append(columns, columnsGroup...)
}
actualColumnCount := len(columns)
// Safety check.
if uint64(actualColumnCount) != expectedColumnCount {
return nil, errors.New("invalid number of columns, should never happen")
}
// Shuffle the columns.
rand.NewGenerator().Shuffle(actualColumnCount, func(i, j int) {
columns[i], columns[j] = columns[j], columns[i]
})
return columns, nil
}
// sliceFromMap returns the keys of a map, sorted if requested.
func sliceFromMap(m map[uint64]bool, sorted ...bool) []uint64 {
result := make([]uint64, 0, len(m))
for k := range m {
result = append(result, k)
}
if len(sorted) > 0 && sorted[0] {
slices.Sort(result)
}
return result
}
// selectRandomPeer returns a random peer from the given set of peers.
func selectRandomPeer(peers map[peer.ID]bool) (peer.ID, error) {
peersCount := uint64(len(peers))
pick := rand.NewGenerator().Uint64() % peersCount
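// Map iteration order is not uniformly random, so walk the map and return the pick-th peer instead.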
for peer := range peers {
if pick == 0 {
return peer, nil
}
pick--
}
// This should never be reached.
return peer.ID(""), errors.New("failed to select random peer")
}
// verifyColumn verifies the retrieved column against the block root, the requested indices,
// the commitment inclusion proof and the KZG proof.
func verifyColumn(
roDataColumn blocks.RODataColumn,
blockProcessedData *statefeed.BlockProcessedData,
pid peer.ID,
requestedColumns map[uint64]bool,
newDataColumnsVerifier verification.NewDataColumnsVerifier,
) bool {
retrievedColumn := roDataColumn.Index
// Filter out columns that were not requested.
if !requestedColumns[retrievedColumn] {
columnsToSampleList := sliceFromMap(requestedColumns, true /*sorted*/)
log.WithFields(logrus.Fields{
"peerID": pid,
"requestedColumns": columnsToSampleList,
"retrievedColumn": retrievedColumn,
}).Debug("Retrieved column was not requested")
return false
}
roBlock, err := blocks.NewROBlock(blockProcessedData.SignedBlock)
if err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to create ROBlock")
return false
}
roDataColumns := []blocks.RODataColumn{roDataColumn}
if err := peerdas.DataColumnsAlignWithBlock(roBlock, roDataColumns); err != nil {
return false
}
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
verifier := newDataColumnsVerifier(roDataColumns, verification.ByRootRequestDataColumnSidecarRequirements)
if err := verifier.ValidFields(); err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to verify data column")
return false
}
if err := verifier.SidecarInclusionProven(); err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to prove inclusion")
return false
}
if err := verifier.SidecarKzgProofVerified(); err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to verify KZG proof")
return false
}
_, err = verifier.VerifiedRODataColumns()
if err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to upgrade RODataColumns to VerifiedRODataColumns - should never happen")
return false
}
return true
}
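
To make the window arithmetic of incrementalDAS concrete, here is a small self-contained sketch (not part of this change) that replays the round logic. It looks up ExtendedSampleCount(16, allowedFailures) from the table asserted in TestExtendedSampleCount above; the column list and the set of unavailable columns are hypothetical.

package main

import "fmt"

// extendedSampleCount16 maps allowedFailures to ExtendedSampleCount(16, allowedFailures),
// copied from the expectations in TestExtendedSampleCount.
var extendedSampleCount16 = map[uint64]uint64{0: 16, 1: 20, 2: 24, 3: 27, 4: 29, 5: 32}

// simulateIncrementalDAS replays the round logic of incrementalDAS over a
// pre-shuffled column list, given the set of columns peers fail to return.
func simulateIncrementalDAS(columns []uint64, unavailable map[uint64]bool) bool {
    allowedFailures := uint64(0)
    first, extended := uint64(0), extendedSampleCount16[allowedFailures]
    for round := 1; ; round++ {
        if extended > uint64(len(columns)) {
            return false // All candidate columns were tried and some are still missing.
        }
        missing := uint64(0)
        for _, column := range columns[first:extended] {
            if unavailable[column] {
                missing++
            }
        }
        fmt.Printf("round %d: requested %v, missing %d\n", round, columns[first:extended], missing)
        if missing == 0 {
            return true // Every column requested in this round was retrieved.
        }
        // Extend the window: the next round starts where this one ended.
        allowedFailures += missing
        next, ok := extendedSampleCount16[allowedFailures]
        if !ok {
            return false // Beyond the copied table; give up in this sketch.
        }
        first, extended = extended, next
    }
}

func main() {
    // 24 hypothetical non-custody columns, already shuffled.
    columns := []uint64{
        3, 67, 99, 12, 44, 76, 108, 6, 38, 70, 102, 35,
        5, 21, 53, 85, 117, 9, 41, 73, 105, 17, 49, 81,
    }
    unavailable := map[uint64]bool{12: true, 38: true} // Two columns peers never return.
    fmt.Println("success:", simulateIncrementalDAS(columns, unavailable))
}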

View File

@@ -0,0 +1,554 @@
package sync
import (
"bytes"
"context"
"crypto/sha256"
"encoding/binary"
"fmt"
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
p2pTypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
GoKZG "github.com/crate-crypto/go-kzg-4844"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
)
func TestRandomizeColumns(t *testing.T) {
const count uint64 = 128
// Generate groups.
groups := make(map[uint64]bool, count)
for i := uint64(0); i < count; i++ {
groups[i] = true
}
// Randomize columns.
randomizedColumns, err := randomizeColumns(groups)
require.NoError(t, err)
// Convert back to a map.
randomizedColumnsMap := make(map[uint64]bool, count)
for _, column := range randomizedColumns {
randomizedColumnsMap[column] = true
}
// Check duplicates and missing columns.
require.Equal(t, len(groups), len(randomizedColumnsMap))
// Check the values.
for column := range randomizedColumnsMap {
require.Equal(t, true, column < count)
}
}
// createAndConnectPeer creates a peer whose private key is derived deterministically from `offset`.
// The peer is added and connected to `p2pService`.
// If a `RPCDataColumnSidecarsByRootTopicV1` request is made with column index `i`,
// then the peer will respond with `dataColumnSidecars[i]`, unless `i` is in `columnsNotToRespond`.
// (If `i >= len(dataColumnSidecars)`, this function panics.)
func createAndConnectPeer(
t *testing.T,
p2pService *p2ptest.TestP2P,
chainService *mock.ChainService,
dataColumnSidecars []*ethpb.DataColumnSidecar,
custodyGroupCount uint64,
columnsNotToRespond map[uint64]bool,
offset int,
) *p2ptest.TestP2P {
// Create the private key, depending on the offset.
privateKeyBytes := make([]byte, 32)
for i := 0; i < 32; i++ {
privateKeyBytes[i] = byte(offset + i)
}
privateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes)
require.NoError(t, err)
// Create the peer.
peer := p2ptest.NewTestP2P(t, libp2p.Identity(privateKey))
peer.SetStreamHandler(p2p.RPCDataColumnSidecarsByRootTopicV1+"/ssz_snappy", func(stream network.Stream) {
// Decode the request.
req := new(p2pTypes.DataColumnsByRootIdentifiers)
err := peer.Encoding().DecodeWithMaxLength(stream, req)
require.NoError(t, err)
for _, identifier := range *req {
for _, column := range identifier.Columns {
// Filter out the columns not to respond.
if columnsNotToRespond[column] {
continue
}
// Create the response.
resp := dataColumnSidecars[column]
// Send the response.
err := WriteDataColumnSidecarChunk(stream, chainService, p2pService.Encoding(), resp)
require.NoError(t, err)
}
}
// Close the stream.
closeStream(stream, log)
})
// Create the record and set the custody count.
enr := &enr.Record{}
enr.Set(peerdas.Cgc(custodyGroupCount))
// Add the peer and connect it.
p2pService.Peers().Add(enr, peer.PeerID(), nil, network.DirOutbound)
p2pService.Peers().SetConnectionState(peer.PeerID(), peers.Connected)
p2pService.Connect(peer)
return peer
}
type dataSamplerTest struct {
ctx context.Context
p2pSvc *p2ptest.TestP2P
peers []*p2ptest.TestP2P
ctxMap map[[4]byte]int
chainSvc *mock.ChainService
blockProcessedData *statefeed.BlockProcessedData
blobs []kzg.Blob
kzgCommitments [][]byte
kzgProofs [][]byte
dataColumnSidecars []*ethpb.DataColumnSidecar
}
func setupDefaultDataColumnSamplerTest(t *testing.T) (*dataSamplerTest, *dataColumnSampler1D) {
const (
blobCount uint64 = 3
custodyRequirement uint64 = 4
)
test, sampler := setupDataColumnSamplerTest(t, blobCount)
// Custody columns: [6, 38, 70, 102]
p1 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 1)
// Custody columns: [3, 35, 67, 99]
p2 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 2)
// Custody columns: [12, 44, 76, 108]
p3 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 3)
test.peers = []*p2ptest.TestP2P{p1, p2, p3}
return test, sampler
}
func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTest, *dataColumnSampler1D) {
require.NoError(t, kzg.Start())
// Generate random blobs, commitments and inclusion proofs.
blobs := make([]kzg.Blob, blobCount)
kzgCommitments := make([][]byte, blobCount)
kzgProofs := make([][]byte, blobCount)
for i := uint64(0); i < blobCount; i++ {
blob := getRandBlob(t, int64(i))
kzgCommitment, kzgProof, err := generateCommitmentAndProof(&blob)
require.NoError(t, err)
blobs[i] = blob
kzgCommitments[i] = kzgCommitment[:]
kzgProofs[i] = kzgProof[:]
}
dbBlock := util.NewBeaconBlockDeneb()
dbBlock.Block.Body.BlobKzgCommitments = kzgCommitments
sBlock, err := blocks.NewSignedBeaconBlock(dbBlock)
require.NoError(t, err)
cellsAndProofs := util.GenerateCellsAndProofs(t, blobs)
dataColumnSidecars, err := peerdas.DataColumnSidecars(sBlock, cellsAndProofs)
require.NoError(t, err)
blockRoot, err := dataColumnSidecars[0].GetSignedBlockHeader().Header.HashTreeRoot()
require.NoError(t, err)
blockProcessedData := &statefeed.BlockProcessedData{
BlockRoot: blockRoot,
SignedBlock: sBlock,
}
p2pSvc := p2ptest.NewTestP2P(t)
chainSvc, clock := defaultMockChain(t)
test := &dataSamplerTest{
ctx: context.Background(),
p2pSvc: p2pSvc,
peers: []*p2ptest.TestP2P{},
ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Fulu},
chainSvc: chainSvc,
blockProcessedData: blockProcessedData,
blobs: blobs,
kzgCommitments: kzgCommitments,
kzgProofs: kzgProofs,
dataColumnSidecars: dataColumnSidecars,
}
clockSync := startup.NewClockSynchronizer()
require.NoError(t, clockSync.SetClock(clock))
iniWaiter := verification.NewInitializerWaiter(clockSync, nil, nil)
ini, err := iniWaiter.WaitForInitializer(context.Background())
require.NoError(t, err)
sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil, newDataColumnsVerifierFromInitializer(ini), &peerdas.CustodyInfo{})
return test, sampler
}
func TestDataColumnSampler1D_PeerManagement(t *testing.T) {
testCases := []struct {
name string
numPeers int
custodyRequirement uint64
expectedColumns [][]uint64
prunePeers map[int]bool // Peers to prune.
}{
{
name: "custodyRequirement=4",
numPeers: 3,
custodyRequirement: 4,
expectedColumns: [][]uint64{
{6, 37, 48, 113},
{35, 79, 92, 109},
{31, 44, 58, 97},
},
prunePeers: map[int]bool{
0: true,
},
},
{
name: "custodyRequirement=8",
numPeers: 3,
custodyRequirement: 8,
expectedColumns: [][]uint64{
{1, 6, 37, 48, 51, 87, 112, 113},
{24, 25, 35, 52, 79, 92, 109, 126},
{31, 44, 58, 64, 91, 97, 116, 127},
},
prunePeers: map[int]bool{
0: true,
},
},
}
params.SetupTestConfigCleanup(t)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cfg := params.BeaconConfig()
cfg.CustodyRequirement = tc.custodyRequirement
params.OverrideBeaconConfig(cfg)
test, sampler := setupDataColumnSamplerTest(t, uint64(tc.numPeers))
for i := 0; i < tc.numPeers; i++ {
p := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, tc.custodyRequirement, nil, i+1)
test.peers = append(test.peers, p)
}
// Confirm the peer <-> custody group mappings are populated as expected.
sampler.refreshPeerInfo()
require.Equal(t, params.BeaconConfig().NumberOfCustodyGroups, uint64(len(sampler.peersByCustodyGroup)))
require.Equal(t, tc.numPeers, len(sampler.groupsByPeer))
for i, peer := range test.peers {
// confirm peer has the expected columns
require.Equal(t, len(tc.expectedColumns[i]), len(sampler.groupsByPeer[peer.PeerID()]))
for _, column := range tc.expectedColumns[i] {
require.Equal(t, true, sampler.groupsByPeer[peer.PeerID()][column])
}
// confirm column to peer mapping is correct
for _, column := range tc.expectedColumns[i] {
require.Equal(t, true, sampler.peersByCustodyGroup[column][peer.PeerID()])
}
}
// prune peers
for peer := range tc.prunePeers {
err := test.p2pSvc.Disconnect(test.peers[peer].PeerID())
test.p2pSvc.Peers().SetConnectionState(test.peers[peer].PeerID(), peers.Disconnected)
require.NoError(t, err)
}
sampler.refreshPeerInfo()
require.Equal(t, tc.numPeers-len(tc.prunePeers), len(sampler.groupsByPeer))
for i, peer := range test.peers {
for _, column := range tc.expectedColumns[i] {
expected := true
if tc.prunePeers[i] {
expected = false
}
require.Equal(t, expected, sampler.peersByCustodyGroup[column][peer.PeerID()])
}
}
})
}
}
func TestDataColumnSampler1D_SampleDistribution(t *testing.T) {
// TODO: Use `t.Run`.
testCases := []struct {
numPeers int
custodyRequirement uint64
columnsToDistribute [][]uint64
expectedDistribution []map[int][]uint64
}{
{
numPeers: 3,
custodyRequirement: 4,
// peer custody maps
// p0: {6, 37, 48, 113},
// p1: {35, 79, 92, 109},
// p2: {31, 44, 58, 97},
columnsToDistribute: [][]uint64{
{6, 35, 31},
{6, 48, 79, 109, 31, 97},
{6, 37, 113},
{11},
},
expectedDistribution: []map[int][]uint64{
{
0: {6}, // p0
1: {35}, // p1
2: {31}, // p2
},
{
0: {6, 48}, // p0
1: {79, 109}, // p1
2: {31, 97}, // p2
},
{
0: {6, 37, 113}, // p0
},
{},
},
},
{
numPeers: 3,
custodyRequirement: 8,
// peer custody maps
// p0: {6, 37, 48, 113, 1, 112, 87, 51},
// p1: {35, 79, 92, 109, 52, 126, 25, 24},
// p2: {31, 44, 58, 97, 116, 91, 64, 127},
columnsToDistribute: [][]uint64{
{6, 48, 79, 25, 24, 97}, // all covered by peers
{6, 35, 31, 32}, // `32` is not covered by any peer
},
expectedDistribution: []map[int][]uint64{
{
0: {6, 48}, // p0
1: {79, 25, 24}, // p1
2: {97}, // p2
},
{
0: {6}, // p0
1: {35}, // p1
2: {31}, // p2
},
},
},
}
params.SetupTestConfigCleanup(t)
for _, tc := range testCases {
cfg := params.BeaconConfig()
cfg.CustodyRequirement = tc.custodyRequirement
params.OverrideBeaconConfig(cfg)
test, sampler := setupDataColumnSamplerTest(t, uint64(tc.numPeers))
for i := 0; i < tc.numPeers; i++ {
p := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, tc.custodyRequirement, nil, i+1)
test.peers = append(test.peers, p)
}
sampler.refreshPeerInfo()
for idx, columns := range tc.columnsToDistribute {
result, err := sampler.distributeSamplesToPeer(columns)
require.NoError(t, err)
require.Equal(t, len(tc.expectedDistribution[idx]), len(result), fmt.Sprintf("%v - %v", tc.expectedDistribution[idx], result))
for peerIdx, dist := range tc.expectedDistribution[idx] {
for _, column := range dist {
peerID := test.peers[peerIdx].PeerID()
require.Equal(t, true, result[peerID][column])
}
}
}
}
}
func TestDataColumnSampler1D_SampleDataColumns(t *testing.T) {
test, sampler := setupDefaultDataColumnSamplerTest(t)
sampler.refreshPeerInfo()
t.Run("sample all columns", func(t *testing.T) {
sampleColumns := []uint64{6, 35, 31, 37, 79, 44, 48, 92, 58, 113, 109, 97}
retrieved, err := sampler.sampleDataColumns(test.ctx, test.blockProcessedData, sampleColumns)
require.NoError(t, err)
require.Equal(t, 12, len(retrieved))
for _, column := range sampleColumns {
require.Equal(t, true, retrieved[column])
}
})
t.Run("sample a subset of columns", func(t *testing.T) {
sampleColumns := []uint64{35, 31, 79, 48, 113, 97}
retrieved, err := sampler.sampleDataColumns(test.ctx, test.blockProcessedData, sampleColumns)
require.NoError(t, err)
require.Equal(t, 6, len(retrieved))
for _, column := range sampleColumns {
require.Equal(t, true, retrieved[column])
}
})
t.Run("sample a subset of columns with missing columns", func(t *testing.T) {
sampleColumns := []uint64{35, 31, 100, 79}
retrieved, err := sampler.sampleDataColumns(test.ctx, test.blockProcessedData, sampleColumns)
require.NoError(t, err)
require.Equal(t, 3, len(retrieved))
require.DeepEqual(t, map[uint64]bool{35: true, 31: true, 79: true}, retrieved)
})
}
func TestDataColumnSampler1D_IncrementalDAS(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.DataColumnSidecarSubnetCount = 32
params.OverrideBeaconConfig(cfg)
testCases := []struct {
name string
samplesCount uint64
possibleColumnsToRequest []uint64
columnsNotToRespond map[uint64]bool
expectedSuccess bool
expectedRoundSummaries []roundSummary
}{
{
name: "All columns are correctly sampled in a single round",
samplesCount: 5,
possibleColumnsToRequest: []uint64{6, 35, 31, 37, 79, 44, 48, 92, 58, 113, 109, 97},
columnsNotToRespond: map[uint64]bool{},
expectedSuccess: true,
expectedRoundSummaries: []roundSummary{
{
RequestedColumns: []uint64{6, 35, 31, 37, 79},
MissingColumns: map[uint64]bool{},
},
},
},
{
name: "Two missing columns in the first round, ok in the second round",
samplesCount: 5,
possibleColumnsToRequest: []uint64{6, 35, 31, 37, 79, 44, 48, 92, 58, 113, 109, 97},
columnsNotToRespond: map[uint64]bool{6: true, 31: true},
expectedSuccess: true,
expectedRoundSummaries: []roundSummary{
{
RequestedColumns: []uint64{6, 35, 31, 37, 79},
MissingColumns: map[uint64]bool{6: true, 31: true},
},
{
RequestedColumns: []uint64{44, 48, 92, 58, 113, 109},
MissingColumns: map[uint64]bool{},
},
},
},
{
name: "Two missing columns in the first round, one missing in the second round. Fail to sample.",
samplesCount: 5,
possibleColumnsToRequest: []uint64{6, 35, 31, 37, 79, 44, 48, 92, 58, 113, 109, 97},
columnsNotToRespond: map[uint64]bool{6: true, 31: true, 48: true},
expectedSuccess: false,
expectedRoundSummaries: []roundSummary{
{
RequestedColumns: []uint64{6, 35, 31, 37, 79},
MissingColumns: map[uint64]bool{6: true, 31: true},
},
{
RequestedColumns: []uint64{44, 48, 92, 58, 113, 109},
MissingColumns: map[uint64]bool{48: true},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
test, sampler := setupDataColumnSamplerTest(t, 3)
p1 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, params.BeaconConfig().CustodyRequirement, tc.columnsNotToRespond, 1)
p2 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, params.BeaconConfig().CustodyRequirement, tc.columnsNotToRespond, 2)
p3 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, params.BeaconConfig().CustodyRequirement, tc.columnsNotToRespond, 3)
test.peers = []*p2ptest.TestP2P{p1, p2, p3}
sampler.refreshPeerInfo()
success, summaries, err := sampler.incrementalDAS(test.ctx, test.blockProcessedData, tc.possibleColumnsToRequest, tc.samplesCount)
require.NoError(t, err)
require.Equal(t, tc.expectedSuccess, success)
require.DeepEqual(t, tc.expectedRoundSummaries, summaries)
})
}
}
func deterministicRandomness(t *testing.T, seed int64) [32]byte {
// Converts an int64 to a byte slice
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.BigEndian, seed)
require.NoError(t, err)
bytes := buf.Bytes()
return sha256.Sum256(bytes)
}
// Returns a serialized random field element in big-endian
func getRandFieldElement(t *testing.T, seed int64) [32]byte {
bytes := deterministicRandomness(t, seed)
var r fr.Element
r.SetBytes(bytes[:])
return GoKZG.SerializeScalar(r)
}
// Returns a random blob using the passed seed as entropy
func getRandBlob(t *testing.T, seed int64) kzg.Blob {
var blob kzg.Blob
for i := 0; i < len(blob); i += 32 {
fieldElementBytes := getRandFieldElement(t, seed+int64(i))
copy(blob[i:i+32], fieldElementBytes[:])
}
return blob
}
func generateCommitmentAndProof(blob *kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) {
commitment, err := kzg.BlobToKZGCommitment(blob)
if err != nil {
return nil, nil, err
}
proof, err := kzg.ComputeBlobKZGProof(blob, commitment)
if err != nil {
return nil, nil, err
}
return &commitment, &proof, err
}

View File

@@ -38,6 +38,15 @@ var (
RequireSidecarProposerExpected,
}
// ByRootRequestDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received
// via the by root request must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
ByRootRequestDataColumnSidecarRequirements = []Requirement{
RequireValidFields,
RequireSidecarInclusionProven,
RequireSidecarKzgProofVerified,
}
// ByRangeRequestDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received
// via the by range request must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1

View File

@@ -0,0 +1,2 @@
### Added
- PeerDAS: Implement peer sampling.