mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Compare commits
4 Commits
peerdas-ge
...
backfill-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b8384ff0b | ||
|
|
b0cc1ec0aa | ||
|
|
6e02170881 | ||
|
|
bf6007c5f0 |
@@ -133,7 +133,7 @@ func getStateVersionAndPayload(st state.BeaconState) (int, interfaces.ExecutionD
|
||||
return preStateVersion, preStateHeader, nil
|
||||
}
|
||||
|
||||
func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock, avs das.AvailabilityStore) error {
|
||||
func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock, avs das.AvailabilityChecker) error {
|
||||
ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch")
|
||||
defer span.End()
|
||||
|
||||
@@ -309,7 +309,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
|
||||
return s.saveHeadNoDB(ctx, lastB, lastBR, preState, !isValidPayload)
|
||||
}
|
||||
|
||||
func (s *Service) areSidecarsAvailable(ctx context.Context, avs das.AvailabilityStore, roBlock consensusblocks.ROBlock) error {
|
||||
func (s *Service) areSidecarsAvailable(ctx context.Context, avs das.AvailabilityChecker, roBlock consensusblocks.ROBlock) error {
|
||||
blockVersion := roBlock.Version()
|
||||
block := roBlock.Block()
|
||||
slot := block.Slot()
|
||||
|
||||
@@ -40,8 +40,8 @@ var epochsSinceFinalityExpandCache = primitives.Epoch(4)
|
||||
|
||||
// BlockReceiver interface defines the methods of chain service for receiving and processing new blocks.
|
||||
type BlockReceiver interface {
|
||||
ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error
|
||||
ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error
|
||||
ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityChecker) error
|
||||
ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityChecker) error
|
||||
HasBlock(ctx context.Context, root [32]byte) bool
|
||||
RecentBlockSlot(root [32]byte) (primitives.Slot, error)
|
||||
BlockBeingSynced([32]byte) bool
|
||||
@@ -70,7 +70,7 @@ type SlashingReceiver interface {
|
||||
// 1. Validate block, apply state transition and update checkpoints
|
||||
// 2. Apply fork choice to the processed block
|
||||
// 3. Save latest head info
|
||||
func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error {
|
||||
func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityChecker) error {
|
||||
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlock")
|
||||
defer span.End()
|
||||
// Return early if the block is blacklisted
|
||||
@@ -244,7 +244,7 @@ func (s *Service) handleDA(
|
||||
ctx context.Context,
|
||||
block interfaces.SignedBeaconBlock,
|
||||
blockRoot [fieldparams.RootLength]byte,
|
||||
avs das.AvailabilityStore,
|
||||
avs das.AvailabilityChecker,
|
||||
) (elapsed time.Duration, err error) {
|
||||
defer func(start time.Time) {
|
||||
elapsed = time.Since(start)
|
||||
@@ -333,7 +333,7 @@ func (s *Service) executePostFinalizationTasks(ctx context.Context, finalizedSta
|
||||
// ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear ,transitioning
|
||||
// the state, performing batch verification of all collected signatures and then performing the appropriate
|
||||
// actions for a block post-transition.
|
||||
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error {
|
||||
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityChecker) error {
|
||||
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockBatch")
|
||||
defer span.End()
|
||||
|
||||
|
||||
@@ -275,7 +275,7 @@ func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interf
|
||||
}
|
||||
|
||||
// ReceiveBlockBatch processes blocks in batches from initial-sync.
|
||||
func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock, _ das.AvailabilityStore) error {
|
||||
func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock, _ das.AvailabilityChecker) error {
|
||||
if s.State == nil {
|
||||
return ErrNilState
|
||||
}
|
||||
@@ -305,7 +305,7 @@ func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBl
|
||||
}
|
||||
|
||||
// ReceiveBlock mocks ReceiveBlock method in chain service.
|
||||
func (s *ChainService) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte, _ das.AvailabilityStore) error {
|
||||
func (s *ChainService) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte, _ das.AvailabilityChecker) error {
|
||||
if s.ReceiveBlockMockErr != nil {
|
||||
return s.ReceiveBlockMockErr
|
||||
}
|
||||
|
||||
@@ -107,3 +107,29 @@ func computeInfoCacheKey(nodeID enode.ID, custodyGroupCount uint64) [nodeInfoCac
|
||||
|
||||
return key
|
||||
}
|
||||
|
||||
// ColumnIndices is a map of column indices where the key is the column index and the value is a boolean.
|
||||
// The boolean could indicate different things, eg whether the column is needed (in the context of satisfying custody requirements)
|
||||
// or present (in the context of a custody check on disk or in cache).
|
||||
type ColumnIndices map[uint64]bool
|
||||
|
||||
// CopyTrueIndices allows callers to get a copy of the given ColumnIndices, filtering out any keys
|
||||
// where the value == `false`.
|
||||
func CopyTrueIndices(src ColumnIndices) ColumnIndices {
|
||||
dst := make(ColumnIndices, len(src))
|
||||
for k, v := range src {
|
||||
if v {
|
||||
dst[k] = true
|
||||
}
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// ColumnIndicesFromSlice converts a slice of uint64 indices into the ColumnIndices equivalent.
|
||||
func ColumnIndicesFromSlice(indices []uint64) ColumnIndices {
|
||||
ci := make(ColumnIndices, len(indices))
|
||||
for _, index := range indices {
|
||||
ci[index] = true
|
||||
}
|
||||
return ci
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"availability_blobs.go",
|
||||
"availability_columns.go",
|
||||
"blob_cache.go",
|
||||
"data_column_cache.go",
|
||||
"iface.go",
|
||||
@@ -12,6 +13,7 @@ go_library(
|
||||
importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/das",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//beacon-chain/core/peerdas:go_default_library",
|
||||
"//beacon-chain/db/filesystem:go_default_library",
|
||||
"//beacon-chain/verification:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
@@ -21,6 +23,7 @@ go_library(
|
||||
"//runtime/logging:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
@@ -30,6 +33,7 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"availability_blobs_test.go",
|
||||
"availability_columns_test.go",
|
||||
"blob_cache_test.go",
|
||||
"data_column_cache_test.go",
|
||||
],
|
||||
@@ -45,6 +49,7 @@ go_test(
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -29,7 +29,7 @@ type LazilyPersistentStoreBlob struct {
|
||||
verifier BlobBatchVerifier
|
||||
}
|
||||
|
||||
var _ AvailabilityStore = &LazilyPersistentStoreBlob{}
|
||||
var _ AvailabilityChecker = &LazilyPersistentStoreBlob{}
|
||||
|
||||
// BlobBatchVerifier enables LazyAvailabilityStore to manage the verification process
|
||||
// going from ROBlob->VerifiedROBlob, while avoiding the decision of which individual verifications
|
||||
|
||||
208
beacon-chain/das/availability_columns.go
Normal file
208
beacon-chain/das/availability_columns.go
Normal file
@@ -0,0 +1,208 @@
|
||||
package das
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/runtime/version"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
errors "github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// LazilyPersistentStoreColumn is an implementation of AvailabilityStore to be used when batch syncing data columns.
|
||||
// This implementation will hold any data columns passed to Persist until the IsDataAvailable is called for their
|
||||
// block, at which time they will undergo full verification and be saved to the disk.
|
||||
type LazilyPersistentStoreColumn struct {
|
||||
store *filesystem.DataColumnStorage
|
||||
nodeID enode.ID
|
||||
cache *dataColumnCache
|
||||
newDataColumnsVerifier verification.NewDataColumnsVerifier
|
||||
custodyGroupCount uint64
|
||||
}
|
||||
|
||||
var _ AvailabilityChecker = &LazilyPersistentStoreColumn{}
|
||||
|
||||
// DataColumnsVerifier enables LazilyPersistentStoreColumn to manage the verification process
|
||||
// going from RODataColumn->VerifiedRODataColumn, while avoiding the decision of which individual verifications
|
||||
// to run and in what order. Since LazilyPersistentStoreColumn always tries to verify and save data columns only when
|
||||
// they are all available, the interface takes a slice of data column sidecars.
|
||||
type DataColumnsVerifier interface {
|
||||
VerifiedRODataColumns(ctx context.Context, blk blocks.ROBlock, scs []blocks.RODataColumn) ([]blocks.VerifiedRODataColumn, error)
|
||||
}
|
||||
|
||||
// NewLazilyPersistentStoreColumn creates a new LazilyPersistentStoreColumn.
|
||||
// WARNING: The resulting LazilyPersistentStoreColumn is NOT thread-safe.
|
||||
func NewLazilyPersistentStoreColumn(
|
||||
store *filesystem.DataColumnStorage,
|
||||
nodeID enode.ID,
|
||||
newDataColumnsVerifier verification.NewDataColumnsVerifier,
|
||||
custodyGroupCount uint64,
|
||||
) *LazilyPersistentStoreColumn {
|
||||
return &LazilyPersistentStoreColumn{
|
||||
store: store,
|
||||
nodeID: nodeID,
|
||||
cache: newDataColumnCache(),
|
||||
newDataColumnsVerifier: newDataColumnsVerifier,
|
||||
custodyGroupCount: custodyGroupCount,
|
||||
}
|
||||
}
|
||||
|
||||
// PersistColumns adds columns to the working column cache. Columns stored in this cache will be persisted
|
||||
// for at least as long as the node is running. Once IsDataAvailable succeeds, all columns referenced
|
||||
// by the given block are guaranteed to be persisted for the remainder of the retention period.
|
||||
func (s *LazilyPersistentStoreColumn) Persist(current primitives.Slot, sidecars ...blocks.RODataColumn) error {
|
||||
if len(sidecars) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// It is safe to retrieve the first sidecar.
|
||||
firstSidecar := sidecars[0]
|
||||
|
||||
if len(sidecars) > 1 {
|
||||
firstRoot := firstSidecar.BlockRoot()
|
||||
for _, sidecar := range sidecars[1:] {
|
||||
if sidecar.BlockRoot() != firstRoot {
|
||||
return errMixedRoots
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
firstSidecarEpoch, currentEpoch := slots.ToEpoch(firstSidecar.Slot()), slots.ToEpoch(current)
|
||||
if !params.WithinDAPeriod(firstSidecarEpoch, currentEpoch) {
|
||||
return nil
|
||||
}
|
||||
|
||||
key := cacheKey{slot: firstSidecar.Slot(), root: firstSidecar.BlockRoot()}
|
||||
entry := s.cache.ensure(key)
|
||||
|
||||
for _, sidecar := range sidecars {
|
||||
if err := entry.stash(&sidecar); err != nil {
|
||||
return errors.Wrap(err, "stash DataColumnSidecar")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
|
||||
// DataColumnsSidecars already in the db are assumed to have been previously verified against the block.
|
||||
func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, currentSlot primitives.Slot, block blocks.ROBlock) error {
|
||||
blockCommitments, err := s.fullCommitmentsToCheck(s.nodeID, block, currentSlot)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "full commitments to check with block root `%#x` and current slot `%d`", block.Root(), currentSlot)
|
||||
}
|
||||
|
||||
// Return early for blocks that do not have any commitments.
|
||||
if blockCommitments.count() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the root of the block.
|
||||
blockRoot := block.Root()
|
||||
|
||||
// Build the cache key for the block.
|
||||
key := cacheKey{slot: block.Block().Slot(), root: blockRoot}
|
||||
|
||||
// Retrieve the cache entry for the block, or create an empty one if it doesn't exist.
|
||||
entry := s.cache.ensure(key)
|
||||
|
||||
// Delete the cache entry for the block at the end.
|
||||
defer s.cache.delete(key)
|
||||
|
||||
// Set the disk summary for the block in the cache entry.
|
||||
entry.setDiskSummary(s.store.Summary(blockRoot))
|
||||
|
||||
// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
|
||||
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
|
||||
// ignore their response and decrease their peer score.
|
||||
roDataColumns, err := entry.filter(blockRoot, blockCommitments)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "entry filter")
|
||||
}
|
||||
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
|
||||
verifier := s.newDataColumnsVerifier(roDataColumns, verification.ByRangeRequestDataColumnSidecarRequirements)
|
||||
|
||||
if err := verifier.ValidFields(); err != nil {
|
||||
return errors.Wrap(err, "valid fields")
|
||||
}
|
||||
|
||||
if err := verifier.SidecarInclusionProven(); err != nil {
|
||||
return errors.Wrap(err, "sidecar inclusion proven")
|
||||
}
|
||||
|
||||
if err := verifier.SidecarKzgProofVerified(); err != nil {
|
||||
return errors.Wrap(err, "sidecar KZG proof verified")
|
||||
}
|
||||
|
||||
verifiedRoDataColumns, err := verifier.VerifiedRODataColumns()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "verified RO data columns - should never happen")
|
||||
}
|
||||
|
||||
if err := s.store.Save(verifiedRoDataColumns); err != nil {
|
||||
return errors.Wrap(err, "save data column sidecars")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// fullCommitmentsToCheck returns the commitments to check for a given block.
|
||||
func (s *LazilyPersistentStoreColumn) fullCommitmentsToCheck(nodeID enode.ID, block blocks.ROBlock, currentSlot primitives.Slot) (*safeCommitmentsArray, error) {
|
||||
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
|
||||
|
||||
// Return early for blocks that are pre-Fulu.
|
||||
if block.Version() < version.Fulu {
|
||||
return &safeCommitmentsArray{}, nil
|
||||
}
|
||||
|
||||
// Compute the block epoch.
|
||||
blockSlot := block.Block().Slot()
|
||||
blockEpoch := slots.ToEpoch(blockSlot)
|
||||
|
||||
// Compute the current epoch.
|
||||
currentEpoch := slots.ToEpoch(currentSlot)
|
||||
|
||||
// Return early if the request is out of the MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS window.
|
||||
if !params.WithinDAPeriod(blockEpoch, currentEpoch) {
|
||||
return &safeCommitmentsArray{}, nil
|
||||
}
|
||||
|
||||
// Retrieve the KZG commitments for the block.
|
||||
kzgCommitments, err := block.Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "blob KZG commitments")
|
||||
}
|
||||
|
||||
// Return early if there are no commitments in the block.
|
||||
if len(kzgCommitments) == 0 {
|
||||
return &safeCommitmentsArray{}, nil
|
||||
}
|
||||
|
||||
// Retrieve peer info.
|
||||
samplingSize := max(s.custodyGroupCount, samplesPerSlot)
|
||||
peerInfo, _, err := peerdas.Info(nodeID, samplingSize)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "peer info")
|
||||
}
|
||||
|
||||
// Create a safe commitments array for the custody columns.
|
||||
commitmentsArray := &safeCommitmentsArray{}
|
||||
commitmentsArraySize := uint64(len(commitmentsArray))
|
||||
|
||||
for column := range peerInfo.CustodyColumns {
|
||||
if column >= commitmentsArraySize {
|
||||
return nil, errors.Errorf("custody column index %d too high (max allowed %d) - should never happen", column, commitmentsArraySize)
|
||||
}
|
||||
|
||||
commitmentsArray[column] = kzgCommitments
|
||||
}
|
||||
|
||||
return commitmentsArray, nil
|
||||
}
|
||||
302
beacon-chain/das/availability_columns_test.go
Normal file
302
beacon-chain/das/availability_columns_test.go
Normal file
@@ -0,0 +1,302 @@
|
||||
package das
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||
"github.com/OffchainLabs/prysm/v6/testing/util"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
)
|
||||
|
||||
var commitments = [][]byte{
|
||||
bytesutil.PadTo([]byte("a"), 48),
|
||||
bytesutil.PadTo([]byte("b"), 48),
|
||||
bytesutil.PadTo([]byte("c"), 48),
|
||||
bytesutil.PadTo([]byte("d"), 48),
|
||||
}
|
||||
|
||||
func TestPersist(t *testing.T) {
|
||||
t.Run("no sidecars", func(t *testing.T) {
|
||||
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, 0)
|
||||
err := lazilyPersistentStoreColumns.Persist(0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(lazilyPersistentStoreColumns.cache.entries))
|
||||
})
|
||||
|
||||
t.Run("mixed roots", func(t *testing.T) {
|
||||
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
|
||||
dataColumnParamsByBlockRoot := []util.DataColumnParam{
|
||||
{Slot: 1, Index: 1},
|
||||
{Slot: 2, Index: 2},
|
||||
}
|
||||
|
||||
roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
|
||||
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, 0)
|
||||
|
||||
err := lazilyPersistentStoreColumns.Persist(0, roSidecars...)
|
||||
require.ErrorIs(t, err, errMixedRoots)
|
||||
require.Equal(t, 0, len(lazilyPersistentStoreColumns.cache.entries))
|
||||
})
|
||||
|
||||
t.Run("outside DA period", func(t *testing.T) {
|
||||
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
|
||||
dataColumnParamsByBlockRoot := []util.DataColumnParam{
|
||||
{Slot: 1, Index: 1},
|
||||
}
|
||||
|
||||
roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
|
||||
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, 0)
|
||||
|
||||
err := lazilyPersistentStoreColumns.Persist(1_000_000, roSidecars...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(lazilyPersistentStoreColumns.cache.entries))
|
||||
})
|
||||
|
||||
t.Run("nominal", func(t *testing.T) {
|
||||
const slot = 42
|
||||
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
|
||||
dataColumnParamsByBlockRoot := []util.DataColumnParam{
|
||||
{Slot: slot, Index: 1},
|
||||
{Slot: slot, Index: 5},
|
||||
}
|
||||
|
||||
roSidecars, roDataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
|
||||
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, 0)
|
||||
|
||||
err := lazilyPersistentStoreColumns.Persist(slot, roSidecars...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(lazilyPersistentStoreColumns.cache.entries))
|
||||
|
||||
key := cacheKey{slot: slot, root: roDataColumns[0].BlockRoot()}
|
||||
entry, ok := lazilyPersistentStoreColumns.cache.entries[key]
|
||||
require.Equal(t, true, ok)
|
||||
|
||||
// A call to Persist does NOT save the sidecars to disk.
|
||||
require.Equal(t, uint64(0), entry.diskSummary.Count())
|
||||
|
||||
require.DeepSSZEqual(t, roDataColumns[0], *entry.scs[1])
|
||||
require.DeepSSZEqual(t, roDataColumns[1], *entry.scs[5])
|
||||
|
||||
for i, roDataColumn := range entry.scs {
|
||||
if map[int]bool{1: true, 5: true}[i] {
|
||||
continue
|
||||
}
|
||||
|
||||
require.IsNil(t, roDataColumn)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestIsDataAvailable(t *testing.T) {
|
||||
newDataColumnsVerifier := func(dataColumnSidecars []blocks.RODataColumn, _ []verification.Requirement) verification.DataColumnsVerifier {
|
||||
return &mockDataColumnsVerifier{t: t, dataColumnSidecars: dataColumnSidecars}
|
||||
}
|
||||
|
||||
ctx := t.Context()
|
||||
|
||||
t.Run("without commitments", func(t *testing.T) {
|
||||
signedBeaconBlockFulu := util.NewBeaconBlockFulu()
|
||||
signedRoBlock := newSignedRoBlock(t, signedBeaconBlockFulu)
|
||||
|
||||
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, newDataColumnsVerifier, 0)
|
||||
|
||||
err := lazilyPersistentStoreColumns.IsDataAvailable(ctx, 0 /*current slot*/, signedRoBlock)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("with commitments", func(t *testing.T) {
|
||||
signedBeaconBlockFulu := util.NewBeaconBlockFulu()
|
||||
signedBeaconBlockFulu.Block.Body.BlobKzgCommitments = commitments
|
||||
signedRoBlock := newSignedRoBlock(t, signedBeaconBlockFulu)
|
||||
block := signedRoBlock.Block()
|
||||
slot := block.Slot()
|
||||
proposerIndex := block.ProposerIndex()
|
||||
parentRoot := block.ParentRoot()
|
||||
stateRoot := block.StateRoot()
|
||||
bodyRoot, err := block.Body().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
root := signedRoBlock.Root()
|
||||
|
||||
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, newDataColumnsVerifier, 0)
|
||||
|
||||
indices := [...]uint64{1, 17, 19, 42, 75, 87, 102, 117}
|
||||
dataColumnsParams := make([]util.DataColumnParam, 0, len(indices))
|
||||
for _, index := range indices {
|
||||
dataColumnParams := util.DataColumnParam{
|
||||
Index: index,
|
||||
KzgCommitments: commitments,
|
||||
|
||||
Slot: slot,
|
||||
ProposerIndex: proposerIndex,
|
||||
ParentRoot: parentRoot[:],
|
||||
StateRoot: stateRoot[:],
|
||||
BodyRoot: bodyRoot[:],
|
||||
}
|
||||
|
||||
dataColumnsParams = append(dataColumnsParams, dataColumnParams)
|
||||
}
|
||||
|
||||
_, verifiedRoDataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnsParams)
|
||||
|
||||
key := cacheKey{root: root}
|
||||
entry := lazilyPersistentStoreColumns.cache.ensure(key)
|
||||
defer lazilyPersistentStoreColumns.cache.delete(key)
|
||||
|
||||
for _, verifiedRoDataColumn := range verifiedRoDataColumns {
|
||||
err := entry.stash(&verifiedRoDataColumn.RODataColumn)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err = lazilyPersistentStoreColumns.IsDataAvailable(ctx, slot, signedRoBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual, err := dataColumnStorage.Get(root, indices[:])
|
||||
require.NoError(t, err)
|
||||
|
||||
summary := dataColumnStorage.Summary(root)
|
||||
require.Equal(t, uint64(len(indices)), summary.Count())
|
||||
require.DeepSSZEqual(t, verifiedRoDataColumns, actual)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFullCommitmentsToCheck(t *testing.T) {
|
||||
windowSlots, err := slots.EpochEnd(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
|
||||
require.NoError(t, err)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
commitments [][]byte
|
||||
block func(*testing.T) blocks.ROBlock
|
||||
slot primitives.Slot
|
||||
}{
|
||||
{
|
||||
name: "Pre-Fulu block",
|
||||
block: func(t *testing.T) blocks.ROBlock {
|
||||
return newSignedRoBlock(t, util.NewBeaconBlockElectra())
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Commitments outside data availability window",
|
||||
block: func(t *testing.T) blocks.ROBlock {
|
||||
beaconBlockElectra := util.NewBeaconBlockElectra()
|
||||
|
||||
// Block is from slot 0, "current slot" is window size +1 (so outside the window)
|
||||
beaconBlockElectra.Block.Body.BlobKzgCommitments = commitments
|
||||
|
||||
return newSignedRoBlock(t, beaconBlockElectra)
|
||||
},
|
||||
slot: windowSlots + 1,
|
||||
},
|
||||
{
|
||||
name: "Commitments within data availability window",
|
||||
block: func(t *testing.T) blocks.ROBlock {
|
||||
signedBeaconBlockFulu := util.NewBeaconBlockFulu()
|
||||
signedBeaconBlockFulu.Block.Body.BlobKzgCommitments = commitments
|
||||
signedBeaconBlockFulu.Block.Slot = 100
|
||||
|
||||
return newSignedRoBlock(t, signedBeaconBlockFulu)
|
||||
},
|
||||
commitments: commitments,
|
||||
slot: 100,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
numberOfColumns := params.BeaconConfig().NumberOfColumns
|
||||
|
||||
b := tc.block(t)
|
||||
s := NewLazilyPersistentStoreColumn(nil, enode.ID{}, nil, numberOfColumns)
|
||||
|
||||
commitmentsArray, err := s.fullCommitmentsToCheck(enode.ID{}, b, tc.slot)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, commitments := range commitmentsArray {
|
||||
require.DeepEqual(t, tc.commitments, commitments)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newSignedRoBlock(t *testing.T, signedBeaconBlock interface{}) blocks.ROBlock {
|
||||
sb, err := blocks.NewSignedBeaconBlock(signedBeaconBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
rb, err := blocks.NewROBlock(sb)
|
||||
require.NoError(t, err)
|
||||
|
||||
return rb
|
||||
}
|
||||
|
||||
type mockDataColumnsVerifier struct {
|
||||
t *testing.T
|
||||
dataColumnSidecars []blocks.RODataColumn
|
||||
validCalled, SidecarInclusionProvenCalled, SidecarKzgProofVerifiedCalled bool
|
||||
}
|
||||
|
||||
var _ verification.DataColumnsVerifier = &mockDataColumnsVerifier{}
|
||||
|
||||
func (m *mockDataColumnsVerifier) VerifiedRODataColumns() ([]blocks.VerifiedRODataColumn, error) {
|
||||
require.Equal(m.t, true, m.validCalled && m.SidecarInclusionProvenCalled && m.SidecarKzgProofVerifiedCalled)
|
||||
|
||||
verifiedDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, len(m.dataColumnSidecars))
|
||||
for _, dataColumnSidecar := range m.dataColumnSidecars {
|
||||
verifiedDataColumnSidecar := blocks.NewVerifiedRODataColumn(dataColumnSidecar)
|
||||
verifiedDataColumnSidecars = append(verifiedDataColumnSidecars, verifiedDataColumnSidecar)
|
||||
}
|
||||
|
||||
return verifiedDataColumnSidecars, nil
|
||||
}
|
||||
|
||||
func (m *mockDataColumnsVerifier) SatisfyRequirement(verification.Requirement) {}
|
||||
|
||||
func (m *mockDataColumnsVerifier) ValidFields() error {
|
||||
m.validCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDataColumnsVerifier) CorrectSubnet(dataColumnSidecarSubTopic string, expectedTopics []string) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockDataColumnsVerifier) NotFromFutureSlot() error { return nil }
|
||||
func (m *mockDataColumnsVerifier) SlotAboveFinalized() error { return nil }
|
||||
func (m *mockDataColumnsVerifier) ValidProposerSignature(ctx context.Context) error { return nil }
|
||||
|
||||
func (m *mockDataColumnsVerifier) SidecarParentSeen(parentSeen func([fieldparams.RootLength]byte) bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDataColumnsVerifier) SidecarParentValid(badParent func([fieldparams.RootLength]byte) bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDataColumnsVerifier) SidecarParentSlotLower() error { return nil }
|
||||
func (m *mockDataColumnsVerifier) SidecarDescendsFromFinalized() error { return nil }
|
||||
|
||||
func (m *mockDataColumnsVerifier) SidecarInclusionProven() error {
|
||||
m.SidecarInclusionProvenCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDataColumnsVerifier) SidecarKzgProofVerified() error {
|
||||
m.SidecarKzgProofVerifiedCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockDataColumnsVerifier) SidecarProposerExpected(ctx context.Context) error { return nil }
|
||||
@@ -15,5 +15,12 @@ import (
|
||||
// durably persisted before returning a non-error value.
|
||||
type AvailabilityStore interface {
|
||||
IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
|
||||
AvailabilityChecker
|
||||
Persist(current primitives.Slot, blobSidecar ...blocks.ROBlob) error
|
||||
}
|
||||
|
||||
// AvailabilityChecker is the minimum interface needed to check if data is available for a block.
|
||||
// We should prefer this interface over AvailabilityStore in places where we don't need to persist blob data.
|
||||
type AvailabilityChecker interface {
|
||||
IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ type MockAvailabilityStore struct {
|
||||
PersistBlobsCallback func(current primitives.Slot, blobSidecar ...blocks.ROBlob) error
|
||||
}
|
||||
|
||||
var _ AvailabilityStore = &MockAvailabilityStore{}
|
||||
var _ AvailabilityChecker = &MockAvailabilityStore{}
|
||||
|
||||
// IsDataAvailable satisfies the corresponding method of the AvailabilityStore interface in a way that is useful for tests.
|
||||
func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
|
||||
|
||||
@@ -1108,7 +1108,7 @@ func (b *BeaconNode) registerPrunerService(cliCtx *cli.Context) error {
|
||||
|
||||
func (b *BeaconNode) RegisterBackfillService(cliCtx *cli.Context, bfs *backfill.Store) error {
|
||||
pa := peers.NewAssigner(b.fetchP2P().Peers(), b.forkChoicer)
|
||||
bf, err := backfill.NewService(cliCtx.Context, bfs, b.BlobStorage, b.clockWaiter, b.fetchP2P(), pa, b.BackfillOpts...)
|
||||
bf, err := backfill.NewService(cliCtx.Context, bfs, b.BlobStorage, b.DataColumnStorage, b.clockWaiter, b.fetchP2P(), pa, b.BackfillOpts...)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error initializing backfill service")
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ func (a *Assigner) freshPeers() ([]peer.ID, error) {
|
||||
if flags.Get().MinimumSyncPeers < required {
|
||||
required = flags.Get().MinimumSyncPeers
|
||||
}
|
||||
_, peers := a.ps.BestFinalized(params.BeaconConfig().MaxPeersToSync, a.fc.FinalizedCheckpoint().Epoch)
|
||||
_, peers := a.ps.BestFinalized(-1, a.fc.FinalizedCheckpoint().Epoch)
|
||||
if len(peers) < required {
|
||||
log.WithFields(logrus.Fields{
|
||||
"suitable": len(peers),
|
||||
@@ -52,27 +52,33 @@ func (a *Assigner) freshPeers() ([]peer.ID, error) {
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
type AssignmentFilter func([]peer.ID) []peer.ID
|
||||
|
||||
// Assign uses the "BestFinalized" method to select the best peers that agree on a canonical block
|
||||
// for the configured finalized epoch. At most `n` peers will be returned. The `busy` param can be used
|
||||
// to filter out peers that we know we don't want to connect to, for instance if we are trying to limit
|
||||
// the number of outbound requests to each peer from a given component.
|
||||
func (a *Assigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
|
||||
func (a *Assigner) Assign(filter AssignmentFilter) ([]peer.ID, error) {
|
||||
best, err := a.freshPeers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pickBest(busy, n, best), nil
|
||||
return filter(best), nil
|
||||
}
|
||||
|
||||
func pickBest(busy map[peer.ID]bool, n int, best []peer.ID) []peer.ID {
|
||||
ps := make([]peer.ID, 0, n)
|
||||
for _, p := range best {
|
||||
if len(ps) == n {
|
||||
return ps
|
||||
}
|
||||
if !busy[p] {
|
||||
ps = append(ps, p)
|
||||
// NotBusy is a filter that returns a list of peer.IDs with len() <= n, which are not in the `busy` map.
|
||||
// n == -1 will return all peers that are not busy.
|
||||
func NotBusy(busy map[peer.ID]bool, n int) AssignmentFilter {
|
||||
return func(peers []peer.ID) []peer.ID {
|
||||
ps := make([]peer.ID, 0)
|
||||
for _, p := range peers {
|
||||
if n > 0 && len(ps) == n {
|
||||
return ps
|
||||
}
|
||||
if !busy[p] {
|
||||
ps = append(ps, p)
|
||||
}
|
||||
}
|
||||
return ps
|
||||
}
|
||||
return ps
|
||||
}
|
||||
|
||||
@@ -18,8 +18,9 @@ func TestPickBest(t *testing.T) {
|
||||
expected []peer.ID
|
||||
}{
|
||||
{
|
||||
name: "",
|
||||
n: 0,
|
||||
name: "don't limit",
|
||||
n: 0,
|
||||
expected: best,
|
||||
},
|
||||
{
|
||||
name: "none busy",
|
||||
@@ -88,7 +89,8 @@ func TestPickBest(t *testing.T) {
|
||||
if c.best == nil {
|
||||
c.best = best
|
||||
}
|
||||
pb := pickBest(c.busy, c.n, c.best)
|
||||
filt := NotBusy(c.busy, c.n)
|
||||
pb := filt(c.best)
|
||||
require.Equal(t, len(c.expected), len(pb))
|
||||
for i := range c.expected {
|
||||
require.Equal(t, c.expected[i], pb[i])
|
||||
|
||||
@@ -770,7 +770,7 @@ func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch primitives.Epoch)
|
||||
}
|
||||
|
||||
// Trim potential peers to at most maxPeers.
|
||||
if len(potentialPIDs) > maxPeers {
|
||||
if maxPeers > 0 && len(potentialPIDs) > maxPeers {
|
||||
potentialPIDs = potentialPIDs[:maxPeers]
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ go_library(
|
||||
"broadcast_bls_changes.go",
|
||||
"context.go",
|
||||
"custody.go",
|
||||
"data_column_assignment.go",
|
||||
"data_column_sidecars.go",
|
||||
"data_columns_reconstruct.go",
|
||||
"deadlines.go",
|
||||
@@ -137,6 +138,7 @@ go_library(
|
||||
"//time:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
|
||||
"@com_github_hashicorp_golang_lru//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
|
||||
|
||||
@@ -6,6 +6,8 @@ go_library(
|
||||
"batch.go",
|
||||
"batcher.go",
|
||||
"blobs.go",
|
||||
"columns.go",
|
||||
"fulu_transition.go",
|
||||
"log.go",
|
||||
"metrics.go",
|
||||
"pool.go",
|
||||
@@ -18,6 +20,7 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/peerdas:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//beacon-chain/das:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
@@ -66,6 +69,7 @@ go_test(
|
||||
"//beacon-chain/das:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
"//beacon-chain/db/filesystem:go_default_library",
|
||||
"//beacon-chain/p2p/peers:go_default_library",
|
||||
"//beacon-chain/p2p/testing:go_default_library",
|
||||
"//beacon-chain/startup:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
@@ -38,8 +37,10 @@ func (s batchState) String() string {
|
||||
return "import_complete"
|
||||
case batchEndSequence:
|
||||
return "end_sequence"
|
||||
case batchBlobSync:
|
||||
return "blob_sync"
|
||||
case batchSyncBlobs:
|
||||
return "sync_blobs"
|
||||
case batchSyncColumns:
|
||||
return "sync_columns"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
@@ -50,7 +51,9 @@ const (
|
||||
batchInit
|
||||
batchSequenced
|
||||
batchErrRetryable
|
||||
batchBlobSync
|
||||
batchErrFatal
|
||||
batchSyncBlobs
|
||||
batchSyncColumns
|
||||
batchImportable
|
||||
batchImportComplete
|
||||
batchEndSequence
|
||||
@@ -72,9 +75,12 @@ type batch struct {
|
||||
err error
|
||||
state batchState
|
||||
busy peer.ID
|
||||
nextReqCols []uint64
|
||||
blockPid peer.ID
|
||||
blobPid peer.ID
|
||||
columnPid peer.ID
|
||||
bs *blobSync
|
||||
cs *columnSync
|
||||
}
|
||||
|
||||
func (b batch) logFields() logrus.Fields {
|
||||
@@ -93,6 +99,9 @@ func (b batch) logFields() logrus.Fields {
|
||||
if b.retries > 0 {
|
||||
f["retryAfter"] = b.retryAfter.String()
|
||||
}
|
||||
if b.state == batchSyncColumns {
|
||||
f["nextColumns"] = fmt.Sprintf("%v", b.nextReqCols)
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
@@ -136,22 +145,29 @@ func (b batch) blobRequest() *eth.BlobSidecarsByRangeRequest {
|
||||
}
|
||||
}
|
||||
|
||||
func (b batch) withResults(results verifiedROBlocks, bs *blobSync) batch {
|
||||
func (b batch) postBlockSync(results verifiedROBlocks, bs *blobSync, cs *columnSync) batch {
|
||||
b.results = results
|
||||
b.bs = bs
|
||||
b.cs = cs
|
||||
if bs.blobsNeeded() > 0 {
|
||||
return b.withState(batchBlobSync)
|
||||
return b.withState(batchSyncBlobs)
|
||||
}
|
||||
if len(cs.columnsNeeded()) > 0 {
|
||||
return b.withState(batchSyncColumns)
|
||||
}
|
||||
return b.withState(batchImportable)
|
||||
}
|
||||
|
||||
func (b batch) postBlobSync() batch {
|
||||
func (b batch) postSidecarSync() batch {
|
||||
if b.blobsNeeded() > 0 {
|
||||
log.WithFields(b.logFields()).WithField("blobsMissing", b.blobsNeeded()).Error("Batch still missing blobs after downloading from peer")
|
||||
b.bs = nil
|
||||
b.results = []blocks.ROBlock{}
|
||||
return b.withState(batchErrRetryable)
|
||||
}
|
||||
if len(b.cs.columnsNeeded()) > 0 {
|
||||
return b.withState(batchSyncColumns)
|
||||
}
|
||||
return b.withState(batchImportable)
|
||||
}
|
||||
|
||||
@@ -187,6 +203,11 @@ func (b batch) withRetryableError(err error) batch {
|
||||
return b.withState(batchErrRetryable)
|
||||
}
|
||||
|
||||
func (b batch) withFatalError(err error) batch {
|
||||
b.err = errors.Wrap(err, "fatal erorr in batch")
|
||||
return b.withState(batchErrFatal)
|
||||
}
|
||||
|
||||
func (b batch) blobsNeeded() int {
|
||||
return b.bs.blobsNeeded()
|
||||
}
|
||||
@@ -195,8 +216,8 @@ func (b batch) blobResponseValidator() sync.BlobResponseValidation {
|
||||
return b.bs.validateNext
|
||||
}
|
||||
|
||||
func (b batch) availabilityStore() das.AvailabilityStore {
|
||||
return b.bs.store
|
||||
func (b batch) validatingColumnRequest() *validatingColumnRequest {
|
||||
return b.cs.newValidatingColumnRequest(b.nextReqCols)
|
||||
}
|
||||
|
||||
var batchBlockUntil = func(ctx context.Context, untilRetry time.Duration, b batch) error {
|
||||
@@ -223,6 +244,18 @@ func (b batch) waitUntilReady(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b batch) workComplete() bool {
|
||||
return b.state == batchImportable
|
||||
}
|
||||
|
||||
func (b batch) selectPeer(ranking *sync.ColumnScarcityRanking, busy map[peer.ID]bool) (peer.ID, []uint64, error) {
|
||||
if b.state == batchSyncColumns {
|
||||
return ranking.ForColumns(b.cs.columnsNeeded(), busy)
|
||||
}
|
||||
peer, err := ranking.ForBlocks(busy)
|
||||
return peer, nil, err
|
||||
}
|
||||
|
||||
func sortBatchDesc(bb []batch) {
|
||||
sort.Slice(bb, func(i, j int) bool {
|
||||
return bb[i].end > bb[j].end
|
||||
|
||||
@@ -48,7 +48,7 @@ func newBlobSync(current primitives.Slot, vbs verifiedROBlocks, cfg *blobSyncCon
|
||||
type blobVerifierMap map[[32]byte][]verification.BlobVerifier
|
||||
|
||||
type blobSync struct {
|
||||
store das.AvailabilityStore
|
||||
store *das.LazilyPersistentStoreBlob
|
||||
expected []blobSummary
|
||||
next int
|
||||
bbv *blobBatchVerifier
|
||||
|
||||
268
beacon-chain/sync/backfill/columns.go
Normal file
268
beacon-chain/sync/backfill/columns.go
Normal file
@@ -0,0 +1,268 @@
|
||||
package backfill
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
errInvalidResponseOrder = errors.New("out of order DataColumnSidecar response")
|
||||
errColumnResponseSlotOutOfRange = errors.New("slot out of range for DataColumnSidecar response")
|
||||
errColumnIndexNotRequested = errors.New("index in DataColumnSidecar response not requested")
|
||||
)
|
||||
|
||||
type columnBatch struct {
|
||||
first primitives.Slot
|
||||
last primitives.Slot
|
||||
custodyRequirement peerdas.ColumnIndices
|
||||
blockColumnsByRoot map[[32]byte]*blockColumns
|
||||
}
|
||||
|
||||
type blockColumns struct {
|
||||
remaining peerdas.ColumnIndices
|
||||
commitments [][]byte
|
||||
}
|
||||
|
||||
func (cs *columnBatch) needed() peerdas.ColumnIndices {
|
||||
if len(cs.custodyRequirement) == 0 {
|
||||
return nil
|
||||
}
|
||||
search := peerdas.CopyTrueIndices(cs.custodyRequirement)
|
||||
ci := make(peerdas.ColumnIndices, len(search))
|
||||
// avoid iterating every single block+index by only searching for indices
|
||||
// we haven't found yet.
|
||||
for _, v := range cs.blockColumnsByRoot {
|
||||
if len(search) == 0 {
|
||||
return ci
|
||||
}
|
||||
for col := range search {
|
||||
if v.remaining[col] {
|
||||
ci[col] = true
|
||||
// We found the column, so we can delete it from the search.
|
||||
delete(search, col)
|
||||
}
|
||||
}
|
||||
}
|
||||
return ci
|
||||
}
|
||||
|
||||
type columnSync struct {
|
||||
*columnBatch
|
||||
store *das.LazilyPersistentStoreColumn
|
||||
current primitives.Slot
|
||||
}
|
||||
|
||||
func newColumnSync(b batch, blks verifiedROBlocks, current primitives.Slot, p p2p.P2P, vbs verifiedROBlocks, cfg *workerCfg) (*columnSync, error) {
|
||||
cgc, err := p.CustodyGroupCount()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "custody group count")
|
||||
}
|
||||
cb, err := buildColumnBatch(b, blks, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cb == nil {
|
||||
return &columnSync{}, nil
|
||||
}
|
||||
return &columnSync{
|
||||
columnBatch: cb,
|
||||
current: current,
|
||||
store: das.NewLazilyPersistentStoreColumn(cfg.cfs, p.NodeID(), cfg.ndcv, cgc),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (cs *columnSync) blockColumns(root [32]byte) *blockColumns {
|
||||
if cs.columnBatch == nil {
|
||||
return nil
|
||||
}
|
||||
return cs.columnBatch.blockColumnsByRoot[root]
|
||||
}
|
||||
|
||||
func (cs *columnSync) columnsNeeded() peerdas.ColumnIndices {
|
||||
if cs.columnBatch == nil {
|
||||
return nil
|
||||
}
|
||||
return cs.columnBatch.needed()
|
||||
}
|
||||
|
||||
func (cs *columnSync) request(reqCols []uint64) *ethpb.DataColumnSidecarsByRangeRequest {
|
||||
return sync.DataColumnSidecarsByRangeRequest(reqCols, cs.first, cs.last)
|
||||
}
|
||||
|
||||
func (cs *columnSync) newValidatingColumnRequest(cols []uint64) *validatingColumnRequest {
|
||||
req := cs.request(cols)
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
return &validatingColumnRequest{
|
||||
req: req,
|
||||
columns: peerdas.ColumnIndicesFromSlice(cols),
|
||||
cs: cs,
|
||||
}
|
||||
}
|
||||
|
||||
type validatingColumnRequest struct {
|
||||
last primitives.Slot
|
||||
req *ethpb.DataColumnSidecarsByRangeRequest
|
||||
columns map[uint64]bool
|
||||
cs *columnSync
|
||||
}
|
||||
|
||||
func (v *validatingColumnRequest) validate(cd blocks.RODataColumn) error {
|
||||
defer func(start time.Time) {
|
||||
dataColumnSidecarVerifyMs.Observe(float64(time.Since(start).Milliseconds()))
|
||||
}(time.Now())
|
||||
return recordColumnSidecarDownload(cd, v.countedValidation(cd))
|
||||
}
|
||||
|
||||
func recordColumnSidecarDownload(cd blocks.RODataColumn, valid bool) error {
|
||||
validity := "invalid"
|
||||
if valid {
|
||||
validity = "valid"
|
||||
}
|
||||
dataColumnSidecarDownloadCount.WithLabelValues(fmt.Sprintf("%d", cd.Index), validity).Inc()
|
||||
dataColumnSidecarDownloadBytes.Add(float64(cd.SizeSSZ()))
|
||||
if !valid {
|
||||
return errors.New("invalid data column sidecar")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// When we call Persist we'll get the verification checks that are provided by the availability store.
|
||||
// In addition to those checks this function calls rpcValidity which maintains a state machine across
|
||||
// response values to ensure that the response is valid in the context of the overall request,
|
||||
// like making sure that the block roots is one of the ones we expect based on the blocks we used to
|
||||
// construct the request. It also does cheap sanity checks on the DataColumnSidecar values like
|
||||
// ensuring that the commitments line up with the block.
|
||||
func (v *validatingColumnRequest) countedValidation(cd blocks.RODataColumn) bool {
|
||||
if err := v.rpcValidity(cd); err != nil {
|
||||
log.WithError(err).WithField("slot", cd.Slot()).WithField("index", cd.Index).Error("invalid data column sidecar response")
|
||||
return false
|
||||
}
|
||||
root := cd.BlockRoot()
|
||||
expected := v.cs.blockColumns(root)
|
||||
if expected == nil {
|
||||
return false
|
||||
}
|
||||
// We don't need this column, but we trust the column state machine verified we asked for it as part of a range request.
|
||||
// So we can just skip over it and not try to persist it.
|
||||
if !expected.remaining[cd.Index] {
|
||||
return true
|
||||
}
|
||||
if len(cd.KzgCommitments) != len(expected.commitments) {
|
||||
log.WithField("slot", cd.Slot()).WithField("index", cd.Index).Error("unexpected number of commitments in data column sidecar")
|
||||
return false
|
||||
}
|
||||
for i, cmt := range cd.KzgCommitments {
|
||||
if !bytes.Equal(cmt, expected.commitments[i]) {
|
||||
log.WithField("slot", cd.Slot()).WithField("index", cd.Index).WithField("cmtIndex", i).Error("commitment in data column sidecar does not match expected commitment")
|
||||
return false
|
||||
}
|
||||
}
|
||||
if err := v.cs.store.Persist(v.cs.current, cd); err != nil {
|
||||
log.WithError(err).Error("failed to persist data column")
|
||||
return false
|
||||
}
|
||||
delete(expected.remaining, cd.Index)
|
||||
return true
|
||||
}
|
||||
|
||||
// rpcValidity checks that the individual DataColumnSidecar value is valid in the context of the overall response
|
||||
// respecting the p2p spec rules for DataColumnSidecarByRange responses:
|
||||
// - values are in the requsted slot range
|
||||
// - values are in slot order
|
||||
// - block roots are canonical wrt the blocks we believe are canonical
|
||||
// (assuming previous block response from another peer was honest)
|
||||
// - there are not too many values in the response
|
||||
// - the column index is one of the requested columns
|
||||
func (v *validatingColumnRequest) rpcValidity(col blocks.RODataColumn) error {
|
||||
slot := col.Slot()
|
||||
if v.last > slot {
|
||||
return errInvalidResponseOrder
|
||||
}
|
||||
if slot < v.req.StartSlot {
|
||||
return errors.Wrap(errColumnResponseSlotOutOfRange, "sidecar slot before request start")
|
||||
}
|
||||
if slot >= v.req.StartSlot+primitives.Slot(v.req.Count) {
|
||||
return errors.Wrap(errColumnResponseSlotOutOfRange, "sidecar slot after request end")
|
||||
}
|
||||
// This is an important check because we may have already satisfied this column for a given
|
||||
// block root but still requested it for the benefit of other blocks in the batch. So this check ensures
|
||||
// that it was part ofthe overall batch request.
|
||||
if !v.columns[col.Index] {
|
||||
return errColumnIndexNotRequested
|
||||
}
|
||||
v.last = col.Slot()
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildColumnBatch(b batch, fuluBlocks verifiedROBlocks, p p2p.P2P) (*columnBatch, error) {
|
||||
if len(fuluBlocks) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
fuluStart := params.BeaconConfig().FuluForkEpoch
|
||||
// If the batch end slot or last result block are pre-fulu, so are the rest.
|
||||
if slots.ToEpoch(b.end) < fuluStart || slots.ToEpoch(fuluBlocks[len(fuluBlocks)-1].Block().Slot()) < fuluStart {
|
||||
return nil, nil
|
||||
}
|
||||
// The last block in the batch is in fulu, but the first one is not.
|
||||
// Find the index of the first fulu block to exclude the pre-fulu blocks.
|
||||
if slots.ToEpoch(fuluBlocks[0].Block().Slot()) < fuluStart {
|
||||
fuluStart := sort.Search(len(fuluBlocks), func(i int) bool {
|
||||
return slots.ToEpoch(fuluBlocks[i].Block().Slot()) >= fuluStart
|
||||
})
|
||||
fuluBlocks = fuluBlocks[fuluStart:]
|
||||
}
|
||||
cgc, err := p.CustodyGroupCount()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "custody group count")
|
||||
}
|
||||
|
||||
// Note that in the case where custody_group_count is the minimum CUSTODY_REQUIREMENT, we will
|
||||
// still download the extra columns dictated by SAMPLES_PER_SLOT. This is a hack to avoid complexity in the DA check.
|
||||
// We may want to revisit this to reduce bandwidth and storage for nodes with 0 validators attached.
|
||||
peerInfo, _, err := peerdas.Info(p.NodeID(), max(cgc, params.BeaconConfig().SamplesPerSlot))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "peer info")
|
||||
}
|
||||
indices := peerdas.CopyTrueIndices(peerInfo.CustodyColumns)
|
||||
|
||||
summary := &columnBatch{
|
||||
custodyRequirement: indices,
|
||||
blockColumnsByRoot: make(map[[32]byte]*blockColumns, len(fuluBlocks)),
|
||||
}
|
||||
for _, b := range fuluBlocks {
|
||||
cmts, err := b.Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get blob kzg commitments")
|
||||
}
|
||||
if len(cmts) == 0 {
|
||||
continue
|
||||
}
|
||||
slot := b.Block().Slot()
|
||||
if len(summary.blockColumnsByRoot) == 0 {
|
||||
summary.first = slot
|
||||
}
|
||||
summary.blockColumnsByRoot[b.Root()] = &blockColumns{
|
||||
remaining: peerdas.CopyTrueIndices(indices),
|
||||
commitments: cmts,
|
||||
}
|
||||
summary.last = slot
|
||||
}
|
||||
|
||||
return summary, nil
|
||||
}
|
||||
54
beacon-chain/sync/backfill/fulu_transition.go
Normal file
54
beacon-chain/sync/backfill/fulu_transition.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package backfill
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/das"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var errAvailabilityCheckerInvalid = errors.New("invalid availability checker state")
|
||||
|
||||
type multiStore struct {
|
||||
fuluStart primitives.Slot
|
||||
columnStore das.AvailabilityChecker
|
||||
blobStore das.AvailabilityChecker
|
||||
}
|
||||
|
||||
// Persist implements das.AvailabilityStore.
|
||||
var _ das.AvailabilityChecker = &multiStore{}
|
||||
|
||||
// IsDataAvailable implements the das.AvailabilityStore interface.
|
||||
func (m *multiStore) IsDataAvailable(ctx context.Context, current primitives.Slot, blk blocks.ROBlock) error {
|
||||
if blk.Block().Slot() < m.fuluStart {
|
||||
return m.checkAvailabilityWithFallback(ctx, m.blobStore, current, blk)
|
||||
}
|
||||
return m.checkAvailabilityWithFallback(ctx, m.columnStore, current, blk)
|
||||
}
|
||||
|
||||
func (m *multiStore) checkAvailabilityWithFallback(ctx context.Context, ac das.AvailabilityChecker, current primitives.Slot, blk blocks.ROBlock) error {
|
||||
if ac != nil {
|
||||
return ac.IsDataAvailable(ctx, current, blk)
|
||||
}
|
||||
cmts, err := blk.Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(cmts) > 0 {
|
||||
return errAvailabilityCheckerInvalid
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newMultiStore(fuluStart primitives.Slot, b batch) *multiStore {
|
||||
s := &multiStore{fuluStart: fuluStart}
|
||||
if b.bs != nil && b.bs.store != nil {
|
||||
s.blobStore = b.bs.store
|
||||
}
|
||||
if b.cs != nil && b.cs.store != nil {
|
||||
s.columnStore = b.cs.store
|
||||
}
|
||||
return s
|
||||
}
|
||||
@@ -21,40 +21,24 @@ var (
|
||||
Help: "Number of batches that are ready to be imported once they can be connected to the existing chain.",
|
||||
},
|
||||
)
|
||||
backfillRemainingBatches = promauto.NewGauge(
|
||||
batchesRemaining = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "backfill_remaining_batches",
|
||||
Help: "Backfill remaining batches.",
|
||||
},
|
||||
)
|
||||
backfillBatchesImported = promauto.NewCounter(
|
||||
batchesImported = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_batches_imported",
|
||||
Help: "Number of backfill batches downloaded and imported.",
|
||||
},
|
||||
)
|
||||
backfillBlocksApproximateBytes = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_blocks_bytes_downloaded",
|
||||
Help: "BeaconBlock bytes downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
backfillBlobsApproximateBytes = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_blobs_bytes_downloaded",
|
||||
Help: "BlobSidecar bytes downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
backfillBlobsDownloadCount = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_blobs_download_count",
|
||||
Help: "Number of BlobSidecar values downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
backfillBlocksDownloadCount = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_blocks_download_count",
|
||||
Help: "Number of BeaconBlock values downloaded from peers for backfill.",
|
||||
|
||||
backfillBatchTimeWaiting = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_time_waiting",
|
||||
Help: "Time batch waited for a suitable peer.",
|
||||
Buckets: []float64{50, 100, 300, 1000, 2000},
|
||||
},
|
||||
)
|
||||
backfillBatchTimeRoundtrip = promauto.NewHistogram(
|
||||
@@ -64,43 +48,90 @@ var (
|
||||
Buckets: []float64{400, 800, 1600, 3200, 6400, 12800},
|
||||
},
|
||||
)
|
||||
backfillBatchTimeWaiting = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_time_waiting",
|
||||
Help: "Time batch waited for a suitable peer.",
|
||||
Buckets: []float64{50, 100, 300, 1000, 2000},
|
||||
|
||||
blockDownloadCount = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_blocks_download_count",
|
||||
Help: "Number of BeaconBlock values downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
backfillBatchTimeDownloadingBlocks = promauto.NewHistogram(
|
||||
blockDownloadBytesApprox = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_blocks_bytes_downloaded",
|
||||
Help: "BeaconBlock bytes downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
blockDownloadMs = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_blocks_time_download",
|
||||
Help: "Time, in milliseconds, batch spent downloading blocks from peer.",
|
||||
Help: "BeaconBlock download time, in ms.",
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
backfillBatchTimeDownloadingBlobs = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_blobs_time_download",
|
||||
Help: "Time, in milliseconds, batch spent downloading blobs from peer.",
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
backfillBatchTimeVerifying = promauto.NewHistogram(
|
||||
blockVerifyMs = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_time_verify",
|
||||
Help: "Time batch spent downloading blocks from peer.",
|
||||
Help: "BeaconBlock verification time, in ms.",
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
|
||||
blobSidecarDownloadCount = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_blobs_download_count",
|
||||
Help: "Number of BlobSidecar values downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
blobSidecarDownloadBytesApprox = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_blobs_bytes_downloaded",
|
||||
Help: "BlobSidecar bytes downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
blobSidecarDownloadMs = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_blobs_time_download",
|
||||
Help: "BlobSidecar download time, in ms.",
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
|
||||
dataColumnSidecarDownloadCount = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_data_column_sidecar_downloaded",
|
||||
Help: "Number of DataColumnSidecar values downloaded from peers for backfill.",
|
||||
},
|
||||
[]string{"index", "validity"},
|
||||
)
|
||||
dataColumnSidecarDownloadBytes = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "backfill_data_column_sidecar_bytes_downloaded",
|
||||
Help: "DataColumnSidecar bytes downloaded from peers for backfill.",
|
||||
},
|
||||
)
|
||||
dataColumnSidecarDownloadMs = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_columns_time_download",
|
||||
Help: "DataColumnSidecars download time, in ms.",
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
dataColumnSidecarVerifyMs = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "backfill_batch_columns_time_verify",
|
||||
Help: "DataColumnSidecars verification time, in ms.",
|
||||
Buckets: []float64{100, 300, 1000, 2000, 4000, 8000},
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
func blobValidationMetrics(_ blocks.ROBlob) error {
|
||||
backfillBlobsDownloadCount.Inc()
|
||||
blobSidecarDownloadCount.Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
func blockValidationMetrics(interfaces.ReadOnlySignedBeaconBlock) error {
|
||||
backfillBlocksDownloadCount.Inc()
|
||||
blockDownloadCount.Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2,22 +2,18 @@ package backfill
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type batchWorkerPool interface {
|
||||
spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, blobVerifier verification.NewBlobVerifier, bfs *filesystem.BlobStorage)
|
||||
spawn(ctx context.Context, n int, a PeerAssigner, cfg *workerCfg)
|
||||
todo(b batch)
|
||||
complete() (batch, error)
|
||||
}
|
||||
@@ -26,11 +22,11 @@ type worker interface {
|
||||
run(context.Context)
|
||||
}
|
||||
|
||||
type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker
|
||||
type newWorker func(id workerId, in, out chan batch, cfg *workerCfg) worker
|
||||
|
||||
func defaultNewWorker(p p2p.P2P) newWorker {
|
||||
return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) worker {
|
||||
return newP2pWorker(id, p, in, out, c, v, cm, nbv, bfs)
|
||||
return func(id workerId, in, out chan batch, cfg *workerCfg) worker {
|
||||
return newP2pWorker(id, p, in, out, cfg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +41,8 @@ type p2pBatchWorkerPool struct {
|
||||
endSeq []batch
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
p2p p2p.P2P
|
||||
earliest primitives.Slot
|
||||
}
|
||||
|
||||
var _ batchWorkerPool = &p2pBatchWorkerPool{}
|
||||
@@ -59,14 +57,15 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
|
||||
fromWorkers: make(chan batch),
|
||||
maxBatches: maxBatches,
|
||||
shutdownErr: make(chan error),
|
||||
p2p: p,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) {
|
||||
func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, a PeerAssigner, cfg *workerCfg) {
|
||||
p.ctx, p.cancel = context.WithCancel(ctx)
|
||||
go p.batchRouter(a)
|
||||
for i := 0; i < n; i++ {
|
||||
go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v, cm, nbv, bfs).run(p.ctx)
|
||||
go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, cfg).run(p.ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,7 +102,6 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
|
||||
busy := make(map[peer.ID]bool)
|
||||
todo := make([]batch, 0)
|
||||
rt := time.NewTicker(time.Second)
|
||||
earliest := primitives.Slot(math.MaxUint64)
|
||||
for {
|
||||
select {
|
||||
case b := <-p.toRouter:
|
||||
@@ -116,50 +114,72 @@ func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) {
|
||||
// to retry failed assignments.
|
||||
case b := <-p.fromWorkers:
|
||||
pid := b.busy
|
||||
busy[pid] = false
|
||||
if b.state == batchBlobSync {
|
||||
todo = append(todo, b)
|
||||
sortBatchDesc(todo)
|
||||
} else {
|
||||
delete(busy, pid)
|
||||
if b.workComplete() {
|
||||
p.fromRouter <- b
|
||||
break
|
||||
}
|
||||
todo = append(todo, b)
|
||||
sortBatchDesc(todo)
|
||||
case <-p.ctx.Done():
|
||||
log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down")
|
||||
p.shutdown(p.ctx.Err())
|
||||
return
|
||||
}
|
||||
if len(todo) == 0 {
|
||||
continue
|
||||
}
|
||||
// Try to assign as many outstanding batches as possible to peers and feed the assigned batches to workers.
|
||||
assigned, err := pa.Assign(busy, len(todo))
|
||||
var err error
|
||||
todo, err = p.processTodo(todo, pa, busy)
|
||||
if err != nil {
|
||||
if errors.Is(err, peers.ErrInsufficientSuitable) {
|
||||
// Transient error resulting from insufficient number of connected peers. Leave batches in
|
||||
// queue and get to them whenever the peer situation is resolved.
|
||||
continue
|
||||
}
|
||||
p.shutdown(err)
|
||||
return
|
||||
}
|
||||
for _, pid := range assigned {
|
||||
if err := todo[0].waitUntilReady(p.ctx); err != nil {
|
||||
log.WithError(p.ctx.Err()).Info("p2pBatchWorkerPool context canceled, shutting down")
|
||||
p.shutdown(p.ctx.Err())
|
||||
return
|
||||
}
|
||||
busy[pid] = true
|
||||
todo[0].busy = pid
|
||||
p.toWorkers <- todo[0].withPeer(pid)
|
||||
if todo[0].begin < earliest {
|
||||
earliest = todo[0].begin
|
||||
oldestBatch.Set(float64(earliest))
|
||||
}
|
||||
todo = todo[1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *p2pBatchWorkerPool) processTodo(todo []batch, pa PeerAssigner, busy map[peer.ID]bool) ([]batch, error) {
|
||||
if len(todo) == 0 {
|
||||
return todo, nil
|
||||
}
|
||||
notBusy, err := pa.Assign(peers.NotBusy(busy, -1))
|
||||
if err != nil {
|
||||
if errors.Is(err, peers.ErrInsufficientSuitable) {
|
||||
// Transient error resulting from insufficient number of connected peers. Leave batches in
|
||||
// queue and get to them whenever the peer situation is resolved.
|
||||
return todo, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
peerRank, err := sync.NewColumnScarcityRanking(notBusy, p.p2p)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to compute column matrix for peer assignment")
|
||||
return todo, nil
|
||||
}
|
||||
if len(notBusy) == 0 {
|
||||
log.Warn("No suitable peers available for batch assignment")
|
||||
return todo, nil
|
||||
}
|
||||
for i, b := range todo {
|
||||
pid, cols, err := b.selectPeer(peerRank, busy)
|
||||
if err != nil {
|
||||
log.WithField("not_busy", len(notBusy)).WithError(err).WithFields(b.logFields()).Error("Failed to select peer for batch")
|
||||
// Return the remaining todo items and allow the outer loop to control when we try again.
|
||||
return todo[i:], nil
|
||||
}
|
||||
busy[pid] = true
|
||||
b.busy = pid
|
||||
b.nextReqCols = cols
|
||||
p.toWorkers <- b.withPeer(pid)
|
||||
p.updateEarliest(b.begin)
|
||||
}
|
||||
return []batch{}, nil
|
||||
}
|
||||
|
||||
func (p *p2pBatchWorkerPool) updateEarliest(current primitives.Slot) {
|
||||
if current >= p.earliest {
|
||||
return
|
||||
}
|
||||
p.earliest = current
|
||||
oldestBatch.Set(float64(p.earliest))
|
||||
}
|
||||
|
||||
func (p *p2pBatchWorkerPool) shutdown(err error) {
|
||||
p.cancel()
|
||||
p.shutdownErr <- err
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
@@ -24,7 +25,7 @@ type mockAssigner struct {
|
||||
|
||||
// Assign satisfies the PeerAssigner interface so that mockAssigner can be used in tests
|
||||
// in place of the concrete p2p implementation of PeerAssigner.
|
||||
func (m mockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
|
||||
func (m mockAssigner) Assign(filter peers.AssignmentFilter) ([]peer.ID, error) {
|
||||
if m.err != nil {
|
||||
return nil, m.err
|
||||
}
|
||||
@@ -53,7 +54,8 @@ func TestPoolDetectAllEnded(t *testing.T) {
|
||||
ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(st.GenesisValidatorsRoot()))
|
||||
require.NoError(t, err)
|
||||
bfs := filesystem.NewEphemeralBlobStorage(t)
|
||||
pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v, ctxMap, mockNewBlobVerifier, bfs)
|
||||
wcfg := &workerCfg{c: startup.NewClock(time.Now(), [32]byte{}), nbv: mockNewBlobVerifier, v: v, cm: ctxMap, bfs: bfs}
|
||||
pool.spawn(ctx, nw, ma, wcfg)
|
||||
br := batcher{min: 10, size: 10}
|
||||
endSeq := br.before(0)
|
||||
require.Equal(t, batchEndSequence, endSeq.state)
|
||||
@@ -72,7 +74,7 @@ type mockPool struct {
|
||||
todoChan chan batch
|
||||
}
|
||||
|
||||
func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier, _ sync.ContextByteVersions, _ verification.NewBlobVerifier, _ *filesystem.BlobStorage) {
|
||||
func (m *mockPool) spawn(_ context.Context, _ int, _ PeerAssigner, _ *workerCfg) {
|
||||
}
|
||||
|
||||
func (m *mockPool) todo(b batch) {
|
||||
|
||||
@@ -6,9 +6,10 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
@@ -21,26 +22,26 @@ import (
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
ctx context.Context
|
||||
enabled bool // service is disabled by default while feature is experimental
|
||||
clock *startup.Clock
|
||||
store *Store
|
||||
ms minimumSlotter
|
||||
cw startup.ClockWaiter
|
||||
verifierWaiter InitializerWaiter
|
||||
newBlobVerifier verification.NewBlobVerifier
|
||||
nWorkers int
|
||||
batchSeq *batchSequencer
|
||||
batchSize uint64
|
||||
pool batchWorkerPool
|
||||
verifier *verifier
|
||||
ctxMap sync.ContextByteVersions
|
||||
p2p p2p.P2P
|
||||
pa PeerAssigner
|
||||
batchImporter batchImporter
|
||||
blobStore *filesystem.BlobStorage
|
||||
initSyncWaiter func() error
|
||||
complete chan struct{}
|
||||
ctx context.Context
|
||||
enabled bool // service is disabled by default while feature is experimental
|
||||
clock *startup.Clock
|
||||
store *Store
|
||||
ms minimumSlotter
|
||||
cw startup.ClockWaiter
|
||||
verifierWaiter InitializerWaiter
|
||||
nWorkers int
|
||||
batchSeq *batchSequencer
|
||||
batchSize uint64
|
||||
pool batchWorkerPool
|
||||
p2p p2p.P2P
|
||||
pa PeerAssigner
|
||||
batchImporter batchImporter
|
||||
blobStore *filesystem.BlobStorage
|
||||
dcStore *filesystem.DataColumnStorage
|
||||
initSyncWaiter func() error
|
||||
complete chan struct{}
|
||||
workerCfg *workerCfg
|
||||
fuluStart primitives.Slot
|
||||
}
|
||||
|
||||
var _ runtime.Service = (*Service)(nil)
|
||||
@@ -49,23 +50,13 @@ var _ runtime.Service = (*Service)(nil)
|
||||
// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded,
|
||||
// allowing the caller to avoid making multiple concurrent requests to the same peer.
|
||||
type PeerAssigner interface {
|
||||
Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
|
||||
//Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
|
||||
Assign(filter peers.AssignmentFilter) ([]peer.ID, error)
|
||||
}
|
||||
|
||||
type minimumSlotter func(primitives.Slot) primitives.Slot
|
||||
type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error)
|
||||
|
||||
func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
|
||||
status := su.status()
|
||||
if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
|
||||
return status, err
|
||||
}
|
||||
// Import blocks to db and update db state to reflect the newly imported blocks.
|
||||
// Other parts of the beacon node may use the same StatusUpdater instance
|
||||
// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
|
||||
return su.fillBack(ctx, current, b.results, b.availabilityStore())
|
||||
}
|
||||
|
||||
// ServiceOption represents a functional option for the backfill service constructor.
|
||||
type ServiceOption func(*Service) error
|
||||
|
||||
@@ -140,46 +131,31 @@ func WithMinimumSlot(s primitives.Slot) ServiceOption {
|
||||
|
||||
// NewService initializes the backfill Service. Like all implementations of the Service interface,
|
||||
// the service won't begin its runloop until Start() is called.
|
||||
func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
|
||||
func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, dcStore *filesystem.DataColumnStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
|
||||
s := &Service{
|
||||
ctx: ctx,
|
||||
store: su,
|
||||
blobStore: bStore,
|
||||
cw: cw,
|
||||
ms: minimumBackfillSlot,
|
||||
p2p: p,
|
||||
pa: pa,
|
||||
batchImporter: defaultBatchImporter,
|
||||
complete: make(chan struct{}),
|
||||
ctx: ctx,
|
||||
store: su,
|
||||
blobStore: bStore,
|
||||
dcStore: dcStore,
|
||||
cw: cw,
|
||||
ms: minimumBackfillSlot,
|
||||
p2p: p,
|
||||
pa: pa,
|
||||
complete: make(chan struct{}),
|
||||
fuluStart: slots.SafeEpochStartOrMax(params.BeaconConfig().FuluForkEpoch),
|
||||
}
|
||||
s.batchImporter = s.defaultBatchImporter
|
||||
for _, o := range opts {
|
||||
if err := o(s); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s.pool = newP2PBatchWorkerPool(p, s.nWorkers)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Service) initVerifier(ctx context.Context) (*verifier, sync.ContextByteVersions, error) {
|
||||
cps, err := s.store.originState(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
keys, err := cps.PublicKeys()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "unable to retrieve public keys for all validators in the origin state")
|
||||
}
|
||||
vr := cps.GenesisValidatorsRoot()
|
||||
ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root %#x", vr)
|
||||
}
|
||||
v, err := newBackfillVerifier(vr, keys)
|
||||
return v, ctxMap, err
|
||||
}
|
||||
|
||||
func (s *Service) updateComplete() bool {
|
||||
b, err := s.pool.complete()
|
||||
if err != nil {
|
||||
@@ -201,7 +177,7 @@ func (s *Service) importBatches(ctx context.Context) {
|
||||
if imported == 0 {
|
||||
return
|
||||
}
|
||||
backfillBatchesImported.Add(float64(imported))
|
||||
batchesImported.Add(float64(imported))
|
||||
}()
|
||||
current := s.clock.CurrentSlot()
|
||||
for i := range importable {
|
||||
@@ -227,7 +203,19 @@ func (s *Service) importBatches(ctx context.Context) {
|
||||
WithField("batchesRemaining", nt).
|
||||
Info("Backfill batches processed")
|
||||
|
||||
backfillRemainingBatches.Set(float64(nt))
|
||||
batchesRemaining.Set(float64(nt))
|
||||
}
|
||||
|
||||
func (s *Service) defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
|
||||
status := su.status()
|
||||
if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
|
||||
return status, err
|
||||
}
|
||||
// Import blocks to db and update db state to reflect the newly imported blocks.
|
||||
// Other parts of the beacon node may use the same StatusUpdater instance
|
||||
// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
|
||||
|
||||
return su.fillBack(ctx, current, b.results, newMultiStore(s.fuluStart, b))
|
||||
}
|
||||
|
||||
func (s *Service) scheduleTodos() {
|
||||
@@ -261,25 +249,19 @@ func (s *Service) Start() {
|
||||
log.Info("Backfill service is shutting down")
|
||||
cancel()
|
||||
}()
|
||||
clock, err := s.cw.WaitForClock(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Backfill service failed to start while waiting for genesis data")
|
||||
return
|
||||
}
|
||||
s.clock = clock
|
||||
v, err := s.verifierWaiter.WaitForInitializer(ctx)
|
||||
s.newBlobVerifier = newBlobVerifierFromInitializer(v)
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not initialize blob verifier in backfill service")
|
||||
return
|
||||
}
|
||||
|
||||
if s.store.isGenesisSync() {
|
||||
log.Info("Backfill short-circuit; node synced from genesis")
|
||||
s.markComplete()
|
||||
return
|
||||
}
|
||||
|
||||
clock, err := s.cw.WaitForClock(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Backfill service failed to start while waiting for genesis data")
|
||||
return
|
||||
}
|
||||
s.clock = clock
|
||||
status := s.store.status()
|
||||
// Exit early if there aren't going to be any batches to backfill.
|
||||
if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) {
|
||||
@@ -289,11 +271,6 @@ func (s *Service) Start() {
|
||||
s.markComplete()
|
||||
return
|
||||
}
|
||||
s.verifier, s.ctxMap, err = s.initVerifier(ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Unable to initialize backfill verifier")
|
||||
return
|
||||
}
|
||||
|
||||
if s.initSyncWaiter != nil {
|
||||
log.Info("Backfill service waiting for initial-sync to reach head before starting")
|
||||
@@ -302,7 +279,14 @@ func (s *Service) Start() {
|
||||
return
|
||||
}
|
||||
}
|
||||
s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)
|
||||
|
||||
wc, err := initWorkerCfg(ctx, s.workerCfg, s.clock, s.verifierWaiter, s.store, s.blobStore, s.dcStore)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not initialize blob verifier in backfill service")
|
||||
return
|
||||
}
|
||||
|
||||
s.pool.spawn(ctx, s.nWorkers, s.pa, wc)
|
||||
s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
|
||||
if err = s.initBatches(); err != nil {
|
||||
log.WithError(err).Error("Non-recoverable error in backfill service")
|
||||
@@ -367,6 +351,12 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.
|
||||
}
|
||||
}
|
||||
|
||||
func newDataColumnVerifierFromInitializer(ini *verification.Initializer) verification.NewDataColumnsVerifier {
|
||||
return func(cols []blocks.RODataColumn, reqs []verification.Requirement) verification.DataColumnsVerifier {
|
||||
return ini.NewDataColumnsVerifier(cols, reqs)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) markComplete() {
|
||||
close(s.complete)
|
||||
log.Info("Backfill service marked as complete")
|
||||
|
||||
@@ -57,7 +57,8 @@ func TestServiceInit(t *testing.T) {
|
||||
pool := &mockPool{todoChan: make(chan batch, nWorkers), finishedChan: make(chan batch, nWorkers)}
|
||||
p2pt := p2ptest.NewTestP2P(t)
|
||||
bfs := filesystem.NewEphemeralBlobStorage(t)
|
||||
srv, err := NewService(ctx, su, bfs, cw, p2pt, &mockAssigner{},
|
||||
dcs := filesystem.NewEphemeralDataColumnStorage(t)
|
||||
srv, err := NewService(ctx, su, bfs, dcs, cw, p2pt, &mockAssigner{},
|
||||
WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true), WithVerifierWaiter(&mockInitalizerWaiter{}))
|
||||
require.NoError(t, err)
|
||||
srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}.minimumSlot
|
||||
|
||||
@@ -74,7 +74,7 @@ func (s *Store) status() *dbval.BackfillStatus {
|
||||
// fillBack saves the slice of blocks and updates the BackfillStatus LowSlot/Root/ParentRoot tracker to the values
|
||||
// from the first block in the slice. This method assumes that the block slice has been fully validated and
|
||||
// sorted in slot order by the calling function.
|
||||
func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks []blocks.ROBlock, store das.AvailabilityStore) (*dbval.BackfillStatus, error) {
|
||||
func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks []blocks.ROBlock, store das.AvailabilityChecker) (*dbval.BackfillStatus, error) {
|
||||
status := s.status()
|
||||
if len(blocks) == 0 {
|
||||
return status, nil
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/bls"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
@@ -22,18 +21,34 @@ var errUnknownDomain = errors.New("runtime error looking up signing domain for f
|
||||
type verifiedROBlocks []blocks.ROBlock
|
||||
|
||||
func (v verifiedROBlocks) blobIdents(retentionStart primitives.Slot) ([]blobSummary, error) {
|
||||
// early return if the newest block is outside the retention window
|
||||
if len(v) > 0 && v[len(v)-1].Block().Slot() < retentionStart {
|
||||
if len(v) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
latest := v[len(v)-1].Block().Slot()
|
||||
// early return if the newest block is outside the retention window
|
||||
if latest < retentionStart {
|
||||
return nil, nil
|
||||
}
|
||||
fuluStart := params.BeaconConfig().FuluForkEpoch
|
||||
// If the batch end slot or last result block are pre-fulu, so are the rest.
|
||||
if slots.ToEpoch(latest) >= fuluStart {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
bs := make([]blobSummary, 0)
|
||||
for i := range v {
|
||||
if v[i].Block().Slot() < retentionStart {
|
||||
slot := v[i].Block().Slot()
|
||||
if slot < retentionStart {
|
||||
continue
|
||||
}
|
||||
if v[i].Block().Version() < version.Deneb {
|
||||
continue
|
||||
}
|
||||
// Assuming blocks are sorted, as soon as we see 1 fulu block we know the rest will also be fulu.
|
||||
if slots.ToEpoch(slot) >= fuluStart {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
c, err := v[i].Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unexpected error checking commitments for block root %#x", v[i].Root())
|
||||
@@ -56,37 +71,31 @@ type verifier struct {
|
||||
domain *domainCache
|
||||
}
|
||||
|
||||
// TODO: rewrite this to use ROBlock.
|
||||
func (vr verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (verifiedROBlocks, error) {
|
||||
func (vr verifier) verify(blks []blocks.ROBlock) (verifiedROBlocks, error) {
|
||||
var err error
|
||||
result := make([]blocks.ROBlock, len(blks))
|
||||
sigSet := bls.NewSet()
|
||||
for i := range blks {
|
||||
result[i], err = blocks.NewROBlock(blks[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i > 0 && result[i-1].Root() != result[i].Block().ParentRoot() {
|
||||
p, b := result[i-1], result[i]
|
||||
if i > 0 && blks[i-1].Root() != blks[i].Block().ParentRoot() {
|
||||
p, b := blks[i-1], blks[i]
|
||||
return nil, errors.Wrapf(errInvalidBatchChain,
|
||||
"slot %d parent_root=%#x, slot %d root=%#x",
|
||||
b.Block().Slot(), b.Block().ParentRoot(),
|
||||
p.Block().Slot(), p.Root())
|
||||
}
|
||||
set, err := vr.blockSignatureBatch(result[i])
|
||||
set, err := vr.blockSignatureBatch(blks[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.Wrap(err, "block signature batch")
|
||||
}
|
||||
sigSet.Join(set)
|
||||
}
|
||||
v, err := sigSet.Verify()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "block signature verification error")
|
||||
return nil, errors.Wrap(err, "SignatureBatch Verify")
|
||||
}
|
||||
if !v {
|
||||
return nil, errors.New("batch block signature verification failed")
|
||||
return nil, errors.New("SignatureBatch Verify invalid")
|
||||
}
|
||||
return result, nil
|
||||
return blks, nil
|
||||
}
|
||||
|
||||
func (vr verifier) blockSignatureBatch(b blocks.ROBlock) (*bls.SignatureBatch, error) {
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/bls"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
@@ -70,12 +69,7 @@ func TestVerify(t *testing.T) {
|
||||
}
|
||||
v, err := newBackfillVerifier(vr, pubkeys)
|
||||
require.NoError(t, err)
|
||||
notrob := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
|
||||
// We have to unwrap the ROBlocks for this code because that's what it expects (for now).
|
||||
for i := range blks {
|
||||
notrob[i] = blks[i].ReadOnlySignedBeaconBlock
|
||||
}
|
||||
vbs, err := v.verify(notrob)
|
||||
vbs, err := v.verify(blks)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(blks), len(vbs))
|
||||
}
|
||||
|
||||
@@ -9,9 +9,56 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/sync"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type workerCfg struct {
|
||||
c *startup.Clock
|
||||
v *verifier
|
||||
cm sync.ContextByteVersions
|
||||
nbv verification.NewBlobVerifier
|
||||
ndcv verification.NewDataColumnsVerifier
|
||||
bfs *filesystem.BlobStorage
|
||||
cfs *filesystem.DataColumnStorage
|
||||
}
|
||||
|
||||
func initWorkerCfg(ctx context.Context, cfg *workerCfg, c *startup.Clock, vw InitializerWaiter, store *Store, bfs *filesystem.BlobStorage, cfs *filesystem.DataColumnStorage) (*workerCfg, error) {
|
||||
vi, err := vw.WaitForInitializer(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cps, err := store.originState(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys, err := cps.PublicKeys()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to retrieve public keys for all validators in the origin state")
|
||||
}
|
||||
vr := cps.GenesisValidatorsRoot()
|
||||
cm, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root %#x", vr)
|
||||
}
|
||||
v, err := newBackfillVerifier(vr, keys)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "newBackfillVerifier failed")
|
||||
}
|
||||
if cfg == nil {
|
||||
cfg = &workerCfg{}
|
||||
}
|
||||
cfg.c = c
|
||||
cfg.v = v
|
||||
cfg.cm = cm
|
||||
cfg.bfs = bfs
|
||||
cfg.cfs = cfs
|
||||
cfg.nbv = newBlobVerifierFromInitializer(vi)
|
||||
cfg.ndcv = newDataColumnVerifierFromInitializer(vi)
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
type workerId int
|
||||
|
||||
type p2pWorker struct {
|
||||
@@ -19,23 +66,38 @@ type p2pWorker struct {
|
||||
todo chan batch
|
||||
done chan batch
|
||||
p2p p2p.P2P
|
||||
v *verifier
|
||||
c *startup.Clock
|
||||
cm sync.ContextByteVersions
|
||||
nbv verification.NewBlobVerifier
|
||||
bfs *filesystem.BlobStorage
|
||||
cfg *workerCfg
|
||||
}
|
||||
|
||||
func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, cfg *workerCfg) *p2pWorker {
|
||||
return &p2pWorker{
|
||||
id: id,
|
||||
todo: todo,
|
||||
done: done,
|
||||
p2p: p,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *p2pWorker) run(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case b := <-w.todo:
|
||||
log.WithFields(b.logFields()).WithField("backfillWorker", w.id).Debug("Backfill worker received batch")
|
||||
if b.state == batchBlobSync {
|
||||
w.done <- w.handleBlobs(ctx, b)
|
||||
} else {
|
||||
w.done <- w.handleBlocks(ctx, b)
|
||||
if err := b.waitUntilReady(ctx); err != nil {
|
||||
log.WithField("batch_id", b.id()).WithError(ctx.Err()).Info("worker context canceled while waiting to retry")
|
||||
continue
|
||||
}
|
||||
log.WithFields(b.logFields()).WithField("backfillWorker", w.id).Debug("Backfill worker received batch")
|
||||
if b.state == batchSyncBlobs {
|
||||
w.done <- w.handleBlobs(ctx, b)
|
||||
continue
|
||||
}
|
||||
if b.state == batchSyncColumns {
|
||||
w.done <- w.handleColumns(ctx, b)
|
||||
continue
|
||||
}
|
||||
|
||||
w.done <- w.handleBlocks(ctx, b)
|
||||
case <-ctx.Done():
|
||||
log.WithField("backfillWorker", w.id).Info("Backfill worker exiting after context canceled")
|
||||
return
|
||||
@@ -44,22 +106,28 @@ func (w *p2pWorker) run(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
|
||||
cs := w.c.CurrentSlot()
|
||||
blobRetentionStart, err := sync.BlobRPCMinValidSlot(cs)
|
||||
current := w.cfg.c.CurrentSlot()
|
||||
blobRetentionStart, err := sync.BlobRPCMinValidSlot(current)
|
||||
if err != nil {
|
||||
return b.withRetryableError(errors.Wrap(err, "configuration issue, could not compute minimum blob retention slot"))
|
||||
}
|
||||
b.blockPid = b.busy
|
||||
start := time.Now()
|
||||
results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.c, w.p2p, b.blockPid, b.blockRequest(), blockValidationMetrics)
|
||||
dlt := time.Now()
|
||||
backfillBatchTimeDownloadingBlocks.Observe(float64(dlt.Sub(start).Milliseconds()))
|
||||
results, err := sync.SendBeaconBlocksByRangeRequest(ctx, w.cfg.c, w.p2p, b.blockPid, b.blockRequest(), blockValidationMetrics)
|
||||
if err != nil {
|
||||
log.WithError(err).WithFields(b.logFields()).Debug("Batch requesting failed")
|
||||
return b.withRetryableError(err)
|
||||
}
|
||||
vb, err := w.v.verify(results)
|
||||
backfillBatchTimeVerifying.Observe(float64(time.Since(dlt).Milliseconds()))
|
||||
dlt := time.Now()
|
||||
blockDownloadMs.Observe(float64(dlt.Sub(start).Milliseconds()))
|
||||
toVerify, err := blocks.NewROBlockSlice(results)
|
||||
if err != nil {
|
||||
log.WithError(err).WithFields(b.logFields()).Debug("Batch conversion to ROBlock failed")
|
||||
return b.withRetryableError(err)
|
||||
}
|
||||
|
||||
vb, err := w.cfg.v.verify(toVerify)
|
||||
blockVerifyMs.Observe(float64(time.Since(dlt).Milliseconds()))
|
||||
if err != nil {
|
||||
log.WithError(err).WithFields(b.logFields()).Debug("Batch validation failed")
|
||||
return b.withRetryableError(err)
|
||||
@@ -71,13 +139,18 @@ func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
|
||||
for i := range vb {
|
||||
bdl += vb[i].SizeSSZ()
|
||||
}
|
||||
backfillBlocksApproximateBytes.Add(float64(bdl))
|
||||
blockDownloadBytesApprox.Add(float64(bdl))
|
||||
log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("Backfill batch block bytes downloaded")
|
||||
bs, err := newBlobSync(cs, vb, &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.nbv, store: w.bfs})
|
||||
bscfg := &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.cfg.nbv, store: w.cfg.bfs}
|
||||
bs, err := newBlobSync(current, vb, bscfg)
|
||||
if err != nil {
|
||||
return b.withRetryableError(err)
|
||||
}
|
||||
return b.withResults(vb, bs)
|
||||
cs, err := newColumnSync(b, vb, current, w.p2p, vb, w.cfg)
|
||||
if err != nil {
|
||||
return b.withFatalError(err)
|
||||
}
|
||||
return b.postBlockSync(vb, bs, cs)
|
||||
}
|
||||
|
||||
func (w *p2pWorker) handleBlobs(ctx context.Context, b batch) batch {
|
||||
@@ -85,32 +158,45 @@ func (w *p2pWorker) handleBlobs(ctx context.Context, b batch) batch {
|
||||
start := time.Now()
|
||||
// we don't need to use the response for anything other than metrics, because blobResponseValidation
|
||||
// adds each of them to a batch AvailabilityStore once it is checked.
|
||||
blobs, err := sync.SendBlobsByRangeRequest(ctx, w.c, w.p2p, b.blobPid, w.cm, b.blobRequest(), b.blobResponseValidator(), blobValidationMetrics)
|
||||
blobs, err := sync.SendBlobsByRangeRequest(ctx, w.cfg.c, w.p2p, b.blobPid, w.cfg.cm, b.blobRequest(), b.blobResponseValidator(), blobValidationMetrics)
|
||||
if err != nil {
|
||||
b.bs = nil
|
||||
return b.withRetryableError(err)
|
||||
}
|
||||
dlt := time.Now()
|
||||
backfillBatchTimeDownloadingBlobs.Observe(float64(dlt.Sub(start).Milliseconds()))
|
||||
blobSidecarDownloadMs.Observe(float64(dlt.Sub(start).Milliseconds()))
|
||||
if len(blobs) > 0 {
|
||||
// All blobs are the same size, so we can compute 1 and use it for all in the batch.
|
||||
sz := blobs[0].SizeSSZ() * len(blobs)
|
||||
backfillBlobsApproximateBytes.Add(float64(sz))
|
||||
blobSidecarDownloadBytesApprox.Add(float64(sz))
|
||||
log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("Backfill batch blob bytes downloaded")
|
||||
}
|
||||
return b.postBlobSync()
|
||||
return b.postSidecarSync()
|
||||
}
|
||||
|
||||
func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch, c *startup.Clock, v *verifier, cm sync.ContextByteVersions, nbv verification.NewBlobVerifier, bfs *filesystem.BlobStorage) *p2pWorker {
|
||||
return &p2pWorker{
|
||||
id: id,
|
||||
todo: todo,
|
||||
done: done,
|
||||
p2p: p,
|
||||
v: v,
|
||||
c: c,
|
||||
cm: cm,
|
||||
nbv: nbv,
|
||||
bfs: bfs,
|
||||
func (w *p2pWorker) handleColumns(ctx context.Context, b batch) batch {
|
||||
b.columnPid = b.busy
|
||||
start := time.Now()
|
||||
vr := b.validatingColumnRequest()
|
||||
// TODO: the upstream definition of SendDataColumnSidecarsByRangeRequest requires this params type
|
||||
// which has several ambiguously optional fields. The sidecar request functions should be refactored
|
||||
// to use a more explicit set of parameters. RateLimiter, Storage and NewVerifier are not used inside
|
||||
// SendDataColumnSidecarsByRangeRequest.
|
||||
p := sync.DataColumnSidecarsParams{
|
||||
Ctx: ctx,
|
||||
Tor: w.cfg.c,
|
||||
P2P: w.p2p,
|
||||
//RateLimiter *leakybucket.Collector
|
||||
CtxMap: w.cfg.cm,
|
||||
//Storage: w.cfg.cfs,
|
||||
//NewVerifier: vr.validate,
|
||||
}
|
||||
// Response is dropped because the validation code adds the columns to the columnSync AvailabilityStore under the hood.
|
||||
_, err := sync.SendDataColumnSidecarsByRangeRequest(p, b.busy, vr.req, vr.validate)
|
||||
if err != nil {
|
||||
return b.withRetryableError(errors.Wrap(err, "failed to request data column sidecars"))
|
||||
}
|
||||
dlt := time.Now()
|
||||
dataColumnSidecarDownloadMs.Observe(float64(dlt.Sub(start).Milliseconds()))
|
||||
return b.postSidecarSync()
|
||||
}
|
||||
|
||||
205
beacon-chain/sync/data_column_assignment.go
Normal file
205
beacon-chain/sync/data_column_assignment.go
Normal file
@@ -0,0 +1,205 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"slices"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||
"github.com/OffchainLabs/prysm/v6/crypto/rand"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ColumnScarcityRanking is a structure that maps out the intersection of peer custody and column indices
|
||||
// to weight each peer based on the scarcity of the columns they custody. This allows us to prioritize
|
||||
// requests for more scarce columns to peers that custody them, so that we don't waste our bandwidth allocation
|
||||
// making requests for more common columns from peers that can provide the more scarce columns.
|
||||
type ColumnScarcityRanking struct {
|
||||
peers map[peer.ID]*peerColumnCoverage
|
||||
freq []colFreq
|
||||
scarcity []float64
|
||||
rg rand.Rand
|
||||
covScoreRank []*peerColumnCoverage
|
||||
}
|
||||
|
||||
// NewColumnScarcityRanking computes the ColumnScarcityRanking based on the current view of columns custodied
|
||||
// by the given set of peers.
|
||||
func NewColumnScarcityRanking(peers []peer.ID, p2pSvc p2p.P2P) (*ColumnScarcityRanking, error) {
|
||||
nc := params.BeaconConfig().NumberOfColumns
|
||||
rankingsByPeer := make(map[peer.ID]*peerColumnCoverage, len(peers))
|
||||
freqByColumn := make([]colFreq, nc)
|
||||
for i := range freqByColumn {
|
||||
freqByColumn[i].col = uint64(i)
|
||||
}
|
||||
for _, peer := range peers {
|
||||
nodeID, err := p2p.ConvertPeerIDToNodeID(peer)
|
||||
if err != nil {
|
||||
log.WithField("peerID", peer).WithError(err).Debug("Failed to convert peer ID to node ID.")
|
||||
continue
|
||||
}
|
||||
dasInfo, _, err := peerdas.Info(nodeID, p2pSvc.CustodyGroupCountFromPeer(peer))
|
||||
if err != nil {
|
||||
log.WithField("peerID", peer).WithField("nodeID", nodeID).WithError(err).Debug("Failed to derive custody groups from peer.")
|
||||
return nil, errors.Wrap(err, "custody groups")
|
||||
}
|
||||
p := &peerColumnCoverage{
|
||||
peerID: peer,
|
||||
nodeID: nodeID,
|
||||
custodied: make([]uint64, nc),
|
||||
}
|
||||
for c, v := range dasInfo.CustodyColumns {
|
||||
if c > nc-1 {
|
||||
return nil, errors.Errorf("column %d is out of bounds", c)
|
||||
}
|
||||
if v {
|
||||
p.custodied[c] = 1
|
||||
freqByColumn[c].custodians = append(freqByColumn[c].custodians, p)
|
||||
}
|
||||
}
|
||||
rankingsByPeer[peer] = p
|
||||
}
|
||||
|
||||
colByFreq := slices.SortedFunc(slices.Values(freqByColumn), func(a, b colFreq) int {
|
||||
if a.freq() == b.freq() {
|
||||
return 0
|
||||
}
|
||||
if a.freq() < b.freq() {
|
||||
return -1
|
||||
}
|
||||
return 1
|
||||
})
|
||||
|
||||
scarcity := columnScarcity(colByFreq)
|
||||
covScoreRank := make([]*peerColumnCoverage, 0, len(rankingsByPeer))
|
||||
for _, p := range rankingsByPeer {
|
||||
covScoreRank = append(covScoreRank, p)
|
||||
}
|
||||
slices.SortFunc(covScoreRank, func(a, b *peerColumnCoverage) int {
|
||||
if a.score(scarcity) == b.score(scarcity) {
|
||||
return 0
|
||||
}
|
||||
if a.score(scarcity) < b.score(scarcity) {
|
||||
return -1
|
||||
}
|
||||
return 1
|
||||
})
|
||||
|
||||
return &ColumnScarcityRanking{
|
||||
peers: rankingsByPeer,
|
||||
freq: colByFreq,
|
||||
rg: *rand.NewGenerator(),
|
||||
scarcity: scarcity,
|
||||
covScoreRank: covScoreRank,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ForColumns returns the best peer to request columns from, based on the scarcity of the columns needed.
|
||||
func (m *ColumnScarcityRanking) ForColumns(needed peerdas.ColumnIndices, busy map[peer.ID]bool) (peer.ID, []uint64, error) {
|
||||
// - find the custodied column with the lowest frequency
|
||||
// - collect all the peers that have custody of that column
|
||||
// - score the peers by how many other of the needed columns they ave
|
||||
// -- or, score them by the rank of the columns they have??
|
||||
for _, cf := range m.freq {
|
||||
if !needed[cf.col] {
|
||||
continue
|
||||
}
|
||||
if cf.freq() == 0 {
|
||||
continue
|
||||
}
|
||||
var best *peerColumnCoverage
|
||||
bestScore, bestCoverage := 0.0, make([]uint64, 1)
|
||||
for _, p := range cf.custodians {
|
||||
if busy[p.peerID] {
|
||||
continue
|
||||
}
|
||||
coverage := p.covered(needed)
|
||||
if len(coverage) == 0 {
|
||||
continue
|
||||
}
|
||||
pscore := coverageScore(coverage, m.scarcity)
|
||||
if pscore > bestScore {
|
||||
best, bestScore, bestCoverage = p, pscore, coverage
|
||||
}
|
||||
}
|
||||
if best != nil {
|
||||
return best.peerID, bestCoverage, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil, errors.New("no peers able to cover needed columns")
|
||||
}
|
||||
|
||||
// ForBlocks returns the lowest scoring peer in the set. This can be used to pick a peer
|
||||
// for block requests, preserving the peers that have the highest coverage scores
|
||||
// for column requests.
|
||||
func (m *ColumnScarcityRanking) ForBlocks(busy map[peer.ID]bool) (peer.ID, error) {
|
||||
for i := len(m.covScoreRank) - 1; i >= 0; i-- {
|
||||
p := m.covScoreRank[i]
|
||||
if !busy[p.peerID] {
|
||||
return p.peerID, nil
|
||||
}
|
||||
}
|
||||
return "", errors.New("no peers available")
|
||||
}
|
||||
|
||||
// peerColumnCoverage represents a peer's custody of columns and their coverage score.
|
||||
type peerColumnCoverage struct {
|
||||
peerID peer.ID
|
||||
nodeID enode.ID
|
||||
custodied []uint64
|
||||
cov float64
|
||||
}
|
||||
|
||||
func (p *peerColumnCoverage) covered(needed peerdas.ColumnIndices) []uint64 {
|
||||
covered := make([]uint64, 0, len(p.custodied))
|
||||
for col, want := range needed {
|
||||
if want && p.custodied[col] == 1 {
|
||||
covered = append(covered, col)
|
||||
}
|
||||
}
|
||||
return covered
|
||||
}
|
||||
|
||||
func (p *peerColumnCoverage) score(scarcity []float64) float64 {
|
||||
if p.cov == 0 {
|
||||
p.cov = coverageScore(p.custodied, scarcity)
|
||||
}
|
||||
return p.cov
|
||||
}
|
||||
|
||||
func coverageScore(covered []uint64, rarity []float64) float64 {
|
||||
score := 0.0
|
||||
for _, col := range covered {
|
||||
if col >= uint64(len(rarity)) {
|
||||
continue
|
||||
}
|
||||
score += rarity[col]
|
||||
}
|
||||
return score
|
||||
}
|
||||
|
||||
type colFreq struct {
|
||||
col uint64
|
||||
custodians []*peerColumnCoverage
|
||||
}
|
||||
|
||||
func (f colFreq) rarity() float64 {
|
||||
if f.freq() == 0 {
|
||||
return 1
|
||||
}
|
||||
return 1 / float64(f.freq())
|
||||
}
|
||||
|
||||
func (f colFreq) freq() int {
|
||||
return len(f.custodians)
|
||||
}
|
||||
|
||||
func columnScarcity(cf []colFreq) []float64 {
|
||||
ra := make([]float64, len(cf))
|
||||
for _, f := range cf {
|
||||
ra[f.col] = f.rarity()
|
||||
}
|
||||
return ra
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package sync
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"math"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -479,7 +480,10 @@ func sendDataColumnSidecarsRequest(
|
||||
start := time.Now()
|
||||
roDataColumns := make([]blocks.RODataColumn, 0, count)
|
||||
for _, request := range byRangeRequests {
|
||||
params.RateLimiter.Add(peerID.String(), rootCount)
|
||||
if params.RateLimiter != nil {
|
||||
params.RateLimiter.Add(peerID.String(), rootCount)
|
||||
}
|
||||
|
||||
localRoDataColumns, err := SendDataColumnSidecarsByRangeRequest(params, peerID, request)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "send data column sidecars by range request to peer %s", peerID)
|
||||
@@ -503,7 +507,9 @@ func sendDataColumnSidecarsRequest(
|
||||
|
||||
// Send the by root request.
|
||||
start := time.Now()
|
||||
params.RateLimiter.Add(peerID.String(), rootCount)
|
||||
if params.RateLimiter != nil {
|
||||
params.RateLimiter.Add(peerID.String(), rootCount)
|
||||
}
|
||||
roDataColumns, err := SendDataColumnSidecarsByRootRequest(params, peerID, byRootRequest)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "send data column sidecars by root request to peer %s", peerID)
|
||||
@@ -777,7 +783,10 @@ func randomPeer(
|
||||
for ctx.Err() == nil {
|
||||
nonRateLimitedPeers := make([]goPeer.ID, 0, len(indicesByRootByPeer))
|
||||
for peer := range indicesByRootByPeer {
|
||||
remaining := rateLimiter.Remaining(peer.String())
|
||||
remaining := int64(math.MaxInt64)
|
||||
if rateLimiter != nil {
|
||||
remaining = rateLimiter.Remaining(peer.String())
|
||||
}
|
||||
if remaining >= int64(count) {
|
||||
nonRateLimitedPeers = append(nonRateLimitedPeers, peer)
|
||||
}
|
||||
|
||||
@@ -28,10 +28,10 @@ const (
|
||||
)
|
||||
|
||||
// blockReceiverFn defines block receiving function.
|
||||
type blockReceiverFn func(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error
|
||||
type blockReceiverFn func(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityChecker) error
|
||||
|
||||
// batchBlockReceiverFn defines batch receiving function.
|
||||
type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error
|
||||
type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityChecker) error
|
||||
|
||||
// Round Robin sync looks at the latest peer statuses and syncs up to the highest known epoch.
|
||||
//
|
||||
@@ -175,7 +175,7 @@ func (s *Service) processFetchedDataRegSync(ctx context.Context, data *blocksQue
|
||||
blocksWithDataColumns := bwb[fistDataColumnIndex:]
|
||||
|
||||
blobBatchVerifier := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
|
||||
lazilyPersistentStoreBlobs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, blobBatchVerifier)
|
||||
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, blobBatchVerifier)
|
||||
|
||||
log := log.WithField("firstSlot", data.bwb[0].Block.Block().Slot())
|
||||
logBlobs, logDataColumns := log, log
|
||||
@@ -185,12 +185,12 @@ func (s *Service) processFetchedDataRegSync(ctx context.Context, data *blocksQue
|
||||
}
|
||||
|
||||
for i, b := range blocksWithBlobs {
|
||||
if err := lazilyPersistentStoreBlobs.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil {
|
||||
if err := avs.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil {
|
||||
logBlobs.WithError(err).WithFields(syncFields(b.Block)).Warning("Batch failure due to BlobSidecar issues")
|
||||
return uint64(i), err
|
||||
}
|
||||
|
||||
if err := s.processBlock(ctx, s.genesisTime, b, s.cfg.Chain.ReceiveBlock, lazilyPersistentStoreBlobs); err != nil {
|
||||
if err := s.processBlock(ctx, s.genesisTime, b, s.cfg.Chain.ReceiveBlock, avs); err != nil {
|
||||
if errors.Is(err, errParentDoesNotExist) {
|
||||
logBlobs.WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
|
||||
WithFields(syncFields(b.Block)).Debug("Could not process batch blocks due to missing parent")
|
||||
@@ -313,7 +313,7 @@ func (s *Service) processBlock(
|
||||
genesis time.Time,
|
||||
bwb blocks.BlockWithROSidecars,
|
||||
blockReceiver blockReceiverFn,
|
||||
avs das.AvailabilityStore,
|
||||
avs das.AvailabilityChecker,
|
||||
) error {
|
||||
blk := bwb.Block
|
||||
blkRoot := blk.Root()
|
||||
|
||||
@@ -376,7 +376,7 @@ func TestService_processBlock(t *testing.T) {
|
||||
rowsb, err := blocks.NewROBlock(wsb)
|
||||
require.NoError(t, err)
|
||||
err = s.processBlock(ctx, genesis, blocks.BlockWithROSidecars{Block: rowsb}, func(
|
||||
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityStore) error {
|
||||
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityChecker) error {
|
||||
assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot, nil))
|
||||
return nil
|
||||
}, nil)
|
||||
@@ -388,7 +388,7 @@ func TestService_processBlock(t *testing.T) {
|
||||
rowsb, err = blocks.NewROBlock(wsb)
|
||||
require.NoError(t, err)
|
||||
err = s.processBlock(ctx, genesis, blocks.BlockWithROSidecars{Block: rowsb}, func(
|
||||
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityStore) error {
|
||||
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityChecker) error {
|
||||
return nil
|
||||
}, nil)
|
||||
assert.ErrorContains(t, errBlockAlreadyProcessed.Error(), err)
|
||||
@@ -399,7 +399,7 @@ func TestService_processBlock(t *testing.T) {
|
||||
rowsb, err = blocks.NewROBlock(wsb)
|
||||
require.NoError(t, err)
|
||||
err = s.processBlock(ctx, genesis, blocks.BlockWithROSidecars{Block: rowsb}, func(
|
||||
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityStore) error {
|
||||
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityChecker) error {
|
||||
assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot, nil))
|
||||
return nil
|
||||
}, nil)
|
||||
@@ -469,7 +469,7 @@ func TestService_processBlockBatch(t *testing.T) {
|
||||
currBlockRoot = blk1Root
|
||||
}
|
||||
|
||||
cbnormal := func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error {
|
||||
cbnormal := func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityChecker) error {
|
||||
assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks, avs))
|
||||
return nil
|
||||
}
|
||||
@@ -478,7 +478,7 @@ func TestService_processBlockBatch(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
require.Equal(t, uint64(len(batch)), count)
|
||||
|
||||
cbnil := func(ctx context.Context, blocks []blocks.ROBlock, _ das.AvailabilityStore) error {
|
||||
cbnil := func(ctx context.Context, blocks []blocks.ROBlock, _ das.AvailabilityChecker) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -851,7 +851,7 @@ func TestService_processBlocksWithDataColumns(t *testing.T) {
|
||||
counter: ratecounter.NewRateCounter(counterSeconds * time.Second),
|
||||
}
|
||||
|
||||
receiverFunc := func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error {
|
||||
receiverFunc := func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityChecker) error {
|
||||
require.Equal(t, 1, len(blks))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -334,24 +334,8 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove duplicates (if any) from the list of roots.
|
||||
roots = dedupRoots(roots)
|
||||
|
||||
// Filters out in place roots that are already seen in pending blocks or being synced.
|
||||
func() {
|
||||
s.pendingQueueLock.RLock()
|
||||
defer s.pendingQueueLock.RUnlock()
|
||||
|
||||
for i := len(roots) - 1; i >= 0; i-- {
|
||||
r := roots[i]
|
||||
if s.seenPendingBlocks[r] || s.cfg.chain.BlockBeingSynced(r) {
|
||||
roots = append(roots[:i], roots[i+1:]...)
|
||||
continue
|
||||
}
|
||||
|
||||
log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
|
||||
}
|
||||
}()
|
||||
// Filter out roots that are already seen in pending blocks or being synced.
|
||||
roots = s.filterOutPendingAndSynced(roots)
|
||||
|
||||
// Nothing to do, exit early.
|
||||
if len(roots) == 0 {
|
||||
@@ -431,6 +415,27 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
|
||||
return nil
|
||||
}
|
||||
|
||||
// filterOutPendingAndSynced filters out roots that are already seen in pending blocks or being synced.
|
||||
func (s *Service) filterOutPendingAndSynced(roots [][fieldparams.RootLength]byte) [][fieldparams.RootLength]byte {
|
||||
// Remove duplicates (if any) from the list of roots.
|
||||
roots = dedupRoots(roots)
|
||||
|
||||
// Filters out in place roots that are already seen in pending blocks or being synced.
|
||||
s.pendingQueueLock.RLock()
|
||||
defer s.pendingQueueLock.RUnlock()
|
||||
|
||||
for i := len(roots) - 1; i >= 0; i-- {
|
||||
r := roots[i]
|
||||
if s.seenPendingBlocks[r] || s.cfg.chain.BlockBeingSynced(r) {
|
||||
roots = append(roots[:i], roots[i+1:]...)
|
||||
continue
|
||||
}
|
||||
|
||||
log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
|
||||
}
|
||||
return roots
|
||||
}
|
||||
|
||||
func (s *Service) sortedPendingSlots() []primitives.Slot {
|
||||
s.pendingQueueLock.RLock()
|
||||
defer s.pendingQueueLock.RUnlock()
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"slices"
|
||||
"sort"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
|
||||
@@ -408,6 +409,7 @@ func SendDataColumnSidecarsByRangeRequest(
|
||||
p DataColumnSidecarsParams,
|
||||
pid peer.ID,
|
||||
request *ethpb.DataColumnSidecarsByRangeRequest,
|
||||
vfs ...DataColumnResponseValidation,
|
||||
) ([]blocks.RODataColumn, error) {
|
||||
// Return early if nothing to request.
|
||||
if request == nil || request.Count == 0 || len(request.Columns) == 0 {
|
||||
@@ -457,6 +459,15 @@ func SendDataColumnSidecarsByRangeRequest(
|
||||
}
|
||||
defer closeStream(stream, log)
|
||||
|
||||
requestedSlot, err := isSidecarSlotRequested(request)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "is sidecar slot within bounds")
|
||||
}
|
||||
vfs = append([]DataColumnResponseValidation{
|
||||
isSidecarIndexRequested(request),
|
||||
requestedSlot,
|
||||
}, vfs...)
|
||||
|
||||
// Read the data column sidecars from the stream.
|
||||
roDataColumns := make([]blocks.RODataColumn, 0, totalCount)
|
||||
for range totalCount {
|
||||
@@ -465,16 +476,7 @@ func SendDataColumnSidecarsByRangeRequest(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
validatorSlotWithinBounds, err := isSidecarSlotWithinBounds(request)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "is sidecar slot within bounds")
|
||||
}
|
||||
|
||||
roDataColumn, err := readChunkedDataColumnSidecar(
|
||||
stream, p.P2P, p.CtxMap,
|
||||
validatorSlotWithinBounds,
|
||||
isSidecarIndexRequested(request),
|
||||
)
|
||||
roDataColumn, err := readChunkedDataColumnSidecar(stream, p.P2P, p.CtxMap, vfs...)
|
||||
if errors.Is(err, io.EOF) {
|
||||
return roDataColumns, nil
|
||||
}
|
||||
@@ -497,8 +499,8 @@ func SendDataColumnSidecarsByRangeRequest(
|
||||
return roDataColumns, nil
|
||||
}
|
||||
|
||||
// isSidecarSlotWithinBounds verifies that the slot of the data column sidecar is within the bounds of the request.
|
||||
func isSidecarSlotWithinBounds(request *ethpb.DataColumnSidecarsByRangeRequest) (DataColumnResponseValidation, error) {
|
||||
// isSidecarSlotRequested verifies that the slot of the data column sidecar is within the bounds of the request.
|
||||
func isSidecarSlotRequested(request *ethpb.DataColumnSidecarsByRangeRequest) (DataColumnResponseValidation, error) {
|
||||
// endSlot is exclusive (while request.StartSlot is inclusive).
|
||||
endSlot, err := request.StartSlot.SafeAdd(request.Count)
|
||||
if err != nil {
|
||||
@@ -689,3 +691,14 @@ func readChunkedDataColumnSidecar(
|
||||
|
||||
return &roDataColumn, nil
|
||||
}
|
||||
|
||||
func DataColumnSidecarsByRangeRequest(columns []uint64, start, end primitives.Slot) *ethpb.DataColumnSidecarsByRangeRequest {
|
||||
sort.Slice(columns, func(i, j int) bool {
|
||||
return columns[i] < columns[j]
|
||||
})
|
||||
return ðpb.DataColumnSidecarsByRangeRequest{
|
||||
StartSlot: start,
|
||||
Count: uint64(end-start) + 1,
|
||||
Columns: columns,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1071,7 +1071,7 @@ func TestIsSidecarSlotWithinBounds(t *testing.T) {
|
||||
Count: 10,
|
||||
}
|
||||
|
||||
validator, err := isSidecarSlotWithinBounds(request)
|
||||
validator, err := isSidecarSlotRequested(request)
|
||||
require.NoError(t, err)
|
||||
|
||||
testCases := []struct {
|
||||
|
||||
2
changelog/kasey_backfill-data-columns.md
Normal file
2
changelog/kasey_backfill-data-columns.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Added
|
||||
- Data column backfill.
|
||||
@@ -286,6 +286,19 @@ func MaxSafeEpoch() primitives.Epoch {
|
||||
return primitives.Epoch(math.MaxUint64 / uint64(params.BeaconConfig().SlotsPerEpoch))
|
||||
}
|
||||
|
||||
// SafeEpochStartOrMax returns the start slot of the given epoch if it will not overflow,
|
||||
// otherwise it returns the
|
||||
func SafeEpochStartOrMax(e primitives.Epoch) primitives.Slot {
|
||||
// The max value converted to a slot can't be the start of a conceptual epoch,
|
||||
// because the first slot of that epoch would be overflow
|
||||
// so use the start slot of the epoch right before that value.
|
||||
me := MaxSafeEpoch() - 1
|
||||
if e > me {
|
||||
return UnsafeEpochStart(me)
|
||||
}
|
||||
return UnsafeEpochStart(e)
|
||||
}
|
||||
|
||||
// SecondsUntilNextEpochStart returns how many seconds until the next Epoch start from the current time and slot
|
||||
func SecondsUntilNextEpochStart(genesis time.Time) (uint64, error) {
|
||||
currentSlot := CurrentSlot(genesis)
|
||||
|
||||
Reference in New Issue
Block a user