mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Compare commits
1 Commits
d929e1dcaa
...
peerdas-pe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
33e6ff416b |
@@ -7,6 +7,7 @@ go_library(
|
||||
"info.go",
|
||||
"metrics.go",
|
||||
"p2p_interface.go",
|
||||
"peer_sampling.go",
|
||||
"reconstruction.go",
|
||||
"validator.go",
|
||||
"verification.go",
|
||||
@@ -44,6 +45,7 @@ go_test(
|
||||
"das_core_test.go",
|
||||
"info_test.go",
|
||||
"p2p_interface_test.go",
|
||||
"peer_sampling_test.go",
|
||||
"reconstruction_test.go",
|
||||
"utils_test.go",
|
||||
"validator_test.go",
|
||||
|
||||
56
beacon-chain/core/peerdas/peer_sampling.go
Normal file
56
beacon-chain/core/peerdas/peer_sampling.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package peerdas
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
)
|
||||
|
||||
// ExtendedSampleCount computes, for a given number of samples per slot and allowed failures the
|
||||
// number of samples we should actually query from peers.
|
||||
// https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.5/specs/fulu/peer-sampling.md#get_extended_sample_count
|
||||
func ExtendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 {
|
||||
// Retrieve the columns count
|
||||
columnsCount := params.BeaconConfig().NumberOfColumns
|
||||
|
||||
// If half of the columns are missing, we are able to reconstruct the data.
|
||||
// If half of the columns + 1 are missing, we are not able to reconstruct the data.
|
||||
// This is the smallest worst case.
|
||||
worstCaseMissing := columnsCount/2 + 1
|
||||
|
||||
// Compute the false positive threshold.
|
||||
falsePositiveThreshold := HypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot)
|
||||
|
||||
var sampleCount uint64
|
||||
|
||||
// Finally, compute the extended sample count.
|
||||
for sampleCount = samplesPerSlot; sampleCount < columnsCount+1; sampleCount++ {
|
||||
if HypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return sampleCount
|
||||
}
|
||||
|
||||
// HypergeomCDF computes the hypergeometric cumulative distribution function.
|
||||
// https://en.wikipedia.org/wiki/Hypergeometric_distribution
|
||||
func HypergeomCDF(k, M, n, N uint64) float64 {
|
||||
denominatorInt := new(big.Int).Binomial(int64(M), int64(N)) // lint:ignore uintcast
|
||||
denominator := new(big.Float).SetInt(denominatorInt)
|
||||
|
||||
rBig := big.NewFloat(0)
|
||||
|
||||
for i := uint64(0); i < k+1; i++ {
|
||||
a := new(big.Int).Binomial(int64(n), int64(i)) // lint:ignore uintcast
|
||||
b := new(big.Int).Binomial(int64(M-n), int64(N-i))
|
||||
numeratorInt := new(big.Int).Mul(a, b)
|
||||
numerator := new(big.Float).SetInt(numeratorInt)
|
||||
item := new(big.Float).Quo(numerator, denominator)
|
||||
rBig.Add(rBig, item)
|
||||
}
|
||||
|
||||
r, _ := rBig.Float64()
|
||||
|
||||
return r
|
||||
}
|
||||
60
beacon-chain/core/peerdas/peer_sampling_test.go
Normal file
60
beacon-chain/core/peerdas/peer_sampling_test.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package peerdas_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
)
|
||||
|
||||
func TestExtendedSampleCount(t *testing.T) {
|
||||
const samplesPerSlot = 16
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
allowedMissings uint64
|
||||
extendedSampleCount uint64
|
||||
}{
|
||||
{name: "allowedMissings=0", allowedMissings: 0, extendedSampleCount: 16},
|
||||
{name: "allowedMissings=1", allowedMissings: 1, extendedSampleCount: 20},
|
||||
{name: "allowedMissings=2", allowedMissings: 2, extendedSampleCount: 24},
|
||||
{name: "allowedMissings=3", allowedMissings: 3, extendedSampleCount: 27},
|
||||
{name: "allowedMissings=4", allowedMissings: 4, extendedSampleCount: 29},
|
||||
{name: "allowedMissings=5", allowedMissings: 5, extendedSampleCount: 32},
|
||||
{name: "allowedMissings=6", allowedMissings: 6, extendedSampleCount: 35},
|
||||
{name: "allowedMissings=7", allowedMissings: 7, extendedSampleCount: 37},
|
||||
{name: "allowedMissings=8", allowedMissings: 8, extendedSampleCount: 40},
|
||||
{name: "allowedMissings=9", allowedMissings: 9, extendedSampleCount: 42},
|
||||
{name: "allowedMissings=10", allowedMissings: 10, extendedSampleCount: 44},
|
||||
{name: "allowedMissings=11", allowedMissings: 11, extendedSampleCount: 47},
|
||||
{name: "allowedMissings=12", allowedMissings: 12, extendedSampleCount: 49},
|
||||
{name: "allowedMissings=13", allowedMissings: 13, extendedSampleCount: 51},
|
||||
{name: "allowedMissings=14", allowedMissings: 14, extendedSampleCount: 53},
|
||||
{name: "allowedMissings=15", allowedMissings: 15, extendedSampleCount: 55},
|
||||
{name: "allowedMissings=16", allowedMissings: 16, extendedSampleCount: 57},
|
||||
{name: "allowedMissings=17", allowedMissings: 17, extendedSampleCount: 59},
|
||||
{name: "allowedMissings=18", allowedMissings: 18, extendedSampleCount: 61},
|
||||
{name: "allowedMissings=19", allowedMissings: 19, extendedSampleCount: 63},
|
||||
{name: "allowedMissings=20", allowedMissings: 20, extendedSampleCount: 65},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
result := peerdas.ExtendedSampleCount(samplesPerSlot, tc.allowedMissings)
|
||||
require.Equal(t, tc.extendedSampleCount, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHypergeomCDF(t *testing.T) {
|
||||
// Test case from https://en.wikipedia.org/wiki/Hypergeometric_distribution
|
||||
// Population size: 1000, number of successes in population: 500, sample size: 10, number of successes in sample: 5
|
||||
// Expected result: 0.072
|
||||
const (
|
||||
expected = 0.0796665913283742
|
||||
margin = 0.000001
|
||||
)
|
||||
|
||||
actual := peerdas.HypergeomCDF(5, 128, 65, 16)
|
||||
require.Equal(t, true, expected-margin <= actual && actual <= expected+margin)
|
||||
}
|
||||
@@ -7,6 +7,7 @@ go_library(
|
||||
"block_batcher.go",
|
||||
"broadcast_bls_changes.go",
|
||||
"context.go",
|
||||
"data_columns_sampling.go",
|
||||
"deadlines.go",
|
||||
"decode_pubsub.go",
|
||||
"doc.go",
|
||||
@@ -78,6 +79,7 @@ go_library(
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/light-client:go_default_library",
|
||||
"//beacon-chain/core/peerdas:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//beacon-chain/core/transition:go_default_library",
|
||||
"//beacon-chain/core/transition/interop:go_default_library",
|
||||
@@ -162,6 +164,7 @@ go_test(
|
||||
"block_batcher_test.go",
|
||||
"broadcast_bls_changes_test.go",
|
||||
"context_test.go",
|
||||
"data_columns_sampling_test.go",
|
||||
"decode_pubsub_test.go",
|
||||
"error_test.go",
|
||||
"fork_watcher_test.go",
|
||||
@@ -207,13 +210,16 @@ go_test(
|
||||
deps = [
|
||||
"//async/abool:go_default_library",
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/blockchain/kzg:go_default_library",
|
||||
"//beacon-chain/blockchain/testing:go_default_library",
|
||||
"//beacon-chain/cache:go_default_library",
|
||||
"//beacon-chain/core/altair:go_default_library",
|
||||
"//beacon-chain/core/feed:go_default_library",
|
||||
"//beacon-chain/core/feed/operation:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/light-client:go_default_library",
|
||||
"//beacon-chain/core/peerdas:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//beacon-chain/core/time:go_default_library",
|
||||
"//beacon-chain/core/transition:go_default_library",
|
||||
@@ -265,6 +271,8 @@ go_test(
|
||||
"//testing/util:go_default_library",
|
||||
"//time:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library",
|
||||
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
|
||||
"@com_github_d4l3k_messagediff//:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
|
||||
@@ -272,6 +280,7 @@ go_test(
|
||||
"@com_github_golang_snappy//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
|
||||
|
||||
627
beacon-chain/sync/data_columns_sampling.go
Normal file
627
beacon-chain/sync/data_columns_sampling.go
Normal file
@@ -0,0 +1,627 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/async"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
|
||||
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/rand"
|
||||
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/runtime/version"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const PeerRefreshInterval = 1 * time.Minute
|
||||
|
||||
type roundSummary struct {
|
||||
RequestedColumns []uint64
|
||||
MissingColumns map[uint64]bool
|
||||
}
|
||||
|
||||
// DataColumnSampler defines the interface for sampling data columns from peers for requested block root and samples count.
|
||||
type DataColumnSampler interface {
|
||||
// Run starts the data column sampling service.
|
||||
Run(ctx context.Context)
|
||||
}
|
||||
|
||||
var _ DataColumnSampler = (*dataColumnSampler1D)(nil)
|
||||
|
||||
// dataColumnSampler1D implements the DataColumnSampler interface for PeerDAS 1D.
|
||||
type dataColumnSampler1D struct {
|
||||
sync.RWMutex
|
||||
|
||||
p2p p2p.P2P
|
||||
clock *startup.Clock
|
||||
ctxMap ContextByteVersions
|
||||
stateNotifier statefeed.Notifier
|
||||
|
||||
// nonCustodyGroups is a set of groups that are not custodied by the node.
|
||||
nonCustodyGroups map[uint64]bool
|
||||
|
||||
// groupsByPeer maps a peer to the groups it is responsible for custody.
|
||||
groupsByPeer map[peer.ID]map[uint64]bool
|
||||
|
||||
// peersByCustodyGroup maps a group to the peer responsible for custody.
|
||||
peersByCustodyGroup map[uint64]map[peer.ID]bool
|
||||
|
||||
// columnVerifier verifies a column according to the specified requirements.
|
||||
columnVerifier verification.NewDataColumnsVerifier
|
||||
|
||||
// custodyInfo contains the custody information of the node.
|
||||
custodyInfo *peerdas.CustodyInfo
|
||||
}
|
||||
|
||||
// newDataColumnSampler1D creates a new 1D data column sampler.
|
||||
func newDataColumnSampler1D(
|
||||
p2p p2p.P2P,
|
||||
clock *startup.Clock,
|
||||
ctxMap ContextByteVersions,
|
||||
stateNotifier statefeed.Notifier,
|
||||
colVerifier verification.NewDataColumnsVerifier,
|
||||
custodyInfo *peerdas.CustodyInfo,
|
||||
) *dataColumnSampler1D {
|
||||
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups
|
||||
peersByCustodyGroup := make(map[uint64]map[peer.ID]bool, numberOfCustodyGroups)
|
||||
|
||||
for i := range numberOfCustodyGroups {
|
||||
peersByCustodyGroup[i] = make(map[peer.ID]bool)
|
||||
}
|
||||
|
||||
return &dataColumnSampler1D{
|
||||
p2p: p2p,
|
||||
clock: clock,
|
||||
ctxMap: ctxMap,
|
||||
stateNotifier: stateNotifier,
|
||||
groupsByPeer: make(map[peer.ID]map[uint64]bool),
|
||||
peersByCustodyGroup: peersByCustodyGroup,
|
||||
columnVerifier: colVerifier,
|
||||
custodyInfo: custodyInfo,
|
||||
}
|
||||
}
|
||||
|
||||
// Run implements DataColumnSampler.
|
||||
func (d *dataColumnSampler1D) Run(ctx context.Context) {
|
||||
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups
|
||||
|
||||
// Get the node ID.
|
||||
nodeID := d.p2p.NodeID()
|
||||
|
||||
// Verify if we need to run sampling or not, if not, return directly.
|
||||
// TODO: Rework this part to take into account dynamic custody group count with peer sampling.
|
||||
custodyGroupCount := d.custodyInfo.ActualGroupCount()
|
||||
|
||||
// Retrieve our local node info.
|
||||
localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("peer info")
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: custody group count != data column group count
|
||||
if custodyGroupCount >= peerdas.MinimumColumnsCountToReconstruct() {
|
||||
log.WithFields(logrus.Fields{
|
||||
"custodyGroupCount": custodyGroupCount,
|
||||
"totalGroups": numberOfCustodyGroups,
|
||||
}).Debug("The node custodies at least the half of the groups, no need to sample")
|
||||
return
|
||||
}
|
||||
|
||||
// Initialize non custody groups.
|
||||
d.nonCustodyGroups = make(map[uint64]bool)
|
||||
for i := range numberOfCustodyGroups {
|
||||
if !localNodeInfo.CustodyGroups[i] {
|
||||
d.nonCustodyGroups[i] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize peer info first.
|
||||
d.refreshPeerInfo()
|
||||
|
||||
// periodically refresh peer info to keep peer <-> column mapping up to date.
|
||||
async.RunEvery(ctx, PeerRefreshInterval, d.refreshPeerInfo)
|
||||
|
||||
// start the sampling loop.
|
||||
d.samplingRoutine(ctx)
|
||||
}
|
||||
|
||||
func (d *dataColumnSampler1D) samplingRoutine(ctx context.Context) {
|
||||
stateCh := make(chan *feed.Event, 1)
|
||||
stateSub := d.stateNotifier.StateFeed().Subscribe(stateCh)
|
||||
defer stateSub.Unsubscribe()
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt := <-stateCh:
|
||||
d.handleStateNotification(ctx, evt)
|
||||
case err := <-stateSub.Err():
|
||||
log.WithError(err).Error("DataColumnSampler1D subscription to state feed failed")
|
||||
case <-ctx.Done():
|
||||
log.Debug("Context canceled, exiting data column sampling loop.")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Refresh peer information.
|
||||
func (d *dataColumnSampler1D) refreshPeerInfo() {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
|
||||
activePeers := d.p2p.Peers().Active()
|
||||
d.prunePeerInfo(activePeers)
|
||||
|
||||
for _, pid := range activePeers {
|
||||
// Retrieve the custody group count of the peer.
|
||||
retrievedCustodyGroupCount := d.p2p.CustodyGroupCountFromPeer(pid)
|
||||
|
||||
// Look into our store the custody storedGroups for this peer.
|
||||
storedGroups, ok := d.groupsByPeer[pid]
|
||||
storedGroupsCount := uint64(len(storedGroups))
|
||||
|
||||
if ok && storedGroupsCount == retrievedCustodyGroupCount {
|
||||
// No change for this peer.
|
||||
continue
|
||||
}
|
||||
|
||||
nodeID, err := p2p.ConvertPeerIDToNodeID(pid)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("peerID", pid).Error("Failed to convert peer ID to node ID")
|
||||
continue
|
||||
}
|
||||
|
||||
// Retrieve the peer info.
|
||||
peerInfo, _, err := peerdas.Info(nodeID, retrievedCustodyGroupCount)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("peerID", pid.String()).Error("Failed to determine peer info")
|
||||
}
|
||||
|
||||
d.groupsByPeer[pid] = peerInfo.CustodyGroups
|
||||
for group := range peerInfo.CustodyGroups {
|
||||
d.peersByCustodyGroup[group][pid] = true
|
||||
}
|
||||
}
|
||||
|
||||
groupsWithoutPeers := make([]uint64, 0)
|
||||
for group, peers := range d.peersByCustodyGroup {
|
||||
if len(peers) == 0 {
|
||||
groupsWithoutPeers = append(groupsWithoutPeers, group)
|
||||
}
|
||||
}
|
||||
|
||||
if len(groupsWithoutPeers) > 0 {
|
||||
slices.Sort[[]uint64](groupsWithoutPeers)
|
||||
log.WithField("groups", groupsWithoutPeers).Warn("Some groups have no peers responsible for custody")
|
||||
}
|
||||
}
|
||||
|
||||
// prunePeerInfo prunes inactive peers from peerByGroup and groupByPeer.
|
||||
// This should not be called outside of refreshPeerInfo without being locked.
|
||||
func (d *dataColumnSampler1D) prunePeerInfo(activePeers []peer.ID) {
|
||||
active := make(map[peer.ID]bool)
|
||||
for _, pid := range activePeers {
|
||||
active[pid] = true
|
||||
}
|
||||
|
||||
for pid := range d.groupsByPeer {
|
||||
if !active[pid] {
|
||||
d.prunePeer(pid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// prunePeer removes a peer from stored peer info map, it should be called with lock held.
|
||||
func (d *dataColumnSampler1D) prunePeer(pid peer.ID) {
|
||||
delete(d.groupsByPeer, pid)
|
||||
for _, peers := range d.peersByCustodyGroup {
|
||||
delete(peers, pid)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event *feed.Event) {
|
||||
if event.Type != statefeed.BlockProcessed {
|
||||
return
|
||||
}
|
||||
|
||||
data, ok := event.Data.(*statefeed.BlockProcessedData)
|
||||
if !ok {
|
||||
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
|
||||
return
|
||||
}
|
||||
|
||||
if !data.Verified {
|
||||
// We only process blocks that have been verified
|
||||
log.Error("Data is not verified")
|
||||
return
|
||||
}
|
||||
|
||||
if data.SignedBlock.Version() < version.Fulu {
|
||||
log.Debug("Pre Fulu block, skipping data column sampling")
|
||||
return
|
||||
}
|
||||
|
||||
// Determine if we need to sample data columns for this block.
|
||||
beaconConfig := params.BeaconConfig()
|
||||
samplesPerSlots := beaconConfig.SamplesPerSlot
|
||||
halfOfCustodyGroups := beaconConfig.NumberOfCustodyGroups / 2
|
||||
nonCustodyGroupsCount := uint64(len(d.nonCustodyGroups))
|
||||
|
||||
if nonCustodyGroupsCount <= halfOfCustodyGroups {
|
||||
// Nothing to sample.
|
||||
return
|
||||
}
|
||||
|
||||
// Get the commitments for this block.
|
||||
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to get blob KZG commitments")
|
||||
return
|
||||
}
|
||||
|
||||
// Skip if there are no commitments.
|
||||
if len(commitments) == 0 {
|
||||
log.Debug("No commitments in block, skipping data column sampling")
|
||||
return
|
||||
}
|
||||
|
||||
// Randomize columns for sample selection.
|
||||
randomizedColumns, err := randomizeColumns(d.nonCustodyGroups)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to randomize columns")
|
||||
return
|
||||
}
|
||||
|
||||
samplesCount := min(samplesPerSlots, nonCustodyGroupsCount-halfOfCustodyGroups)
|
||||
|
||||
// TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
|
||||
_, _, err = d.incrementalDAS(ctx, data, randomizedColumns, samplesCount)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to run incremental DAS")
|
||||
}
|
||||
}
|
||||
|
||||
// incrementalDAS samples data columns from active peers using incremental DAS.
|
||||
// https://ethresear.ch/t/lossydas-lossy-incremental-and-diagonal-sampling-for-data-availability/18963#incrementaldas-dynamically-increase-the-sample-size-10
|
||||
// According to https://github.com/ethereum/consensus-specs/issues/3825, we're going to select query samples exclusively from the non custody columns.
|
||||
func (d *dataColumnSampler1D) incrementalDAS(
|
||||
ctx context.Context,
|
||||
blockProcessedData *statefeed.BlockProcessedData,
|
||||
columns []uint64,
|
||||
sampleCount uint64,
|
||||
) (bool, []roundSummary, error) {
|
||||
allowedFailures := uint64(0)
|
||||
firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
|
||||
roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
|
||||
blockRoot := blockProcessedData.BlockRoot
|
||||
columnsCount := uint64(len(columns))
|
||||
|
||||
start := time.Now()
|
||||
|
||||
for round := 1; ; /*No exit condition */ round++ {
|
||||
if extendedSampleCount > columnsCount {
|
||||
// We already tried to sample all possible columns, this is the unhappy path.
|
||||
log.WithFields(logrus.Fields{
|
||||
"root": fmt.Sprintf("%#x", blockRoot),
|
||||
"round": round - 1,
|
||||
}).Warning("Some columns are still missing after trying to sample all possible columns")
|
||||
return false, roundSummaries, nil
|
||||
}
|
||||
|
||||
// Get the columns to sample for this round.
|
||||
columnsToSample := columns[firstColumnToSample:extendedSampleCount]
|
||||
columnsToSampleCount := extendedSampleCount - firstColumnToSample
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"root": fmt.Sprintf("%#x", blockRoot),
|
||||
"columns": columnsToSample,
|
||||
"round": round,
|
||||
}).Debug("Start data columns sampling")
|
||||
|
||||
// Sample data columns from peers in parallel.
|
||||
retrievedSamples, err := d.sampleDataColumns(ctx, blockProcessedData, columnsToSample)
|
||||
if err != nil {
|
||||
return false, nil, errors.Wrap(err, "sample data columns")
|
||||
}
|
||||
|
||||
missingSamples := make(map[uint64]bool)
|
||||
for _, column := range columnsToSample {
|
||||
if !retrievedSamples[column] {
|
||||
missingSamples[column] = true
|
||||
}
|
||||
}
|
||||
|
||||
roundSummaries = append(roundSummaries, roundSummary{
|
||||
RequestedColumns: columnsToSample,
|
||||
MissingColumns: missingSamples,
|
||||
})
|
||||
|
||||
retrievedSampleCount := uint64(len(retrievedSamples))
|
||||
if retrievedSampleCount == columnsToSampleCount {
|
||||
// All columns were correctly sampled, this is the happy path.
|
||||
log.WithFields(logrus.Fields{
|
||||
"root": fmt.Sprintf("%#x", blockRoot),
|
||||
"neededRounds": round,
|
||||
"duration": time.Since(start),
|
||||
}).Debug("All columns were successfully sampled")
|
||||
return true, roundSummaries, nil
|
||||
}
|
||||
|
||||
if retrievedSampleCount > columnsToSampleCount {
|
||||
// This should never happen.
|
||||
return false, nil, errors.New("retrieved more columns than requested")
|
||||
}
|
||||
|
||||
// There is still some missing columns, extend the samples.
|
||||
allowedFailures += columnsToSampleCount - retrievedSampleCount
|
||||
oldExtendedSampleCount := extendedSampleCount
|
||||
firstColumnToSample = extendedSampleCount
|
||||
extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"root": fmt.Sprintf("%#x", blockRoot),
|
||||
"round": round,
|
||||
"missingColumnsCount": allowedFailures,
|
||||
"currentSampleIndex": oldExtendedSampleCount,
|
||||
"nextSampleIndex": extendedSampleCount,
|
||||
}).Debug("Some columns are still missing after sampling this round.")
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dataColumnSampler1D) sampleDataColumns(
|
||||
ctx context.Context,
|
||||
blockProcessedData *statefeed.BlockProcessedData,
|
||||
columns []uint64,
|
||||
) (map[uint64]bool, error) {
|
||||
// distribute samples to peer
|
||||
peerToColumns, err := d.distributeSamplesToPeer(columns)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "distribute samples to peer")
|
||||
}
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
|
||||
res := make(map[uint64]bool)
|
||||
|
||||
sampleFromPeer := func(pid peer.ID, cols map[uint64]bool) {
|
||||
defer wg.Done()
|
||||
retrieved := d.sampleDataColumnsFromPeer(ctx, pid, blockProcessedData, cols)
|
||||
|
||||
mu.Lock()
|
||||
for col := range retrieved {
|
||||
res[col] = true
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
// sample from peers in parallel
|
||||
for pid, cols := range peerToColumns {
|
||||
wg.Add(1)
|
||||
go sampleFromPeer(pid, cols)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// distributeSamplesToPeer distributes samples to peers based on the columns they are responsible for.
|
||||
// Currently it randomizes peer selection for a column and did not take into account whole peer distribution balance. It could be improved if needed.
|
||||
func (d *dataColumnSampler1D) distributeSamplesToPeer(columns []uint64) (map[peer.ID]map[uint64]bool, error) {
|
||||
dist := make(map[peer.ID]map[uint64]bool)
|
||||
|
||||
for _, column := range columns {
|
||||
custodyGroup, err := peerdas.ComputeCustodyGroupForColumn(column)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "compute custody group for column")
|
||||
}
|
||||
|
||||
peers := d.peersByCustodyGroup[custodyGroup]
|
||||
if len(peers) == 0 {
|
||||
log.WithField("column", column).Warning("No peers responsible for custody of column")
|
||||
continue
|
||||
}
|
||||
|
||||
pid, err := selectRandomPeer(peers)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "select random peer")
|
||||
}
|
||||
|
||||
if _, ok := dist[pid]; !ok {
|
||||
dist[pid] = make(map[uint64]bool)
|
||||
}
|
||||
|
||||
dist[pid][column] = true
|
||||
}
|
||||
|
||||
return dist, nil
|
||||
}
|
||||
|
||||
func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
|
||||
ctx context.Context,
|
||||
pid peer.ID,
|
||||
blockProcessedData *statefeed.BlockProcessedData,
|
||||
requestedColumns map[uint64]bool,
|
||||
) map[uint64]bool {
|
||||
retrievedColumns := make(map[uint64]bool)
|
||||
|
||||
cols := make([]uint64, 0, len(requestedColumns))
|
||||
for col := range requestedColumns {
|
||||
cols = append(cols, col)
|
||||
}
|
||||
req := ð.DataColumnsByRootIdentifier{
|
||||
BlockRoot: blockProcessedData.BlockRoot[:],
|
||||
Columns: cols,
|
||||
}
|
||||
|
||||
// Send the request to the peer.
|
||||
roDataColumns, err := SendDataColumnSidecarsByRootRequest(ctx, d.clock, d.p2p, pid, d.ctxMap, types.DataColumnsByRootIdentifiers{req})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to send data column sidecar by root")
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: Once peer sampling is used, we should verify all sampled data columns in a single batch instead of looping over columns.
|
||||
for _, roDataColumn := range roDataColumns {
|
||||
if verifyColumn(roDataColumn, blockProcessedData, pid, requestedColumns, d.columnVerifier) {
|
||||
retrievedColumns[roDataColumn.Index] = true
|
||||
}
|
||||
}
|
||||
|
||||
if len(retrievedColumns) == len(requestedColumns) {
|
||||
log.WithFields(logrus.Fields{
|
||||
"peerID": pid,
|
||||
"root": fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
|
||||
"requestedColumns": sliceFromMap(requestedColumns, true /*sorted*/),
|
||||
}).Debug("Sampled columns from peer successfully")
|
||||
} else {
|
||||
log.WithFields(logrus.Fields{
|
||||
"peerID": pid,
|
||||
"root": fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
|
||||
"requestedColumns": sliceFromMap(requestedColumns, true /*sorted*/),
|
||||
"retrievedColumns": sliceFromMap(retrievedColumns, true /*sorted*/),
|
||||
}).Debug("Sampled columns from peer with some errors")
|
||||
}
|
||||
|
||||
return retrievedColumns
|
||||
}
|
||||
|
||||
// randomizeColumns returns a slice containing randomly ordered columns belonging to the input `groups`.
|
||||
func randomizeColumns(custodyGroups map[uint64]bool) ([]uint64, error) {
|
||||
// Compute the number of columns per group.
|
||||
numberOfColumns := params.BeaconConfig().NumberOfColumns
|
||||
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups
|
||||
columnsPerGroup := numberOfColumns / numberOfCustodyGroups
|
||||
|
||||
// Compute the number of columns.
|
||||
groupCount := uint64(len(custodyGroups))
|
||||
expectedColumnCount := groupCount * columnsPerGroup
|
||||
|
||||
// Compute the columns.
|
||||
columns := make([]uint64, 0, expectedColumnCount)
|
||||
for group := range custodyGroups {
|
||||
columnsGroup, err := peerdas.ComputeColumnsForCustodyGroup(group)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "compute columns for custody group")
|
||||
}
|
||||
|
||||
columns = append(columns, columnsGroup...)
|
||||
}
|
||||
|
||||
actualColumnCount := len(columns)
|
||||
|
||||
// Safety check.
|
||||
if uint64(actualColumnCount) != expectedColumnCount {
|
||||
return nil, errors.New("invalid number of columns, should never happen")
|
||||
}
|
||||
|
||||
// Shuffle the columns.
|
||||
rand.NewGenerator().Shuffle(actualColumnCount, func(i, j int) {
|
||||
columns[i], columns[j] = columns[j], columns[i]
|
||||
})
|
||||
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
// sliceFromMap returns a sorted list of keys from a map.
|
||||
func sliceFromMap(m map[uint64]bool, sorted ...bool) []uint64 {
|
||||
result := make([]uint64, 0, len(m))
|
||||
for k := range m {
|
||||
result = append(result, k)
|
||||
}
|
||||
|
||||
if len(sorted) > 0 && sorted[0] {
|
||||
slices.Sort(result)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// selectRandomPeer returns a random peer from the given list of peers.
|
||||
func selectRandomPeer(peers map[peer.ID]bool) (peer.ID, error) {
|
||||
peersCount := uint64(len(peers))
|
||||
pick := rand.NewGenerator().Uint64() % peersCount
|
||||
|
||||
for peer := range peers {
|
||||
if pick == 0 {
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
pick--
|
||||
}
|
||||
|
||||
// This should never be reached.
|
||||
return peer.ID(""), errors.New("failed to select random peer")
|
||||
}
|
||||
|
||||
// verifyColumn verifies the retrieved column against the root, the index,
|
||||
// the KZG inclusion and the KZG proof.
|
||||
func verifyColumn(
|
||||
roDataColumn blocks.RODataColumn,
|
||||
blockProcessedData *statefeed.BlockProcessedData,
|
||||
pid peer.ID,
|
||||
requestedColumns map[uint64]bool,
|
||||
newDataColumnsVerifier verification.NewDataColumnsVerifier,
|
||||
) bool {
|
||||
retrievedColumn := roDataColumn.Index
|
||||
|
||||
// Filter out columns that were not requested.
|
||||
if !requestedColumns[retrievedColumn] {
|
||||
columnsToSampleList := sliceFromMap(requestedColumns, true /*sorted*/)
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"peerID": pid,
|
||||
"requestedColumns": columnsToSampleList,
|
||||
"retrievedColumn": retrievedColumn,
|
||||
}).Debug("Retrieved column was not requested")
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
roBlock, err := blocks.NewROBlock(blockProcessedData.SignedBlock)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("peerID", pid).Error("Failed to create ROBlock")
|
||||
}
|
||||
|
||||
roDataColumns := []blocks.RODataColumn{roDataColumn}
|
||||
|
||||
if err := peerdas.DataColumnsAlignWithBlock(roBlock, roDataColumns); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
|
||||
verifier := newDataColumnsVerifier(roDataColumns, verification.ByRootRequestDataColumnSidecarRequirements)
|
||||
|
||||
if err := verifier.ValidFields(); err != nil {
|
||||
log.WithError(err).WithField("peerID", pid).Error("Failed to verify data column")
|
||||
}
|
||||
|
||||
if err := verifier.SidecarInclusionProven(); err != nil {
|
||||
log.WithError(err).WithField("peerID", pid).Error("Failed to prove inclusion")
|
||||
}
|
||||
|
||||
if err := verifier.SidecarKzgProofVerified(); err != nil {
|
||||
log.WithError(err).WithField("peerID", pid).Error("Failed to verify KZG proof")
|
||||
}
|
||||
|
||||
_, err = verifier.VerifiedRODataColumns()
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("peerID", pid).Error("Failed to upgrade RODataColumns to VerifiedRODataColumns - should never happen")
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
554
beacon-chain/sync/data_columns_sampling_test.go
Normal file
554
beacon-chain/sync/data_columns_sampling_test.go
Normal file
@@ -0,0 +1,554 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
|
||||
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
|
||||
statefeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/state"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
p2pTypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/runtime/version"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/util"
|
||||
"github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
|
||||
GoKZG "github.com/crate-crypto/go-kzg-4844"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
)
|
||||
|
||||
func TestRandomizeColumns(t *testing.T) {
|
||||
const count uint64 = 128
|
||||
|
||||
// Generate groups.
|
||||
groups := make(map[uint64]bool, count)
|
||||
for i := uint64(0); i < count; i++ {
|
||||
groups[i] = true
|
||||
}
|
||||
|
||||
// Randomize columns.
|
||||
randomizedColumns, err := randomizeColumns(groups)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Convert back to a map.
|
||||
randomizedColumnsMap := make(map[uint64]bool, count)
|
||||
for _, column := range randomizedColumns {
|
||||
randomizedColumnsMap[column] = true
|
||||
}
|
||||
|
||||
// Check duplicates and missing columns.
|
||||
require.Equal(t, len(groups), len(randomizedColumnsMap))
|
||||
|
||||
// Check the values.
|
||||
for column := range randomizedColumnsMap {
|
||||
require.Equal(t, true, column < count)
|
||||
}
|
||||
}
|
||||
|
||||
// createAndConnectPeer creates a peer with a private key `offset` fixed.
|
||||
// The peer is added and connected to `p2pService`.
|
||||
// If a `RPCDataColumnSidecarsByRootTopicV1` request is made with column index `i`,
|
||||
// then the peer will respond with the `dataColumnSidecars[i]` if it is not in `columnsNotToRespond`.
|
||||
// (If `len(dataColumnSidecars) < i`, then this function will panic.)
|
||||
func createAndConnectPeer(
|
||||
t *testing.T,
|
||||
p2pService *p2ptest.TestP2P,
|
||||
chainService *mock.ChainService,
|
||||
dataColumnSidecars []*ethpb.DataColumnSidecar,
|
||||
custodyGroupCount uint64,
|
||||
columnsNotToRespond map[uint64]bool,
|
||||
offset int,
|
||||
) *p2ptest.TestP2P {
|
||||
// Create the private key, depending on the offset.
|
||||
privateKeyBytes := make([]byte, 32)
|
||||
for i := 0; i < 32; i++ {
|
||||
privateKeyBytes[i] = byte(offset + i)
|
||||
}
|
||||
|
||||
privateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create the peer.
|
||||
peer := p2ptest.NewTestP2P(t, libp2p.Identity(privateKey))
|
||||
|
||||
peer.SetStreamHandler(p2p.RPCDataColumnSidecarsByRootTopicV1+"/ssz_snappy", func(stream network.Stream) {
|
||||
// Decode the request.
|
||||
req := new(p2pTypes.DataColumnsByRootIdentifiers)
|
||||
err := peer.Encoding().DecodeWithMaxLength(stream, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, identifier := range *req {
|
||||
for _, column := range identifier.Columns {
|
||||
// Filter out the columns not to respond.
|
||||
if columnsNotToRespond[column] {
|
||||
continue
|
||||
}
|
||||
|
||||
// Create the response.
|
||||
resp := dataColumnSidecars[column]
|
||||
|
||||
// Send the response.
|
||||
err := WriteDataColumnSidecarChunk(stream, chainService, p2pService.Encoding(), resp)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Close the stream.
|
||||
closeStream(stream, log)
|
||||
})
|
||||
|
||||
// Create the record and set the custody count.
|
||||
enr := &enr.Record{}
|
||||
enr.Set(peerdas.Cgc(custodyGroupCount))
|
||||
|
||||
// Add the peer and connect it.
|
||||
p2pService.Peers().Add(enr, peer.PeerID(), nil, network.DirOutbound)
|
||||
p2pService.Peers().SetConnectionState(peer.PeerID(), peers.Connected)
|
||||
p2pService.Connect(peer)
|
||||
|
||||
return peer
|
||||
}
|
||||
|
||||
type dataSamplerTest struct {
|
||||
ctx context.Context
|
||||
p2pSvc *p2ptest.TestP2P
|
||||
peers []*p2ptest.TestP2P
|
||||
ctxMap map[[4]byte]int
|
||||
chainSvc *mock.ChainService
|
||||
blockProcessedData *statefeed.BlockProcessedData
|
||||
blobs []kzg.Blob
|
||||
kzgCommitments [][]byte
|
||||
kzgProofs [][]byte
|
||||
dataColumnSidecars []*ethpb.DataColumnSidecar
|
||||
}
|
||||
|
||||
func setupDefaultDataColumnSamplerTest(t *testing.T) (*dataSamplerTest, *dataColumnSampler1D) {
|
||||
const (
|
||||
blobCount uint64 = 3
|
||||
custodyRequirement uint64 = 4
|
||||
)
|
||||
|
||||
test, sampler := setupDataColumnSamplerTest(t, blobCount)
|
||||
|
||||
// Custody columns: [6, 38, 70, 102]
|
||||
p1 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 1)
|
||||
|
||||
// Custody columns: [3, 35, 67, 99]
|
||||
p2 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 2)
|
||||
|
||||
// Custody columns: [12, 44, 76, 108]
|
||||
p3 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 3)
|
||||
|
||||
test.peers = []*p2ptest.TestP2P{p1, p2, p3}
|
||||
|
||||
return test, sampler
|
||||
}
|
||||
|
||||
func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTest, *dataColumnSampler1D) {
|
||||
require.NoError(t, kzg.Start())
|
||||
|
||||
// Generate random blobs, commitments and inclusion proofs.
|
||||
blobs := make([]kzg.Blob, blobCount)
|
||||
kzgCommitments := make([][]byte, blobCount)
|
||||
kzgProofs := make([][]byte, blobCount)
|
||||
|
||||
for i := uint64(0); i < blobCount; i++ {
|
||||
blob := getRandBlob(t, int64(i))
|
||||
|
||||
kzgCommitment, kzgProof, err := generateCommitmentAndProof(&blob)
|
||||
require.NoError(t, err)
|
||||
|
||||
blobs[i] = blob
|
||||
kzgCommitments[i] = kzgCommitment[:]
|
||||
kzgProofs[i] = kzgProof[:]
|
||||
}
|
||||
|
||||
dbBlock := util.NewBeaconBlockDeneb()
|
||||
dbBlock.Block.Body.BlobKzgCommitments = kzgCommitments
|
||||
sBlock, err := blocks.NewSignedBeaconBlock(dbBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
cellsAndProofs := util.GenerateCellsAndProofs(t, blobs)
|
||||
dataColumnSidecars, err := peerdas.DataColumnSidecars(sBlock, cellsAndProofs)
|
||||
require.NoError(t, err)
|
||||
|
||||
blockRoot, err := dataColumnSidecars[0].GetSignedBlockHeader().Header.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
blockProcessedData := &statefeed.BlockProcessedData{
|
||||
BlockRoot: blockRoot,
|
||||
SignedBlock: sBlock,
|
||||
}
|
||||
|
||||
p2pSvc := p2ptest.NewTestP2P(t)
|
||||
chainSvc, clock := defaultMockChain(t)
|
||||
|
||||
test := &dataSamplerTest{
|
||||
ctx: context.Background(),
|
||||
p2pSvc: p2pSvc,
|
||||
peers: []*p2ptest.TestP2P{},
|
||||
ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Fulu},
|
||||
chainSvc: chainSvc,
|
||||
blockProcessedData: blockProcessedData,
|
||||
blobs: blobs,
|
||||
kzgCommitments: kzgCommitments,
|
||||
kzgProofs: kzgProofs,
|
||||
dataColumnSidecars: dataColumnSidecars,
|
||||
}
|
||||
clockSync := startup.NewClockSynchronizer()
|
||||
require.NoError(t, clockSync.SetClock(clock))
|
||||
iniWaiter := verification.NewInitializerWaiter(clockSync, nil, nil)
|
||||
ini, err := iniWaiter.WaitForInitializer(context.Background())
|
||||
require.NoError(t, err)
|
||||
sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil, newDataColumnsVerifierFromInitializer(ini), &peerdas.CustodyInfo{})
|
||||
|
||||
return test, sampler
|
||||
}
|
||||
|
||||
func TestDataColumnSampler1D_PeerManagement(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
numPeers int
|
||||
custodyRequirement uint64
|
||||
expectedColumns [][]uint64
|
||||
prunePeers map[int]bool // Peers to prune.
|
||||
}{
|
||||
{
|
||||
name: "custodyRequirement=4",
|
||||
numPeers: 3,
|
||||
custodyRequirement: 4,
|
||||
expectedColumns: [][]uint64{
|
||||
{6, 37, 48, 113},
|
||||
{35, 79, 92, 109},
|
||||
{31, 44, 58, 97},
|
||||
},
|
||||
prunePeers: map[int]bool{
|
||||
0: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "custodyRequirement=8",
|
||||
numPeers: 3,
|
||||
custodyRequirement: 8,
|
||||
expectedColumns: [][]uint64{
|
||||
{1, 6, 37, 48, 51, 87, 112, 113},
|
||||
{24, 25, 35, 52, 79, 92, 109, 126},
|
||||
{31, 44, 58, 64, 91, 97, 116, 127},
|
||||
},
|
||||
prunePeers: map[int]bool{
|
||||
0: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
params.SetupTestConfigCleanup(t)
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
cfg := params.BeaconConfig()
|
||||
cfg.CustodyRequirement = tc.custodyRequirement
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
test, sampler := setupDataColumnSamplerTest(t, uint64(tc.numPeers))
|
||||
for i := 0; i < tc.numPeers; i++ {
|
||||
p := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, tc.custodyRequirement, nil, i+1)
|
||||
test.peers = append(test.peers, p)
|
||||
}
|
||||
|
||||
// confirm everything works
|
||||
sampler.refreshPeerInfo()
|
||||
require.Equal(t, params.BeaconConfig().NumberOfColumns, uint64(len(sampler.peersByCustodyGroup)))
|
||||
|
||||
require.Equal(t, tc.numPeers, len(sampler.groupsByPeer))
|
||||
for i, peer := range test.peers {
|
||||
// confirm peer has the expected columns
|
||||
require.Equal(t, len(tc.expectedColumns[i]), len(sampler.groupsByPeer[peer.PeerID()]))
|
||||
for _, column := range tc.expectedColumns[i] {
|
||||
require.Equal(t, true, sampler.groupsByPeer[peer.PeerID()][column])
|
||||
}
|
||||
|
||||
// confirm column to peer mapping are correct
|
||||
for _, column := range tc.expectedColumns[i] {
|
||||
require.Equal(t, true, sampler.peersByCustodyGroup[column][peer.PeerID()])
|
||||
}
|
||||
}
|
||||
|
||||
// prune peers
|
||||
for peer := range tc.prunePeers {
|
||||
err := test.p2pSvc.Disconnect(test.peers[peer].PeerID())
|
||||
test.p2pSvc.Peers().SetConnectionState(test.peers[peer].PeerID(), peers.Disconnected)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
sampler.refreshPeerInfo()
|
||||
|
||||
require.Equal(t, tc.numPeers-len(tc.prunePeers), len(sampler.groupsByPeer))
|
||||
for i, peer := range test.peers {
|
||||
for _, column := range tc.expectedColumns[i] {
|
||||
expected := true
|
||||
if tc.prunePeers[i] {
|
||||
expected = false
|
||||
}
|
||||
require.Equal(t, expected, sampler.peersByCustodyGroup[column][peer.PeerID()])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataColumnSampler1D_SampleDistribution(t *testing.T) {
|
||||
// TODO: Use `t.Run`.
|
||||
testCases := []struct {
|
||||
numPeers int
|
||||
custodyRequirement uint64
|
||||
columnsToDistribute [][]uint64
|
||||
expectedDistribution []map[int][]uint64
|
||||
}{
|
||||
{
|
||||
numPeers: 3,
|
||||
custodyRequirement: 4,
|
||||
// peer custody maps
|
||||
// p0: {6, 37, 48, 113},
|
||||
// p1: {35, 79, 92, 109},
|
||||
// p2: {31, 44, 58, 97},
|
||||
columnsToDistribute: [][]uint64{
|
||||
{6, 35, 31},
|
||||
{6, 48, 79, 109, 31, 97},
|
||||
{6, 37, 113},
|
||||
{11},
|
||||
},
|
||||
expectedDistribution: []map[int][]uint64{
|
||||
{
|
||||
0: {6}, // p0
|
||||
1: {35}, // p1
|
||||
2: {31}, // p2
|
||||
},
|
||||
{
|
||||
0: {6, 48}, // p0
|
||||
1: {79, 109}, // p1
|
||||
2: {31, 97}, // p2
|
||||
},
|
||||
{
|
||||
0: {6, 37, 113}, // p0
|
||||
},
|
||||
{},
|
||||
},
|
||||
},
|
||||
{
|
||||
numPeers: 3,
|
||||
custodyRequirement: 8,
|
||||
// peer custody maps
|
||||
// p0: {6, 37, 48, 113, 1, 112, 87, 51},
|
||||
// p1: {35, 79, 92, 109, 52, 126, 25, 24},
|
||||
// p2: {31, 44, 58, 97, 116, 91, 64, 127},
|
||||
columnsToDistribute: [][]uint64{
|
||||
{6, 48, 79, 25, 24, 97}, // all covered by peers
|
||||
{6, 35, 31, 32}, // `32` is not in covered by peers
|
||||
},
|
||||
expectedDistribution: []map[int][]uint64{
|
||||
{
|
||||
0: {6, 48}, // p0
|
||||
1: {79, 25, 24}, // p1
|
||||
2: {97}, // p2
|
||||
},
|
||||
{
|
||||
0: {6}, // p0
|
||||
1: {35}, // p1
|
||||
2: {31}, // p2
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
params.SetupTestConfigCleanup(t)
|
||||
for _, tc := range testCases {
|
||||
cfg := params.BeaconConfig()
|
||||
cfg.CustodyRequirement = tc.custodyRequirement
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
test, sampler := setupDataColumnSamplerTest(t, uint64(tc.numPeers))
|
||||
for i := 0; i < tc.numPeers; i++ {
|
||||
p := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, tc.custodyRequirement, nil, i+1)
|
||||
test.peers = append(test.peers, p)
|
||||
}
|
||||
sampler.refreshPeerInfo()
|
||||
|
||||
for idx, columns := range tc.columnsToDistribute {
|
||||
result, err := sampler.distributeSamplesToPeer(columns)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(tc.expectedDistribution[idx]), len(result), fmt.Sprintf("%v - %v", tc.expectedDistribution[idx], result))
|
||||
|
||||
for peerIdx, dist := range tc.expectedDistribution[idx] {
|
||||
for _, column := range dist {
|
||||
peerID := test.peers[peerIdx].PeerID()
|
||||
require.Equal(t, true, result[peerID][column])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDataColumnSampler1D_SampleDataColumns(t *testing.T) {
|
||||
test, sampler := setupDefaultDataColumnSamplerTest(t)
|
||||
sampler.refreshPeerInfo()
|
||||
|
||||
t.Run("sample all columns", func(t *testing.T) {
|
||||
sampleColumns := []uint64{6, 35, 31, 37, 79, 44, 48, 92, 58, 113, 109, 97}
|
||||
retrieved, err := sampler.sampleDataColumns(test.ctx, test.blockProcessedData, sampleColumns)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 12, len(retrieved))
|
||||
for _, column := range sampleColumns {
|
||||
require.Equal(t, true, retrieved[column])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("sample a subset of columns", func(t *testing.T) {
|
||||
sampleColumns := []uint64{35, 31, 79, 48, 113, 97}
|
||||
retrieved, err := sampler.sampleDataColumns(test.ctx, test.blockProcessedData, sampleColumns)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 6, len(retrieved))
|
||||
for _, column := range sampleColumns {
|
||||
require.Equal(t, true, retrieved[column])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("sample a subset of columns with missing columns", func(t *testing.T) {
|
||||
sampleColumns := []uint64{35, 31, 100, 79}
|
||||
retrieved, err := sampler.sampleDataColumns(test.ctx, test.blockProcessedData, sampleColumns)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, len(retrieved))
|
||||
require.DeepEqual(t, map[uint64]bool{35: true, 31: true, 79: true}, retrieved)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDataColumnSampler1D_IncrementalDAS(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.BeaconConfig()
|
||||
cfg.DataColumnSidecarSubnetCount = 32
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
samplesCount uint64
|
||||
possibleColumnsToRequest []uint64
|
||||
columnsNotToRespond map[uint64]bool
|
||||
expectedSuccess bool
|
||||
expectedRoundSummaries []roundSummary
|
||||
}{
|
||||
{
|
||||
name: "All columns are correctly sampled in a single round",
|
||||
samplesCount: 5,
|
||||
possibleColumnsToRequest: []uint64{6, 35, 31, 37, 79, 44, 48, 92, 58, 113, 109, 97},
|
||||
columnsNotToRespond: map[uint64]bool{},
|
||||
expectedSuccess: true,
|
||||
expectedRoundSummaries: []roundSummary{
|
||||
{
|
||||
RequestedColumns: []uint64{6, 35, 31, 37, 79},
|
||||
MissingColumns: map[uint64]bool{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Two missing columns in the first round, ok in the second round",
|
||||
samplesCount: 5,
|
||||
possibleColumnsToRequest: []uint64{6, 35, 31, 37, 79, 44, 48, 92, 58, 113, 109, 97},
|
||||
columnsNotToRespond: map[uint64]bool{6: true, 31: true},
|
||||
expectedSuccess: true,
|
||||
expectedRoundSummaries: []roundSummary{
|
||||
{
|
||||
RequestedColumns: []uint64{6, 35, 31, 37, 79},
|
||||
MissingColumns: map[uint64]bool{6: true, 31: true},
|
||||
},
|
||||
{
|
||||
RequestedColumns: []uint64{44, 48, 92, 58, 113, 109},
|
||||
MissingColumns: map[uint64]bool{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Two missing columns in the first round, one missing in the second round. Fail to sample.",
|
||||
samplesCount: 5,
|
||||
possibleColumnsToRequest: []uint64{6, 35, 31, 37, 79, 44, 48, 92, 58, 113, 109, 97},
|
||||
columnsNotToRespond: map[uint64]bool{6: true, 31: true, 48: true},
|
||||
expectedSuccess: false,
|
||||
expectedRoundSummaries: []roundSummary{
|
||||
{
|
||||
RequestedColumns: []uint64{6, 35, 31, 37, 79},
|
||||
MissingColumns: map[uint64]bool{6: true, 31: true},
|
||||
},
|
||||
{
|
||||
RequestedColumns: []uint64{44, 48, 92, 58, 113, 109},
|
||||
MissingColumns: map[uint64]bool{48: true},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
test, sampler := setupDataColumnSamplerTest(t, 3)
|
||||
p1 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, params.BeaconConfig().CustodyRequirement, tc.columnsNotToRespond, 1)
|
||||
p2 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, params.BeaconConfig().CustodyRequirement, tc.columnsNotToRespond, 2)
|
||||
p3 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, params.BeaconConfig().CustodyRequirement, tc.columnsNotToRespond, 3)
|
||||
test.peers = []*p2ptest.TestP2P{p1, p2, p3}
|
||||
|
||||
sampler.refreshPeerInfo()
|
||||
|
||||
success, summaries, err := sampler.incrementalDAS(test.ctx, test.blockProcessedData, tc.possibleColumnsToRequest, tc.samplesCount)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedSuccess, success)
|
||||
require.DeepEqual(t, tc.expectedRoundSummaries, summaries)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func deterministicRandomness(t *testing.T, seed int64) [32]byte {
|
||||
// Converts an int64 to a byte slice
|
||||
buf := new(bytes.Buffer)
|
||||
err := binary.Write(buf, binary.BigEndian, seed)
|
||||
require.NoError(t, err)
|
||||
bytes := buf.Bytes()
|
||||
|
||||
return sha256.Sum256(bytes)
|
||||
}
|
||||
|
||||
// Returns a serialized random field element in big-endian
|
||||
func getRandFieldElement(t *testing.T, seed int64) [32]byte {
|
||||
bytes := deterministicRandomness(t, seed)
|
||||
var r fr.Element
|
||||
r.SetBytes(bytes[:])
|
||||
|
||||
return GoKZG.SerializeScalar(r)
|
||||
}
|
||||
|
||||
// Returns a random blob using the passed seed as entropy
|
||||
func getRandBlob(t *testing.T, seed int64) kzg.Blob {
|
||||
var blob kzg.Blob
|
||||
for i := 0; i < len(blob); i += 32 {
|
||||
fieldElementBytes := getRandFieldElement(t, seed+int64(i))
|
||||
copy(blob[i:i+32], fieldElementBytes[:])
|
||||
}
|
||||
return blob
|
||||
}
|
||||
|
||||
func generateCommitmentAndProof(blob *kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) {
|
||||
commitment, err := kzg.BlobToKZGCommitment(blob)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
proof, err := kzg.ComputeBlobKZGProof(blob, commitment)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return &commitment, &proof, err
|
||||
}
|
||||
@@ -38,6 +38,15 @@ var (
|
||||
RequireSidecarProposerExpected,
|
||||
}
|
||||
|
||||
// ByRootRequestDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received
|
||||
// via the by root request must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
|
||||
ByRootRequestDataColumnSidecarRequirements = []Requirement{
|
||||
RequireValidFields,
|
||||
RequireSidecarInclusionProven,
|
||||
RequireSidecarKzgProofVerified,
|
||||
}
|
||||
|
||||
// ByRangeRequestDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received
|
||||
// via the by range request must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
|
||||
|
||||
2
changelog/manu-peerdas-peer-sampling.md
Normal file
2
changelog/manu-peerdas-peer-sampling.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Added
|
||||
- PeerDAS: Implement peer sampling.
|
||||
Reference in New Issue
Block a user