FetchDataColumnSidecars: If some sidecars are still missing after retrieving sidecars from peers, try to reconstruct them when possible. (#15593)

* `FetchDataColumnSidecars`: If some sidecars are still missing after retrieving sidecars from peers, try to reconstruct them when possible.

* `randomPeer`: Deterministic randomness.
Before this commit, `randomPeer` was non-deterministic even with a deterministic random source. The reason is that we iterated over a map (whose iteration order is randomized by the Go runtime) and then stopped the iteration at a chosen random index (which is deterministic when the random source is deterministic).
After this commit, `randomPeer` and all its callers are fully deterministic when using a deterministic random source; see the sketch after this list.

* Address Potuz's review comment.

* Address James' review comment.

* `tryReconstructFromStorageAndPeers`: Improve godoc.
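
To illustrate the `randomPeer` fix, here is a minimal, self-contained sketch of the pattern (illustrative names, not the actual Prysm code): Go randomizes map iteration order at runtime, so a reproducible selection must first collect and sort the keys, then index them with the seeded source — the same idea as the `slices.Sort(nonRateLimitedPeers)` call added in the diff below.

```go
package main

import (
	"fmt"
	"math/rand"
	"slices"
)

// pickPeer returns a pseudo-random key from the map, reproducibly.
// Ranging over a Go map is randomized by the runtime, so even a seeded
// *rand.Rand cannot make that loop deterministic; sorting the keys
// first removes the map-order randomness.
func pickPeer(src *rand.Rand, peers map[string]bool) string {
	ids := make([]string, 0, len(peers))
	for id := range peers {
		ids = append(ids, id)
	}
	slices.Sort(ids)               // deterministic order
	return ids[src.Intn(len(ids))] // deterministic index for a fixed seed
}

func main() {
	peers := map[string]bool{"peerA": true, "peerB": true, "peerC": true}
	src := rand.New(rand.NewSource(42))
	fmt.Println(pickPeer(src, peers)) // prints the same peer on every run
}
```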
Manu NALEPA, 2025-08-25 16:31:02 +02:00 (committed by GitHub)
parent 26d8b6b786, commit 240cd1d058
3 changed files with 152 additions and 63 deletions


@@ -3,7 +3,6 @@ package sync
 import (
     "bytes"
     "context"
-    "math"
     "slices"
     "sync"
     "time"
@@ -64,7 +63,7 @@ func FetchDataColumnSidecars(
     indices := sortedSliceFromMap(indicesMap)
     slotsWithCommitments := make(map[primitives.Slot]bool)
-    indicesByRootToQuery := make(map[[fieldparams.RootLength]byte]map[uint64]bool)
+    missingIndicesByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool)
     indicesByRootStored := make(map[[fieldparams.RootLength]byte]map[uint64]bool)
     result := make(map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn)
@@ -83,7 +82,7 @@ func FetchDataColumnSidecars(
         root := roBlock.Root()

         // Step 1: Get the requested sidecars for this root if available in storage
-        requestedColumns, err := tryGetDirectColumns(params.Storage, root, indices)
+        requestedColumns, err := tryGetStoredColumns(params.Storage, root, indices)
         if err != nil {
             return nil, errors.Wrapf(err, "try get direct columns for root %#x", root)
         }
@@ -107,7 +106,7 @@ func FetchDataColumnSidecars(
         indicesToQueryMap, indicesStoredMap := categorizeIndices(params.Storage, root, indices)
         if len(indicesToQueryMap) > 0 {
-            indicesByRootToQuery[root] = indicesToQueryMap
+            missingIndicesByRoot[root] = indicesToQueryMap
         }
         if len(indicesStoredMap) > 0 {
             indicesByRootStored[root] = indicesStoredMap
@@ -115,40 +114,57 @@ func FetchDataColumnSidecars(
     }

     // Early return if no sidecars need to be queried from peers.
-    if len(indicesByRootToQuery) == 0 {
+    if len(missingIndicesByRoot) == 0 {
         return result, nil
     }

     // Step 3b: Request missing sidecars from peers.
-    start, count := time.Now(), computeTotalCount(indicesByRootToQuery)
-    fromPeersResult, err := tryRequestingColumnsFromPeers(params, roBlocks, slotsWithCommitments, indicesByRootToQuery)
+    start, count := time.Now(), computeTotalCount(missingIndicesByRoot)
+    fromPeersResult, err := tryRequestingColumnsFromPeers(params, roBlocks, slotsWithCommitments, missingIndicesByRoot)
     if err != nil {
         return nil, errors.Wrap(err, "request from peers")
     }

     log.WithFields(logrus.Fields{"duration": time.Since(start), "count": count}).Debug("Requested data column sidecars from peers")

-    for root, verifiedSidecars := range fromPeersResult {
-        result[root] = append(result[root], verifiedSidecars...)
+    // Step 3c: If needed, try to reconstruct missing sidecars from storage and fetched data.
+    fromReconstructionResult, err := tryReconstructFromStorageAndPeers(params.Storage, fromPeersResult, indicesMap, missingIndicesByRoot)
+    if err != nil {
+        return nil, errors.Wrap(err, "reconstruct from storage and peers")
     }

-    // Step 3c: Load the stored sidecars.
-    for root, indicesStored := range indicesByRootStored {
-        requestedColumns, err := tryGetDirectColumns(params.Storage, root, sortedSliceFromMap(indicesStored))
+    for root, verifiedSidecars := range fromReconstructionResult {
+        result[root] = verifiedSidecars
+    }
+
+    for root := range fromPeersResult {
+        if _, ok := fromReconstructionResult[root]; ok {
+            // We already have what we need from peers + reconstruction
+            continue
+        }
+
+        result[root] = append(result[root], fromPeersResult[root]...)
+
+        storedIndices := indicesByRootStored[root]
+        if len(storedIndices) == 0 {
+            continue
+        }
+
+        storedColumns, err := tryGetStoredColumns(params.Storage, root, sortedSliceFromMap(storedIndices))
         if err != nil {
             return nil, errors.Wrapf(err, "try get direct columns for root %#x", root)
         }
-        result[root] = append(result[root], requestedColumns...)
+        result[root] = append(result[root], storedColumns...)
     }

     return result, nil
 }

-// tryGetDirectColumns attempts to retrieve all requested columns directly from storage
-// if they are all available. Returns the columns if successful, and nil if at least one
+// tryGetStoredColumns attempts to retrieve all requested data column sidecars directly from storage
+// if they are all available. Returns the sidecars if successful, and nil if at least one
 // requested sidecar is not available in the storage.
-func tryGetDirectColumns(storage filesystem.DataColumnStorageReader, blockRoot [fieldparams.RootLength]byte, indices []uint64) ([]blocks.VerifiedRODataColumn, error) {
+func tryGetStoredColumns(storage filesystem.DataColumnStorageReader, blockRoot [fieldparams.RootLength]byte, indices []uint64) ([]blocks.VerifiedRODataColumn, error) {
     // Check if all requested indices are present in cache
     storedIndices := storage.Summary(blockRoot).Stored()
     allRequestedPresent := true
@@ -234,9 +250,9 @@ func categorizeIndices(storage filesystem.DataColumnStorageReader, blockRoot [fi
 // It explores the connected peers to find those that are expected to custody the requested columns
 // and returns only when all requested columns are either retrieved or have been tried to be retrieved
 // by all possible peers.
-// Returns a map of block roots to their verified read-only data column sidecars and a map of block roots.
-// Returns an error if at least one requested column could not be retrieved.
-// WARNING: This function alters `missingIndicesByRoot`. The caller should NOT use it after running this function.
+// WARNING: This function alters `missingIndicesByRoot` by removing successfully retrieved columns.
+// After running this function, the user can check the content of the (modified) `missingIndicesByRoot` map
+// to check if some sidecars are still missing.
 func tryRequestingColumnsFromPeers(
     p DataColumnSidecarsParams,
     roBlocks []blocks.ROBlock,
@@ -284,8 +300,7 @@ func tryRequestingColumnsFromPeers(
         }

         // Remove the verified sidecars from the missing indices map and compute the new verified columns by root.
-        newMissingIndicesByRoot, localVerifiedColumnsByRoot := updateResults(verifiedRoDataColumnSidecars, missingIndicesByRoot)
-        missingIndicesByRoot = newMissingIndicesByRoot
+        localVerifiedColumnsByRoot := updateResults(verifiedRoDataColumnSidecars, missingIndicesByRoot)
         for root, verifiedRoDataColumns := range localVerifiedColumnsByRoot {
             verifiedColumnsByRoot[root] = append(verifiedColumnsByRoot[root], verifiedRoDataColumns...)
         }
@@ -297,11 +312,67 @@ func tryRequestingColumnsFromPeers(
         }
     }

-    if len(missingIndicesByRoot) > 0 {
-        return nil, errors.New("not all requested data column sidecars were retrieved from peers")
+    return verifiedColumnsByRoot, nil
+}
+
+// tryReconstructFromStorageAndPeers attempts to reconstruct missing data column sidecars
+// using the data available in the storage and the data fetched from peers.
+// If, for at least one root, the reconstruction is not possible, an error is returned.
+func tryReconstructFromStorageAndPeers(
+    storage filesystem.DataColumnStorageReader,
+    fromPeersByRoot map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn,
+    indices map[uint64]bool,
+    missingIndicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
+) (map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn, error) {
+    if len(missingIndicesByRoot) == 0 {
+        // Nothing to do, return early.
+        return nil, nil
     }
-    return verifiedColumnsByRoot, nil
+
+    minimumColumnsCountToReconstruct := peerdas.MinimumColumnCountToReconstruct()
+    start := time.Now()
+    result := make(map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn, len(missingIndicesByRoot))
+    for root := range missingIndicesByRoot {
+        // Check if a reconstruction is possible based on what we have from the store and fetched from peers.
+        summary := storage.Summary(root)
+        storedCount := summary.Count()
+        fetchedCount := uint64(len(fromPeersByRoot[root]))
+        if storedCount+fetchedCount < minimumColumnsCountToReconstruct {
+            return nil, errors.Errorf("cannot reconstruct all needed columns for root %#x. stored: %d, fetched: %d, minimum: %d", root, storedCount, fetchedCount, minimumColumnsCountToReconstruct)
+        }
+
+        // Load all we have in the store.
+        storedSidecars, err := storage.Get(root, nil)
+        if err != nil {
+            return nil, errors.Wrapf(err, "failed to get stored sidecars for root %#x", root)
+        }
+
+        sidecars := make([]blocks.VerifiedRODataColumn, 0, storedCount+fetchedCount)
+        sidecars = append(sidecars, storedSidecars...)
+        sidecars = append(sidecars, fromPeersByRoot[root]...)
+
+        // Attempt reconstruction.
+        reconstructedSidecars, err := peerdas.ReconstructDataColumnSidecars(sidecars)
+        if err != nil {
+            return nil, errors.Wrapf(err, "failed to reconstruct data columns for root %#x", root)
+        }
+
+        // Select only sidecars we need.
+        for _, sidecar := range reconstructedSidecars {
+            if indices[sidecar.Index] {
+                result[root] = append(result[root], sidecar)
+            }
+        }
+    }
+
+    log.WithFields(logrus.Fields{
+        "rootCount": len(missingIndicesByRoot),
+        "elapsed":   time.Since(start),
+    }).Debug("Reconstructed from storage and peers")
+
+    return result, nil
 }

 // selectPeers selects peers to query the sidecars.
@@ -314,7 +385,7 @@ func selectPeers(
     count int,
     origIndicesByRootByPeer map[goPeer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool,
 ) (map[goPeer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool, error) {
-    const randomPeerTimeout = 30 * time.Second
+    const randomPeerTimeout = 2 * time.Minute

     // Select peers to query the missing sidecars from.
     indicesByRootByPeer := copyIndicesByRootByPeer(origIndicesByRootByPeer)
@@ -371,12 +442,14 @@ func selectPeers(
 }

 // updateResults updates the missing indices and verified sidecars maps based on the newly verified sidecars.
+// WARNING: This function alters `missingIndicesByRoot` by removing verified sidecars.
+// After running this function, the user can check the content of the (modified) `missingIndicesByRoot` map
+// to check if some sidecars are still missing.
 func updateResults(
     verifiedSidecars []blocks.VerifiedRODataColumn,
-    origMissingIndicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
-) (map[[fieldparams.RootLength]byte]map[uint64]bool, map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn) {
-    // Copy the original map to avoid modifying it directly.
-    missingIndicesByRoot := copyIndicesByRoot(origMissingIndicesByRoot)
+    missingIndicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
+) map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn {
     verifiedSidecarsByRoot := make(map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn)
     for _, verifiedSidecar := range verifiedSidecars {
         blockRoot := verifiedSidecar.BlockRoot()
@@ -393,7 +466,7 @@ func updateResults(
         }
     }

-    return missingIndicesByRoot, verifiedSidecarsByRoot
+    return verifiedSidecarsByRoot
 }

 // fetchDataColumnSidecarsFromPeers retrieves data column sidecars from peers.
@@ -783,15 +856,13 @@ func randomPeer(
     for ctx.Err() == nil {
         nonRateLimitedPeers := make([]goPeer.ID, 0, len(indicesByRootByPeer))
         for peer := range indicesByRootByPeer {
-            remaining := int64(math.MaxInt64)
-            if rateLimiter != nil {
-                remaining = rateLimiter.Remaining(peer.String())
-            }
-            if remaining >= int64(count) {
+            if rateLimiter == nil || rateLimiter.Remaining(peer.String()) >= int64(count) {
                 nonRateLimitedPeers = append(nonRateLimitedPeers, peer)
             }
         }

+        slices.Sort(nonRateLimitedPeers)
+
         if len(nonRateLimitedPeers) == 0 {
             log.WithFields(logrus.Fields{
                 "peerCount": peerCount,


@@ -3,10 +3,12 @@ package sync
 import (
     "context"
     "fmt"
+    "math/rand"
     "testing"
     "time"

     "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
+    "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
     "github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
     "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
     "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
@@ -19,7 +21,6 @@ import (
     "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
     "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
     leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
-    "github.com/OffchainLabs/prysm/v6/crypto/rand"
     ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
     "github.com/OffchainLabs/prysm/v6/testing/assert"
     "github.com/OffchainLabs/prysm/v6/testing/require"
@@ -36,6 +37,7 @@ func TestFetchDataColumnSidecars(t *testing.T) {
     // Slot 2: No commitment
     // Slot 3: All sidecars are saved excepted the needed ones
     // Slot 4: Some sidecars are in the storage, other have to be retrieved from peers.
+    // Slot 5: Some sidecars are in the storage, other have to be retrieved from peers but peers do not deliver all requested sidecars.

     params.SetupTestConfigCleanup(t)
     cfg := params.BeaconConfig().Copy()
@@ -93,6 +95,27 @@ func TestFetchDataColumnSidecars(t *testing.T) {
     err = storage.Save(toStore4)
     require.NoError(t, err)

+    // Block 5
+    minimumColumnsCountToReconstruct := peerdas.MinimumColumnCountToReconstruct()
+    block5, _, verifiedSidecars5 := util.GenerateTestFuluBlockWithSidecars(t, blobCount, util.WithSlot(5))
+    root5 := block5.Root()
+
+    toStoreCount := minimumColumnsCountToReconstruct - 1
+    toStore5 := make([]blocks.VerifiedRODataColumn, 0, toStoreCount)
+    for i := uint64(0); uint64(len(toStore5)) < toStoreCount; i++ {
+        sidecar := verifiedSidecars5[minimumColumnsCountToReconstruct+i]
+        if sidecar.Index == 81 {
+            continue
+        }
+
+        toStore5 = append(toStore5, sidecar)
+    }
+
+    err = storage.Save(toStore5)
+    require.NoError(t, err)
+
     // Custody columns with this private key and 4-cgc: 31, 81, 97, 105
     privateKeyBytes := [32]byte{1}
     privateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes[:])
     require.NoError(t, err)
@@ -105,12 +128,12 @@ func TestFetchDataColumnSidecars(t *testing.T) {
     p2p.Connect(other)

     p2p.Peers().SetChainState(other.PeerID(), &ethpb.StatusV2{
-        HeadSlot: 4,
+        HeadSlot: 5,
     })

     expectedRequest := &ethpb.DataColumnSidecarsByRangeRequest{
         StartSlot: 4,
-        Count:     1,
+        Count:     2,
         Columns:   []uint64{31, 81},
     }
@@ -138,6 +161,9 @@ func TestFetchDataColumnSidecars(t *testing.T) {
         err = WriteDataColumnSidecarChunk(stream, clock, other.Encoding(), verifiedSidecars4[81].DataColumnSidecar)
         assert.NoError(t, err)

+        err = WriteDataColumnSidecarChunk(stream, clock, other.Encoding(), verifiedSidecars5[81].DataColumnSidecar)
+        assert.NoError(t, err)
+
         err = stream.CloseWrite()
         assert.NoError(t, err)
     })
@@ -157,9 +183,10 @@ func TestFetchDataColumnSidecars(t *testing.T) {
         // no root2 (no commitments in this block)
         root3: {verifiedSidecars3[31], verifiedSidecars3[81], verifiedSidecars3[106]},
         root4: {verifiedSidecars4[31], verifiedSidecars4[81], verifiedSidecars4[106]},
+        root5: {verifiedSidecars5[31], verifiedSidecars5[81], verifiedSidecars5[106]},
     }

-    blocks := []blocks.ROBlock{block1, block2, block3, block4}
+    blocks := []blocks.ROBlock{block1, block2, block3, block4, block5}
     actual, err := FetchDataColumnSidecars(params, blocks, indices)
     require.NoError(t, err)
@@ -202,7 +229,7 @@ func TestCategorizeIndices(t *testing.T) {
 func TestSelectPeers(t *testing.T) {
     const (
         count = 3
-        seed  = 46
+        seed  = 42
     )

     params := DataColumnSidecarsParams{
@@ -210,7 +237,7 @@ func TestSelectPeers(t *testing.T) {
         RateLimiter: leakybucket.NewCollector(1., 10, time.Second, false /* deleteEmptyBuckets */),
     }

-    randomSource := rand.NewGenerator()
+    randomSource := rand.New(rand.NewSource(seed))

     indicesByRootByPeer := map[peer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool{
         "peer1": {
@@ -225,19 +252,7 @@ func TestSelectPeers(t *testing.T) {
         },
     }

-    expected_1 := map[peer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool{
-        "peer1": {
-            {1}: {12: true, 13: true},
-            {2}: {13: true, 14: true, 15: true},
-            {3}: {14: true, 15: true},
-        },
-        "peer2": {
-            {1}: {14: true},
-            {3}: {16: true},
-        },
-    }
-
-    expected_2 := map[peer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool{
+    expected := map[peer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool{
         "peer1": {
             {1}: {12: true},
             {3}: {15: true},
@@ -251,11 +266,6 @@ func TestSelectPeers(t *testing.T) {
     actual, err := selectPeers(params, randomSource, count, indicesByRootByPeer)

-    expected := expected_1
-    if len(actual["peer1"]) == 2 {
-        expected = expected_2
-    }
-
     require.NoError(t, err)
     require.Equal(t, len(expected), len(actual))
     for peerID := range expected {
@@ -291,8 +301,8 @@ func TestUpdateResults(t *testing.T) {
         verifiedSidecars[2].BlockRoot(): {verifiedSidecars[2], verifiedSidecars[3]},
     }

-    actualMissingIndicesByRoot, actualVerifiedSidecarsByRoot := updateResults(verifiedSidecars, missingIndicesByRoot)
-    require.DeepEqual(t, expectedMissingIndicesByRoot, actualMissingIndicesByRoot)
+    actualVerifiedSidecarsByRoot := updateResults(verifiedSidecars, missingIndicesByRoot)
+    require.DeepEqual(t, expectedMissingIndicesByRoot, missingIndicesByRoot)
     require.DeepEqual(t, expectedVerifiedSidecarsByRoot, actualVerifiedSidecarsByRoot)
 }
@@ -857,8 +867,8 @@ func TestComputeIndicesByRootByPeer(t *testing.T) {
 func TestRandomPeer(t *testing.T) {
     // Fixed seed.
-    const seed = 42
-    randomSource := rand.NewGenerator()
+    const seed = 43
+    randomSource := rand.New(rand.NewSource(seed))

     t.Run("no peers", func(t *testing.T) {
         pid, err := randomPeer(t.Context(), randomSource, leakybucket.NewCollector(4, 8, time.Second, false /* deleteEmptyBuckets */), 1, nil)
@@ -889,7 +899,11 @@ func TestRandomPeer(t *testing.T) {
         pid, err := randomPeer(t.Context(), randomSource, collector, count, indicesByRootByPeer)
         require.NoError(t, err)
-        require.Equal(t, true, map[peer.ID]bool{peer1: true, peer2: true, peer3: true}[pid])
+        require.Equal(t, peer1, pid)
+
+        pid, err = randomPeer(t.Context(), randomSource, collector, count, indicesByRootByPeer)
+        require.NoError(t, err)
+        require.Equal(t, peer2, pid)
     })
 }


@@ -0,0 +1,4 @@
+### Added
+
+- In `FetchDataColumnSidecars`, if some sidecars are still missing for a given root after retrieving sidecars from peers, and if a reconstruction is possible (by combining the sidecars already retrieved from peers with the sidecars available in storage), then reconstruct the missing sidecars instead of trying to fetch them from peers.
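
For context, the "if a reconstruction is possible" condition in the entry above reduces to a column-count check, as the diff shows: erasure-coded data columns can be reconstructed once enough of them are available. A sketch under the diff's names (`peerdas.MinimumColumnCountToReconstruct()` is the real helper; the function below is illustrative):

```go
// canReconstruct reports whether reconstruction can run for one block root:
// the columns already in storage plus the columns delivered by peers must
// reach the minimum count required by erasure-coding reconstruction.
func canReconstruct(storedCount, fetchedCount, minimumCount uint64) bool {
	return storedCount+fetchedCount >= minimumCount
}
```

This is why the new test stores `minimumColumnsCountToReconstruct - 1` sidecars for block 5 and has the peer deliver one more: together they just reach the threshold.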