peerDAS: Implement dataColumnSidecarByRootRPCHandler. (#15405)

* `CreateTestVerifiedRoDataColumnSidecars`: Use consistent block root.

* peerDAS: Implement `dataColumnSidecarByRootRPCHandler`.

* Fix James' comment.

* Fix James' comment.
This commit is contained in:
Manu NALEPA
2025-06-19 14:12:45 +02:00
committed by GitHub
parent 62fec4d1f3
commit 559d02bf4d
33 changed files with 1143 additions and 392 deletions

View File

@@ -258,3 +258,12 @@ func WithLightClientStore(lcs *lightclient.Store) Option {
return nil
}
}
// WithStartWaitingDataColumnSidecars sets a channel that the `areDataColumnsAvailable` function will fill
// in when starting to wait for additional data columns.
func WithStartWaitingDataColumnSidecars(c chan bool) Option {
return func(s *Service) error {
s.startWaitingDataColumnSidecars = c
return nil
}
}

View File

@@ -637,7 +637,11 @@ func missingDataColumnIndices(bs *filesystem.DataColumnStorage, root [fieldparam
// The function will first check the database to see if all sidecars have been persisted. If any
// sidecars are missing, it will then read from the sidecar notifier channel for the given root until the channel is
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signedBlock interfaces.ReadOnlySignedBeaconBlock) error {
func (s *Service) isDataAvailable(
ctx context.Context,
root [fieldparams.RootLength]byte,
signedBlock interfaces.ReadOnlySignedBeaconBlock,
) error {
block := signedBlock.Block()
if block == nil {
return errors.New("invalid nil beacon block")
@@ -657,7 +661,11 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signedBloc
// areDataColumnsAvailable blocks until all data columns committed to in the block are available,
// or an error or context cancellation occurs. A nil result means that the data availability check is successful.
func (s *Service) areDataColumnsAvailable(ctx context.Context, root [fieldparams.RootLength]byte, block interfaces.ReadOnlyBeaconBlock) error {
func (s *Service) areDataColumnsAvailable(
ctx context.Context,
root [fieldparams.RootLength]byte,
block interfaces.ReadOnlyBeaconBlock,
) error {
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
blockSlot, currentSlot := block.Slot(), s.CurrentSlot()
blockEpoch, currentEpoch := slots.ToEpoch(blockSlot), slots.ToEpoch(currentSlot)
@@ -724,6 +732,10 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [fieldparams
return nil
}
if s.startWaitingDataColumnSidecars != nil {
s.startWaitingDataColumnSidecars <- true
}
// Log for DA checks that cross over into the next slot; helpful for debugging.
nextSlot := slots.BeginsAt(block.Slot()+1, s.genesisTime)

View File

@@ -3332,17 +3332,27 @@ func testIsAvailableSetup(t *testing.T, params testIsAvailableParams) (context.C
signedBeaconBlock, err := util.GenerateFullBlockFulu(genesisState, secretKeys, conf, 10 /*block slot*/)
require.NoError(t, err)
root, err := signedBeaconBlock.Block.HashTreeRoot()
block := signedBeaconBlock.Block
bodyRoot, err := block.Body.HashTreeRoot()
require.NoError(t, err)
dataColumnsParams := make([]util.DataColumnParams, 0, len(params.columnsToSave))
root, err := block.HashTreeRoot()
require.NoError(t, err)
dataColumnsParams := make([]util.DataColumnParam, 0, len(params.columnsToSave))
for _, i := range params.columnsToSave {
dataColumnParam := util.DataColumnParams{ColumnIndex: i}
dataColumnParam := util.DataColumnParam{
Index: i,
Slot: block.Slot,
ProposerIndex: block.ProposerIndex,
ParentRoot: block.ParentRoot,
StateRoot: block.StateRoot,
BodyRoot: bodyRoot[:],
}
dataColumnsParams = append(dataColumnsParams, dataColumnParam)
}
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: dataColumnsParams}
_, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
_, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnsParams)
err = dataColumnStorage.Save(verifiedRODataColumns)
require.NoError(t, err)
@@ -3402,38 +3412,47 @@ func TestIsDataAvailable(t *testing.T) {
})
t.Run("Fulu - some initially missing data columns (no reconstruction)", func(t *testing.T) {
startWaiting := make(chan bool)
testParams := testIsAvailableParams{
options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{})},
options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{}), WithStartWaitingDataColumnSidecars(startWaiting)},
columnsToSave: []uint64{1, 17, 19, 75, 102, 117, 119}, // 119 is not needed, 42 and 87 are missing
blobKzgCommitmentsCount: 3,
}
ctx, _, service, root, signed := testIsAvailableSetup(t, testParams)
block := signed.Block()
slot := block.Slot()
proposerIndex := block.ProposerIndex()
parentRoot := block.ParentRoot()
stateRoot := block.StateRoot()
bodyRoot, err := block.Body().HashTreeRoot()
require.NoError(t, err)
var wrongRoot [fieldparams.RootLength]byte
copy(wrongRoot[:], root[:])
wrongRoot[0]++ // change the root to simulate a wrong root
_, verifiedSidecarsWrongRoot := util.CreateTestVerifiedRoDataColumnSidecars(
t,
[]util.DataColumnParam{
{Index: 42, Slot: slot + 1}, // Needed index, but not for this slot.
})
_, verifiedSidecarsWrongRoot := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{wrongRoot: {
{ColumnIndex: 42}, // needed
}})
_, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{
{Index: 87, Slot: slot, ProposerIndex: proposerIndex, ParentRoot: parentRoot[:], StateRoot: stateRoot[:], BodyRoot: bodyRoot[:]}, // Needed index
{Index: 1, Slot: slot, ProposerIndex: proposerIndex, ParentRoot: parentRoot[:], StateRoot: stateRoot[:], BodyRoot: bodyRoot[:]}, // Not needed index
{Index: 42, Slot: slot, ProposerIndex: proposerIndex, ParentRoot: parentRoot[:], StateRoot: stateRoot[:], BodyRoot: bodyRoot[:]}, // Needed index
})
_, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{root: {
{ColumnIndex: 87}, // needed
{ColumnIndex: 1}, // not needed
{ColumnIndex: 42}, // needed
}})
go func() {
<-startWaiting
time.AfterFunc(10*time.Millisecond, func() {
err := service.dataColumnStorage.Save(verifiedSidecarsWrongRoot)
require.NoError(t, err)
err = service.dataColumnStorage.Save(verifiedSidecars)
require.NoError(t, err)
})
}()
err := service.isDataAvailable(ctx, root, signed)
err = service.isDataAvailable(ctx, root, signed)
require.NoError(t, err)
})
@@ -3442,6 +3461,9 @@ func TestIsDataAvailable(t *testing.T) {
missingColumns = uint64(2)
cgc = 128
)
startWaiting := make(chan bool)
var custodyInfo peerdas.CustodyInfo
custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc)
custodyInfo.ToAdvertiseGroupCount.Set(cgc)
@@ -3454,41 +3476,61 @@ func TestIsDataAvailable(t *testing.T) {
}
testParams := testIsAvailableParams{
options: []Option{WithCustodyInfo(&custodyInfo)},
options: []Option{WithCustodyInfo(&custodyInfo), WithStartWaitingDataColumnSidecars(startWaiting)},
columnsToSave: indices,
blobKzgCommitmentsCount: 3,
}
ctx, _, service, root, signed := testIsAvailableSetup(t, testParams)
block := signed.Block()
slot := block.Slot()
proposerIndex := block.ProposerIndex()
parentRoot := block.ParentRoot()
stateRoot := block.StateRoot()
bodyRoot, err := block.Body().HashTreeRoot()
require.NoError(t, err)
dataColumnParams := make([]util.DataColumnParams, 0, missingColumns)
dataColumnParams := make([]util.DataColumnParam, 0, missingColumns)
for i := minimumColumnsCountToReconstruct - missingColumns; i < minimumColumnsCountToReconstruct; i++ {
dataColumnParam := util.DataColumnParams{ColumnIndex: i}
dataColumnParam := util.DataColumnParam{
Index: i,
Slot: slot,
ProposerIndex: proposerIndex,
ParentRoot: parentRoot[:],
StateRoot: stateRoot[:],
BodyRoot: bodyRoot[:],
}
dataColumnParams = append(dataColumnParams, dataColumnParam)
}
_, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{root: dataColumnParams})
_, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParams)
go func() {
<-startWaiting
time.AfterFunc(10*time.Millisecond, func() {
err := service.dataColumnStorage.Save(verifiedSidecars)
require.NoError(t, err)
})
}()
err := service.isDataAvailable(ctx, root, signed)
err = service.isDataAvailable(ctx, root, signed)
require.NoError(t, err)
})
t.Run("Fulu - some columns are definitively missing", func(t *testing.T) {
startWaiting := make(chan bool)
params := testIsAvailableParams{
options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{})},
options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{}), WithStartWaitingDataColumnSidecars(startWaiting)},
blobKzgCommitmentsCount: 3,
}
ctx, cancel, service, root, signed := testIsAvailableSetup(t, params)
time.AfterFunc(10*time.Millisecond, func() {
go func() {
<-startWaiting
cancel()
})
}()
err := service.isDataAvailable(ctx, root, signed)
require.NotNil(t, err)

View File

@@ -47,27 +47,28 @@ import (
// Service represents a service that handles the internal
// logic of managing the full PoS beacon chain.
type Service struct {
cfg *config
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
head *head
headLock sync.RWMutex
originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
boundaryRoots [][32]byte
checkpointStateCache *cache.CheckpointStateCache
initSyncBlocks map[[32]byte]interfaces.ReadOnlySignedBeaconBlock
initSyncBlocksLock sync.RWMutex
wsVerifier *WeakSubjectivityVerifier
clockSetter startup.ClockSetter
clockWaiter startup.ClockWaiter
syncComplete chan struct{}
blobNotifiers *blobNotifierMap
blockBeingSynced *currentlySyncingBlock
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
slasherEnabled bool
lcStore *lightClient.Store
cfg *config
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
head *head
headLock sync.RWMutex
originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized
boundaryRoots [][32]byte
checkpointStateCache *cache.CheckpointStateCache
initSyncBlocks map[[32]byte]interfaces.ReadOnlySignedBeaconBlock
initSyncBlocksLock sync.RWMutex
wsVerifier *WeakSubjectivityVerifier
clockSetter startup.ClockSetter
clockWaiter startup.ClockWaiter
syncComplete chan struct{}
blobNotifiers *blobNotifierMap
blockBeingSynced *currentlySyncingBlock
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
slasherEnabled bool
lcStore *lightClient.Store
startWaitingDataColumnSidecars chan bool // for testing purposes only
}
// config options for the service.

View File

@@ -38,9 +38,9 @@ func TestPersist(t *testing.T) {
t.Run("mixed roots", func(t *testing.T) {
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{
{1}: {{ColumnIndex: 1}},
{2}: {{ColumnIndex: 2}},
dataColumnParamsByBlockRoot := []util.DataColumnParam{
{Slot: 1, Index: 1},
{Slot: 2, Index: 2},
}
roSidecars, _ := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot)
@@ -54,8 +54,8 @@ func TestPersist(t *testing.T) {
t.Run("outside DA period", func(t *testing.T) {
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{
{1}: {{ColumnIndex: 1}},
dataColumnParamsByBlockRoot := []util.DataColumnParam{
{Slot: 1, Index: 1},
}
roSidecars, _ := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot)
@@ -67,21 +67,24 @@ func TestPersist(t *testing.T) {
})
t.Run("nominal", func(t *testing.T) {
const slot = 42
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{
{}: {{ColumnIndex: 1}, {ColumnIndex: 5}},
dataColumnParamsByBlockRoot := []util.DataColumnParam{
{Slot: slot, Index: 1},
{Slot: slot, Index: 5},
}
roSidecars, roDataColumns := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot)
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, &peerdas.CustodyInfo{})
err := lazilyPersistentStoreColumns.Persist(0, roSidecars...)
err := lazilyPersistentStoreColumns.Persist(slot, roSidecars...)
require.NoError(t, err)
require.Equal(t, 1, len(lazilyPersistentStoreColumns.cache.entries))
key := cacheKey{slot: 0, root: [fieldparams.RootLength]byte{}}
entry := lazilyPersistentStoreColumns.cache.entries[key]
key := cacheKey{slot: slot, root: roDataColumns[0].BlockRoot()}
entry, ok := lazilyPersistentStoreColumns.cache.entries[key]
require.Equal(t, true, ok)
// A call to Persist does NOT save the sidecars to disk.
require.Equal(t, uint64(0), entry.diskSummary.Count())
@@ -121,24 +124,37 @@ func TestIsDataAvailable(t *testing.T) {
signedBeaconBlockFulu := util.NewBeaconBlockFulu()
signedBeaconBlockFulu.Block.Body.BlobKzgCommitments = commitments
signedRoBlock := newSignedRoBlock(t, signedBeaconBlockFulu)
block := signedRoBlock.Block()
slot := block.Slot()
proposerIndex := block.ProposerIndex()
parentRoot := block.ParentRoot()
stateRoot := block.StateRoot()
bodyRoot, err := block.Body().HashTreeRoot()
require.NoError(t, err)
root := signedRoBlock.Root()
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, newDataColumnsVerifier, &peerdas.CustodyInfo{})
indices := [...]uint64{1, 17, 87, 102}
dataColumnsParams := make([]util.DataColumnParams, 0, len(indices))
dataColumnsParams := make([]util.DataColumnParam, 0, len(indices))
for _, index := range indices {
dataColumnParams := util.DataColumnParams{
ColumnIndex: index,
dataColumnParams := util.DataColumnParam{
Index: index,
KzgCommitments: commitments,
Slot: slot,
ProposerIndex: proposerIndex,
ParentRoot: parentRoot[:],
StateRoot: stateRoot[:],
BodyRoot: bodyRoot[:],
}
dataColumnsParams = append(dataColumnsParams, dataColumnParams)
}
dataColumnsParamsByBlockRoot := util.DataColumnsParamsByRoot{root: dataColumnsParams}
_, verifiedRoDataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnsParamsByBlockRoot)
_, verifiedRoDataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnsParams)
key := cacheKey{root: root}
entry := lazilyPersistentStoreColumns.cache.ensure(key)
@@ -149,7 +165,7 @@ func TestIsDataAvailable(t *testing.T) {
require.NoError(t, err)
}
err := lazilyPersistentStoreColumns.IsDataAvailable(ctx, 0 /*current slot*/, signedRoBlock)
err = lazilyPersistentStoreColumns.IsDataAvailable(ctx, slot, signedRoBlock)
require.NoError(t, err)
actual, err := dataColumnStorage.Get(root, indices[:])
@@ -224,8 +240,8 @@ func TestFullCommitmentsToCheck(t *testing.T) {
}
}
func roSidecarsFromDataColumnParamsByBlockRoot(t *testing.T, dataColumnParamsByBlockRoot util.DataColumnsParamsByRoot) ([]blocks.ROSidecar, []blocks.RODataColumn) {
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
func roSidecarsFromDataColumnParamsByBlockRoot(t *testing.T, parameters []util.DataColumnParam) ([]blocks.ROSidecar, []blocks.RODataColumn) {
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, parameters)
roSidecars := make([]blocks.ROSidecar, 0, len(roDataColumns))
for _, roDataColumn := range roDataColumns {

View File

@@ -28,8 +28,7 @@ func TestEnsureDeleteSetDiskSummary(t *testing.T) {
func TestStash(t *testing.T) {
t.Run("Index too high", func(t *testing.T) {
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 10_000}}}
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: 10_000}})
var entry dataColumnCacheEntry
err := entry.stash(&roDataColumns[0])
@@ -37,8 +36,7 @@ func TestStash(t *testing.T) {
})
t.Run("Nominal and already existing", func(t *testing.T) {
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 1}}}
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: 1}})
var entry dataColumnCacheEntry
err := entry.stash(&roDataColumns[0])
@@ -76,36 +74,30 @@ func TestFilterDataColumns(t *testing.T) {
})
t.Run("Commitments not equal", func(t *testing.T) {
root := [fieldparams.RootLength]byte{}
commitmentsArray := safeCommitmentsArray{nil, [][]byte{[]byte{1}}}
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: {{ColumnIndex: 1}}}
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: 1}})
var scs [fieldparams.NumberOfColumns]*blocks.RODataColumn
scs[1] = &roDataColumns[0]
dataColumnCacheEntry := dataColumnCacheEntry{scs: scs}
_, err := dataColumnCacheEntry.filter(root, &commitmentsArray)
_, err := dataColumnCacheEntry.filter(roDataColumns[0].BlockRoot(), &commitmentsArray)
require.NotNil(t, err)
})
t.Run("Nominal", func(t *testing.T) {
root := [fieldparams.RootLength]byte{}
commitmentsArray := safeCommitmentsArray{nil, [][]byte{[]byte{1}}, nil, [][]byte{[]byte{3}}}
diskSummary := filesystem.NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{false, true})
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: {{ColumnIndex: 3, KzgCommitments: [][]byte{[]byte{3}}}}}
expected, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
expected, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: 3, KzgCommitments: [][]byte{[]byte{3}}}})
var scs [fieldparams.NumberOfColumns]*blocks.RODataColumn
scs[3] = &expected[0]
dataColumnCacheEntry := dataColumnCacheEntry{scs: scs, diskSummary: diskSummary}
actual, err := dataColumnCacheEntry.filter(root, &commitmentsArray)
actual, err := dataColumnCacheEntry.filter(expected[0].BlockRoot(), &commitmentsArray)
require.NoError(t, err)
require.DeepEqual(t, expected, actual)

View File

@@ -59,6 +59,7 @@ go_test(
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",

View File

@@ -25,7 +25,7 @@ func directoryPermissions() os.FileMode {
var (
errIndexOutOfBounds = errors.New("blob index in file name >= MAX_BLOBS_PER_BLOCK")
errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice")
errNoBasePath = errors.New("BlobStorage base path not specified in init")
errNoBlobBasePath = errors.New("BlobStorage base path not specified in init")
)
// BlobStorageOption is a functional option for configuring a BlobStorage.
@@ -85,7 +85,7 @@ func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
// Allow tests to set up a different fs using WithFs.
if b.fs == nil {
if b.base == "" {
return nil, errNoBasePath
return nil, errNoBlobBasePath
}
b.base = path.Clean(b.base)
if err := file.MkdirAll(b.base); err != nil {

View File

@@ -160,7 +160,7 @@ func writeFakeSSZ(t *testing.T, fs afero.Fs, root [32]byte, slot primitives.Slot
func TestNewBlobStorage(t *testing.T) {
_, err := NewBlobStorage()
require.ErrorIs(t, err, errNoBasePath)
require.ErrorIs(t, err, errNoBlobBasePath)
_, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good")))
require.NoError(t, err)
}

View File

@@ -50,6 +50,7 @@ var (
errTooManyDataColumns = errors.New("too many data columns")
errWrongSszEncodedDataColumnSidecarSize = errors.New("wrong SSZ encoded data column sidecar size")
errDataColumnSidecarsFromDifferentSlots = errors.New("data column sidecars from different slots")
errNoDataColumnBasePath = errors.New("DataColumnStorage base path not specified in init")
)
type (
@@ -142,7 +143,7 @@ func NewDataColumnStorage(ctx context.Context, opts ...DataColumnStorageOption)
// Allow tests to set up a different fs using WithFs.
if storage.fs == nil {
if storage.base == "" {
return nil, errNoBasePath
return nil, errNoDataColumnBasePath
}
storage.base = path.Clean(storage.base)

View File

@@ -7,6 +7,7 @@ import (
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
@@ -18,7 +19,7 @@ func TestNewDataColumnStorage(t *testing.T) {
t.Run("No base path", func(t *testing.T) {
_, err := NewDataColumnStorage(ctx)
require.ErrorIs(t, err, errNoBasePath)
require.ErrorIs(t, err, errNoDataColumnBasePath)
})
t.Run("Nominal", func(t *testing.T) {
@@ -40,32 +41,18 @@ func TestWarmCache(t *testing.T) {
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{0}: {
{Slot: 33, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 1
{Slot: 33, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 1
},
{1}: {
{Slot: 128_002, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000
{Slot: 128_002, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000
},
{2}: {
{Slot: 128_003, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000
{Slot: 128_003, ColumnIndex: 3, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000
},
{3}: {
{Slot: 128_034, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4001
{Slot: 128_034, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4001
},
{4}: {
{Slot: 131_138, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098
},
{5}: {
{Slot: 131_138, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098
},
{6}: {
{Slot: 131_168, ColumnIndex: 0, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099
},
[]util.DataColumnParam{
{Slot: 33, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 1
{Slot: 33, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 1
{Slot: 128_002, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4000
{Slot: 128_002, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4000
{Slot: 128_003, Index: 1, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4000
{Slot: 128_003, Index: 3, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4000
{Slot: 128_034, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4001
{Slot: 128_034, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4001
{Slot: 131_138, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4098
{Slot: 131_138, Index: 1, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4098
{Slot: 131_168, Index: 0, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4099
},
)
@@ -76,29 +63,25 @@ func TestWarmCache(t *testing.T) {
storage.WarmCache()
require.Equal(t, primitives.Epoch(4_000), storage.cache.lowestCachedEpoch)
require.Equal(t, 6, len(storage.cache.cache))
require.Equal(t, 5, len(storage.cache.cache))
summary, ok := storage.cache.get([fieldparams.RootLength]byte{1})
summary, ok := storage.cache.get(verifiedRoDataColumnSidecars[2].BlockRoot())
require.Equal(t, true, ok)
require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_000, mask: [fieldparams.NumberOfColumns]bool{false, false, true, false, true}}, summary)
summary, ok = storage.cache.get([fieldparams.RootLength]byte{2})
summary, ok = storage.cache.get(verifiedRoDataColumnSidecars[4].BlockRoot())
require.Equal(t, true, ok)
require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_000, mask: [fieldparams.NumberOfColumns]bool{false, true, false, true}}, summary)
summary, ok = storage.cache.get([fieldparams.RootLength]byte{3})
summary, ok = storage.cache.get(verifiedRoDataColumnSidecars[6].BlockRoot())
require.Equal(t, true, ok)
require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_001, mask: [fieldparams.NumberOfColumns]bool{false, false, true, false, true}}, summary)
summary, ok = storage.cache.get([fieldparams.RootLength]byte{4})
summary, ok = storage.cache.get(verifiedRoDataColumnSidecars[8].BlockRoot())
require.Equal(t, true, ok)
require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_098, mask: [fieldparams.NumberOfColumns]bool{false, false, true}}, summary)
require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_098, mask: [fieldparams.NumberOfColumns]bool{false, true, true}}, summary)
summary, ok = storage.cache.get([fieldparams.RootLength]byte{5})
require.Equal(t, true, ok)
require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_098, mask: [fieldparams.NumberOfColumns]bool{false, true}}, summary)
summary, ok = storage.cache.get([fieldparams.RootLength]byte{6})
summary, ok = storage.cache.get(verifiedRoDataColumnSidecars[10].BlockRoot())
require.Equal(t, true, ok)
require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_099, mask: [fieldparams.NumberOfColumns]bool{true}}, summary)
}
@@ -112,9 +95,7 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{}: {{ColumnIndex: 12}, {ColumnIndex: 1_000_000}, {ColumnIndex: 48}},
},
[]util.DataColumnParam{{Index: 12}, {Index: 1_000_000}, {Index: 48}},
)
_, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t)
@@ -125,7 +106,7 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
t.Run("one of the column index is too large", func(t *testing.T) {
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{{}: {{ColumnIndex: 12}, {ColumnIndex: 1_000_000}, {ColumnIndex: 48}}},
[]util.DataColumnParam{{Index: 12}, {Index: 1_000_000}, {Index: 48}},
)
_, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t)
@@ -136,23 +117,34 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
t.Run("different slots", func(t *testing.T) {
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{}: {
{Slot: 1, ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
{Slot: 2, ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
},
[]util.DataColumnParam{
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}},
{Slot: 2, Index: 12, Column: [][]byte{{1}, {2}, {3}}},
},
)
// Create a sidecar with a different slot but the same root.
alteredVerifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, 2)
alteredVerifiedRoDataColumnSidecars = append(alteredVerifiedRoDataColumnSidecars, verifiedRoDataColumnSidecars[0])
altered, err := blocks.NewRODataColumnWithRoot(
verifiedRoDataColumnSidecars[1].RODataColumn.DataColumnSidecar,
verifiedRoDataColumnSidecars[0].BlockRoot(),
)
require.NoError(t, err)
verifiedAltered := blocks.NewVerifiedRODataColumn(altered)
alteredVerifiedRoDataColumnSidecars = append(alteredVerifiedRoDataColumnSidecars, verifiedAltered)
_, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t)
err := dataColumnStorage.Save(verifiedRoDataColumnSidecars)
err = dataColumnStorage.Save(alteredVerifiedRoDataColumnSidecars)
require.ErrorIs(t, err, errDataColumnSidecarsFromDifferentSlots)
})
t.Run("new file - no data columns to save", func(t *testing.T) {
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{{}: {}},
[]util.DataColumnParam{},
)
_, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t)
@@ -163,11 +155,9 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
t.Run("new file - different data column size", func(t *testing.T) {
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{}: {
{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
{ColumnIndex: 11, DataColumn: []byte{1, 2, 3, 4}},
},
[]util.DataColumnParam{
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}},
{Slot: 1, Index: 13, Column: [][]byte{{1}, {2}, {3}, {4}}},
},
)
@@ -179,7 +169,9 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
t.Run("existing file - wrong incoming SSZ encoded size", func(t *testing.T) {
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}}},
[]util.DataColumnParam{
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}},
},
)
// Save data columns into a file.
@@ -191,7 +183,9 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
// column index and an different SSZ encoded size.
_, verifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 13, DataColumn: []byte{1, 2, 3, 4}}}},
[]util.DataColumnParam{
{Slot: 1, Index: 13, Column: [][]byte{{1}, {2}, {3}, {4}}},
},
)
// Try to rewrite the file.
@@ -202,17 +196,13 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
t.Run("nominal", func(t *testing.T) {
_, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{1}: {
{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
{ColumnIndex: 11, DataColumn: []byte{3, 4, 5}},
{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, // OK if duplicate
{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}},
},
{2}: {
{ColumnIndex: 12, DataColumn: []byte{3, 4, 5}},
{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}},
},
[]util.DataColumnParam{
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}},
{Slot: 1, Index: 11, Column: [][]byte{{3}, {4}, {5}}},
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, // OK if duplicate
{Slot: 1, Index: 13, Column: [][]byte{{6}, {7}, {8}}},
{Slot: 2, Index: 12, Column: [][]byte{{3}, {4}, {5}}},
{Slot: 2, Index: 13, Column: [][]byte{{6}, {7}, {8}}},
},
)
@@ -222,16 +212,12 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
_, inputVerifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{1}: {
{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, // OK if duplicate
{ColumnIndex: 15, DataColumn: []byte{2, 3, 4}},
{ColumnIndex: 1, DataColumn: []byte{2, 3, 4}},
},
{3}: {
{ColumnIndex: 6, DataColumn: []byte{3, 4, 5}},
{ColumnIndex: 2, DataColumn: []byte{6, 7, 8}},
},
[]util.DataColumnParam{
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, // OK if duplicate
{Slot: 1, Index: 15, Column: [][]byte{{2}, {3}, {4}}},
{Slot: 1, Index: 1, Column: [][]byte{{2}, {3}, {4}}},
{Slot: 3, Index: 6, Column: [][]byte{{3}, {4}, {5}}},
{Slot: 3, Index: 2, Column: [][]byte{{6}, {7}, {8}}},
},
)
@@ -240,51 +226,47 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
type fixture struct {
fileName string
blockRoot [fieldparams.RootLength]byte
expectedIndices [mandatoryNumberOfColumns]byte
dataColumnParams []util.DataColumnParams
dataColumnParams []util.DataColumnParam
}
fixtures := []fixture{
{
fileName: "0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs",
blockRoot: [fieldparams.RootLength]byte{1},
fileName: "0/0/0x8bb2f09de48c102635622dc27e6de03ae2b22639df7c33edbc8222b2ec423746.sszs",
expectedIndices: [mandatoryNumberOfColumns]byte{
0, nonZeroOffset + 4, 0, 0, 0, 0, 0, 0,
0, 0, 0, nonZeroOffset + 1, nonZeroOffset, nonZeroOffset + 2, 0, nonZeroOffset + 3,
// The rest is filled with zeroes.
},
dataColumnParams: []util.DataColumnParams{
{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
{ColumnIndex: 11, DataColumn: []byte{3, 4, 5}},
{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}},
{ColumnIndex: 15, DataColumn: []byte{2, 3, 4}},
{ColumnIndex: 1, DataColumn: []byte{2, 3, 4}},
dataColumnParams: []util.DataColumnParam{
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}},
{Slot: 1, Index: 11, Column: [][]byte{{3}, {4}, {5}}},
{Slot: 1, Index: 13, Column: [][]byte{{6}, {7}, {8}}},
{Slot: 1, Index: 15, Column: [][]byte{{2}, {3}, {4}}},
{Slot: 1, Index: 1, Column: [][]byte{{2}, {3}, {4}}},
},
},
{
fileName: "0/0/0x0200000000000000000000000000000000000000000000000000000000000000.sszs",
blockRoot: [fieldparams.RootLength]byte{2},
fileName: "0/0/0x221f88cae2219050d4e9d8c2d0d83cb4c8ce4c84ab1bb3e0b89f3dec36077c4f.sszs",
expectedIndices: [mandatoryNumberOfColumns]byte{
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, nonZeroOffset, nonZeroOffset + 1, 0, 0,
// The rest is filled with zeroes.
},
dataColumnParams: []util.DataColumnParams{
{ColumnIndex: 12, DataColumn: []byte{3, 4, 5}},
{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}},
dataColumnParams: []util.DataColumnParam{
{Slot: 2, Index: 12, Column: [][]byte{{3}, {4}, {5}}},
{Slot: 2, Index: 13, Column: [][]byte{{6}, {7}, {8}}},
},
},
{
fileName: "0/0/0x0300000000000000000000000000000000000000000000000000000000000000.sszs",
blockRoot: [fieldparams.RootLength]byte{3},
fileName: "0/0/0x7b163bd57e1c4c8b5048c5389698098f4c957d62d7ce86f4ffa9bdc75c16a18b.sszs",
expectedIndices: [mandatoryNumberOfColumns]byte{
0, 0, nonZeroOffset + 1, 0, 0, 0, nonZeroOffset, 0,
// The rest is filled with zeroes.
},
dataColumnParams: []util.DataColumnParams{
{ColumnIndex: 6, DataColumn: []byte{3, 4, 5}},
{ColumnIndex: 2, DataColumn: []byte{6, 7, 8}},
dataColumnParams: []util.DataColumnParam{
{Slot: 3, Index: 6, Column: [][]byte{{3}, {4}, {5}}},
{Slot: 3, Index: 2, Column: [][]byte{{6}, {7}, {8}}},
},
},
}
@@ -293,7 +275,7 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
// Build expected data column sidecars.
_, expectedDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{fixture.blockRoot: fixture.dataColumnParams},
fixture.dataColumnParams,
)
// Build expected bytes.
@@ -320,6 +302,8 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
expectedBytes = append(expectedBytes, fixture.expectedIndices[:]...)
expectedBytes = append(expectedBytes, sszEncodedDataColumnSidecars...)
blockRoot := expectedDataColumnSidecars[0].BlockRoot()
// Check the actual content of the file.
actualBytes, err := afero.ReadFile(dataColumnStorage.fs, fixture.fileName)
require.NoError(t, err)
@@ -328,18 +312,18 @@ func TestSaveDataColumnsSidecars(t *testing.T) {
// Check the summary.
indices := map[uint64]bool{}
for _, dataColumnParam := range fixture.dataColumnParams {
indices[dataColumnParam.ColumnIndex] = true
indices[dataColumnParam.Index] = true
}
summary := dataColumnStorage.Summary(fixture.blockRoot)
summary := dataColumnStorage.Summary(blockRoot)
for index := range uint64(mandatoryNumberOfColumns) {
require.Equal(t, indices[index], summary.HasIndex(index))
}
err = dataColumnStorage.Remove(fixture.blockRoot)
err = dataColumnStorage.Remove(blockRoot)
require.NoError(t, err)
summary = dataColumnStorage.Summary(fixture.blockRoot)
summary = dataColumnStorage.Summary(blockRoot)
for index := range uint64(mandatoryNumberOfColumns) {
require.Equal(t, false, summary.HasIndex(index))
}
@@ -362,11 +346,9 @@ func TestGetDataColumnSidecars(t *testing.T) {
t.Run("indices not found", func(t *testing.T) {
_, savedVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{1}: {
{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
{ColumnIndex: 14, DataColumn: []byte{2, 3, 4}},
},
[]util.DataColumnParam{
{Index: 12, Column: [][]byte{{1}, {2}, {3}}},
{Index: 14, Column: [][]byte{{2}, {3}, {4}}},
},
)
@@ -374,7 +356,7 @@ func TestGetDataColumnSidecars(t *testing.T) {
err := dataColumnStorage.Save(savedVerifiedRoDataColumnSidecars)
require.NoError(t, err)
verifiedRODataColumnSidecars, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, []uint64{3, 1, 2})
verifiedRODataColumnSidecars, err := dataColumnStorage.Get(savedVerifiedRoDataColumnSidecars[0].BlockRoot(), []uint64{3, 1, 2})
require.NoError(t, err)
require.Equal(t, 0, len(verifiedRODataColumnSidecars))
})
@@ -382,11 +364,9 @@ func TestGetDataColumnSidecars(t *testing.T) {
t.Run("nominal", func(t *testing.T) {
_, expectedVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{1}: {
{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
{ColumnIndex: 14, DataColumn: []byte{2, 3, 4}},
},
[]util.DataColumnParam{
{Index: 12, Column: [][]byte{{1}, {2}, {3}}},
{Index: 14, Column: [][]byte{{2}, {3}, {4}}},
},
)
@@ -394,11 +374,13 @@ func TestGetDataColumnSidecars(t *testing.T) {
err := dataColumnStorage.Save(expectedVerifiedRoDataColumnSidecars)
require.NoError(t, err)
verifiedRODataColumnSidecars, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, nil)
root := expectedVerifiedRoDataColumnSidecars[0].BlockRoot()
verifiedRODataColumnSidecars, err := dataColumnStorage.Get(root, nil)
require.NoError(t, err)
require.DeepSSZEqual(t, expectedVerifiedRoDataColumnSidecars, verifiedRODataColumnSidecars)
verifiedRODataColumnSidecars, err = dataColumnStorage.Get([fieldparams.RootLength]byte{1}, []uint64{12, 13, 14})
verifiedRODataColumnSidecars, err = dataColumnStorage.Get(root, []uint64{12, 13, 14})
require.NoError(t, err)
require.DeepSSZEqual(t, expectedVerifiedRoDataColumnSidecars, verifiedRODataColumnSidecars)
})
@@ -414,15 +396,11 @@ func TestRemove(t *testing.T) {
t.Run("nominal", func(t *testing.T) {
_, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{1}: {
{Slot: 32, ColumnIndex: 10, DataColumn: []byte{1, 2, 3}},
{Slot: 32, ColumnIndex: 11, DataColumn: []byte{2, 3, 4}},
},
{2}: {
{Slot: 33, ColumnIndex: 10, DataColumn: []byte{1, 2, 3}},
{Slot: 33, ColumnIndex: 11, DataColumn: []byte{2, 3, 4}},
},
[]util.DataColumnParam{
{Slot: 32, Index: 10, Column: [][]byte{{1}, {2}, {3}}},
{Slot: 32, Index: 11, Column: [][]byte{{2}, {3}, {4}}},
{Slot: 33, Index: 10, Column: [][]byte{{1}, {2}, {3}}},
{Slot: 33, Index: 11, Column: [][]byte{{2}, {3}, {4}}},
},
)
@@ -430,22 +408,22 @@ func TestRemove(t *testing.T) {
err := dataColumnStorage.Save(inputVerifiedRoDataColumnSidecars)
require.NoError(t, err)
err = dataColumnStorage.Remove([fieldparams.RootLength]byte{1})
err = dataColumnStorage.Remove(inputVerifiedRoDataColumnSidecars[0].BlockRoot())
require.NoError(t, err)
summary := dataColumnStorage.Summary([fieldparams.RootLength]byte{1})
summary := dataColumnStorage.Summary(inputVerifiedRoDataColumnSidecars[0].BlockRoot())
require.Equal(t, primitives.Epoch(0), summary.epoch)
require.Equal(t, uint64(0), summary.Count())
summary = dataColumnStorage.Summary([fieldparams.RootLength]byte{2})
summary = dataColumnStorage.Summary(inputVerifiedRoDataColumnSidecars[3].BlockRoot())
require.Equal(t, primitives.Epoch(1), summary.epoch)
require.Equal(t, uint64(2), summary.Count())
actual, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, nil)
actual, err := dataColumnStorage.Get(inputVerifiedRoDataColumnSidecars[0].BlockRoot(), nil)
require.NoError(t, err)
require.Equal(t, 0, len(actual))
actual, err = dataColumnStorage.Get([fieldparams.RootLength]byte{2}, nil)
actual, err = dataColumnStorage.Get(inputVerifiedRoDataColumnSidecars[3].BlockRoot(), nil)
require.NoError(t, err)
require.Equal(t, 2, len(actual))
})
@@ -454,9 +432,9 @@ func TestRemove(t *testing.T) {
func TestClear(t *testing.T) {
_, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}},
{2}: {{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}}},
[]util.DataColumnParam{
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}},
{Slot: 2, Index: 13, Column: [][]byte{{6}, {7}, {8}}},
},
)
@@ -465,8 +443,8 @@ func TestClear(t *testing.T) {
require.NoError(t, err)
filePaths := []string{
"0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs",
"0/0/0x0200000000000000000000000000000000000000000000000000000000000000.sszs",
"0/0/0x8bb2f09de48c102635622dc27e6de03ae2b22639df7c33edbc8222b2ec423746.sszs",
"0/0/0x221f88cae2219050d4e9d8c2d0d83cb4c8ce4c84ab1bb3e0b89f3dec36077c4f.sszs",
}
for _, filePath := range filePaths {
@@ -492,8 +470,8 @@ func TestMetadata(t *testing.T) {
t.Run("wrong version", func(t *testing.T) {
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}},
[]util.DataColumnParam{
{Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}},
},
)
@@ -503,7 +481,7 @@ func TestMetadata(t *testing.T) {
require.NoError(t, err)
// Alter the version.
const filePath = "0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs"
const filePath = "0/0/0x8bb2f09de48c102635622dc27e6de03ae2b22639df7c33edbc8222b2ec423746.sszs"
file, err := dataColumnStorage.fs.OpenFile(filePath, os.O_WRONLY, os.FileMode(0600))
require.NoError(t, err)
@@ -643,31 +621,19 @@ func TestPrune(t *testing.T) {
}
_, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{0}: {
{Slot: 33, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 1
{Slot: 33, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 1
},
{1}: {
{Slot: 128_002, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000
{Slot: 128_002, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000
},
{2}: {
{Slot: 128_003, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000
{Slot: 128_003, ColumnIndex: 3, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000
},
{3}: {
{Slot: 131_138, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098
{Slot: 131_138, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098
},
{4}: {
{Slot: 131_169, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099
{Slot: 131_169, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099
},
{5}: {
{Slot: 262_144, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 2 - Epoch 8192
{Slot: 262_144, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 2 - Epoch 8292
},
[]util.DataColumnParam{
{Slot: 33, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 1
{Slot: 33, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 1
{Slot: 128_002, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4000
{Slot: 128_002, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4000
{Slot: 128_003, Index: 1, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4000
{Slot: 128_003, Index: 3, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4000
{Slot: 131_138, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4098
{Slot: 131_138, Index: 3, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4098
{Slot: 131_169, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4099
{Slot: 131_169, Index: 3, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4099
{Slot: 262_144, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 2 - Epoch 8192
{Slot: 262_144, Index: 3, Column: [][]byte{{1}, {2}, {3}}}, // Period 2 - Epoch 8292
},
)
@@ -696,31 +662,31 @@ func TestPrune(t *testing.T) {
dirs, err = listDir(dataColumnStorage.fs, "0/1")
require.NoError(t, err)
require.Equal(t, true, compareSlices([]string{"0x0000000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
require.Equal(t, true, compareSlices([]string{"0x775283f428813c949b7e8af07f01fef9790137f021b3597ad2d0d81e8be8f0f0.sszs"}, dirs))
dirs, err = listDir(dataColumnStorage.fs, "0/4000")
require.NoError(t, err)
require.Equal(t, true, compareSlices([]string{
"0x0200000000000000000000000000000000000000000000000000000000000000.sszs",
"0x0100000000000000000000000000000000000000000000000000000000000000.sszs",
"0x9977031132157ebb9c81bce952003ce07a4f54e921ca63b7693d1562483fdf9f.sszs",
"0xb2b14d9d858fa99b70f0405e4e39f38e51e36dd9a70343c109e24eeb5f77e369.sszs",
}, dirs))
dirs, err = listDir(dataColumnStorage.fs, "1/4098")
require.NoError(t, err)
require.Equal(t, true, compareSlices([]string{"0x0300000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
require.Equal(t, true, compareSlices([]string{"0x5106745cdd6b1aa3602ef4d000ef373af672019264c167fa4bd641a1094aa5c5.sszs"}, dirs))
dirs, err = listDir(dataColumnStorage.fs, "1/4099")
require.NoError(t, err)
require.Equal(t, true, compareSlices([]string{"0x0400000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
require.Equal(t, true, compareSlices([]string{"0x4e5f2bd5bb84bf0422af8edd1cc5a52cc6cea85baf3d66d172fe41831ac1239c.sszs"}, dirs))
dirs, err = listDir(dataColumnStorage.fs, "2/8192")
require.NoError(t, err)
require.Equal(t, true, compareSlices([]string{"0x0500000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
require.Equal(t, true, compareSlices([]string{"0xa8adba7446eb56a01a9dd6d55e9c3990b10c91d43afb77847b4a21ac4ee62527.sszs"}, dirs))
_, verifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars(
t,
util.DataColumnsParamsByRoot{
{6}: {{Slot: 451_141, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}}, // Period 3 - Epoch 14_098
[]util.DataColumnParam{
{Slot: 451_141, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 3 - Epoch 14_098
},
)
@@ -748,14 +714,14 @@ func TestPrune(t *testing.T) {
dirs, err = listDir(dataColumnStorage.fs, "1/4099")
require.NoError(t, err)
require.Equal(t, true, compareSlices([]string{"0x0400000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
require.Equal(t, true, compareSlices([]string{"0x4e5f2bd5bb84bf0422af8edd1cc5a52cc6cea85baf3d66d172fe41831ac1239c.sszs"}, dirs))
dirs, err = listDir(dataColumnStorage.fs, "2/8192")
require.NoError(t, err)
require.Equal(t, true, compareSlices([]string{"0x0500000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
require.Equal(t, true, compareSlices([]string{"0xa8adba7446eb56a01a9dd6d55e9c3990b10c91d43afb77847b4a21ac4ee62527.sszs"}, dirs))
dirs, err = listDir(dataColumnStorage.fs, "3/14098")
require.NoError(t, err)
require.Equal(t, true, compareSlices([]string{"0x0600000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
require.Equal(t, true, compareSlices([]string{"0x0de28a18cae63cbc6f0b20dc1afb0b1df38da40824a5f09f92d485ade04de97f.sszs"}, dirs))
})
}

View File

@@ -87,45 +87,47 @@ type serviceFlagOpts struct {
// full PoS node. It handles the lifecycle of the entire system and registers
// services to a service registry.
type BeaconNode struct {
cliCtx *cli.Context
ctx context.Context
cancel context.CancelFunc
services *runtime.ServiceRegistry
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications.
db db.Database
slasherDB db.SlasherDatabase
attestationCache *cache.AttestationCache
attestationPool attestations.Pool
exitPool voluntaryexits.PoolManager
slashingsPool slashings.PoolManager
syncCommitteePool synccommittee.Pool
blsToExecPool blstoexec.PoolManager
depositCache cache.DepositCache
trackedValidatorsCache *cache.TrackedValidatorsCache
payloadIDCache *cache.PayloadIDCache
stateFeed *event.Feed
blockFeed *event.Feed
opFeed *event.Feed
stateGen *stategen.State
collector *bcnodeCollector
slasherBlockHeadersFeed *event.Feed
slasherAttestationsFeed *event.Feed
finalizedStateAtStartUp state.BeaconState
serviceFlagOpts *serviceFlagOpts
GenesisInitializer genesis.Initializer
CheckpointInitializer checkpoint.Initializer
forkChoicer forkchoice.ForkChoicer
clockWaiter startup.ClockWaiter
BackfillOpts []backfill.ServiceOption
initialSyncComplete chan struct{}
BlobStorage *filesystem.BlobStorage
BlobStorageOptions []filesystem.BlobStorageOption
custodyInfo *peerdas.CustodyInfo
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
slasherEnabled bool
lcStore *lightclient.Store
cliCtx *cli.Context
ctx context.Context
cancel context.CancelFunc
services *runtime.ServiceRegistry
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications.
db db.Database
slasherDB db.SlasherDatabase
attestationCache *cache.AttestationCache
attestationPool attestations.Pool
exitPool voluntaryexits.PoolManager
slashingsPool slashings.PoolManager
syncCommitteePool synccommittee.Pool
blsToExecPool blstoexec.PoolManager
depositCache cache.DepositCache
trackedValidatorsCache *cache.TrackedValidatorsCache
payloadIDCache *cache.PayloadIDCache
stateFeed *event.Feed
blockFeed *event.Feed
opFeed *event.Feed
stateGen *stategen.State
collector *bcnodeCollector
slasherBlockHeadersFeed *event.Feed
slasherAttestationsFeed *event.Feed
finalizedStateAtStartUp state.BeaconState
serviceFlagOpts *serviceFlagOpts
GenesisInitializer genesis.Initializer
CheckpointInitializer checkpoint.Initializer
forkChoicer forkchoice.ForkChoicer
clockWaiter startup.ClockWaiter
BackfillOpts []backfill.ServiceOption
initialSyncComplete chan struct{}
BlobStorage *filesystem.BlobStorage
BlobStorageOptions []filesystem.BlobStorageOption
DataColumnStorage *filesystem.DataColumnStorage
DataColumnStorageOptions []filesystem.DataColumnStorageOption
custodyInfo *peerdas.CustodyInfo
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
slasherEnabled bool
lcStore *lightclient.Store
}
// New creates a new node instance, sets up configuration options, and registers
@@ -193,6 +195,15 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
beacon.BlobStorage = blobs
}
if beacon.DataColumnStorage == nil {
dataColumnStorage, err := filesystem.NewDataColumnStorage(cliCtx.Context, beacon.DataColumnStorageOptions...)
if err != nil {
return nil, errors.Wrap(err, "new data column storage")
}
beacon.DataColumnStorage = dataColumnStorage
}
bfs, err := startBaseServices(cliCtx, beacon, depositAddress)
if err != nil {
return nil, errors.Wrap(err, "could not start modules")
@@ -780,6 +791,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithClockSynchronizer(gs),
blockchain.WithSyncComplete(syncComplete),
blockchain.WithBlobStorage(b.BlobStorage),
blockchain.WithDataColumnStorage(b.DataColumnStorage),
blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache),
blockchain.WithPayloadIDCache(b.payloadIDCache),
blockchain.WithSyncChecker(b.syncChecker),
@@ -868,6 +880,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithInitialSyncComplete(initialSyncComplete),
regularsync.WithStateNotifier(b),
regularsync.WithBlobStorage(b.BlobStorage),
regularsync.WithDataColumnStorage(b.DataColumnStorage),
regularsync.WithVerifierWaiter(b.verifyInitWaiter),
regularsync.WithAvailableBlocker(bFillStore),
regularsync.WithSlasherEnabled(b.slasherEnabled),

View File

@@ -54,7 +54,12 @@ func TestNodeClose_OK(t *testing.T) {
cmd.ValidatorMonitorIndicesFlag.Value.SetInt(1)
ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
options := []Option{
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
node, err := New(ctx, cancel, options...)
require.NoError(t, err)
node.Close()
@@ -72,10 +77,16 @@ func TestNodeStart_Ok(t *testing.T) {
require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A"))
ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}),
options := []Option{
WithBlockchainFlagOptions([]blockchain.Option{}),
WithBuilderFlagOptions([]builder.Option{}),
WithExecutionChainOptions([]execution.Option{}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
node, err := New(ctx, cancel, options...)
require.NoError(t, err)
node.services = &runtime.ServiceRegistry{}
go func() {
@@ -96,10 +107,16 @@ func TestNodeStart_SyncChecker(t *testing.T) {
require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A"))
ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}),
options := []Option{
WithBlockchainFlagOptions([]blockchain.Option{}),
WithBuilderFlagOptions([]builder.Option{}),
WithExecutionChainOptions([]execution.Option{}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
node, err := New(ctx, cancel, options...)
require.NoError(t, err)
go func() {
node.Start()
@@ -128,10 +145,13 @@ func TestClearDB(t *testing.T) {
set.String("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A", "fee recipient")
require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A"))
context, cancel := newCliContextWithCancel(&app, set)
options := []Option{
WithExecutionChainOptions([]execution.Option{execution.WithHttpEndpoint(endpoint)}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
_, err = New(context, cancel, options...)
require.NoError(t, err)
require.LogsContain(t, hook, "Removing database")

View File

@@ -50,3 +50,20 @@ func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option {
return nil
}
}
// WithDataColumnStorage sets the DataColumnStorage backend for the BeaconNode
func WithDataColumnStorage(bs *filesystem.DataColumnStorage) Option {
return func(bn *BeaconNode) error {
bn.DataColumnStorage = bs
return nil
}
}
// WithDataColumnStorageOptions appends 1 or more filesystem.DataColumnStorageOption on the beacon node,
// to be used when initializing data column storage.
func WithDataColumnStorageOptions(opt ...filesystem.DataColumnStorageOption) Option {
return func(bn *BeaconNode) error {
bn.DataColumnStorageOptions = append(bn.DataColumnStorageOptions, opt...)
return nil
}
}

View File

@@ -712,7 +712,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex)
topic := fmt.Sprintf(topicFormat, digest, subnet)
roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{{}: {{ColumnIndex: columnIndex}}})
roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}})
sidecar := roSidecars[0].DataColumnSidecar
// Async listen for the pubsub, must be before the broadcast.

View File

@@ -61,6 +61,12 @@ const LightClientFinalityUpdateName = "/light_client_finality_update"
// LightClientOptimisticUpdateName is the name for the LightClientOptimisticUpdate topic.
const LightClientOptimisticUpdateName = "/light_client_optimistic_update"
// DataColumnSidecarsByRootName is the name for the DataColumnSidecarsByRoot v1 message topic.
const DataColumnSidecarsByRootName = "/data_column_sidecars_by_root"
// DataColumnSidecarsByRangeName is the name for the DataColumnSidecarsByRange v1 message topic.
const DataColumnSidecarsByRangeName = "/data_column_sidecars_by_range"
const (
// V1 RPC Topics
// RPCStatusTopicV1 defines the v1 topic for the status rpc method.
@@ -92,6 +98,9 @@ const (
RPCLightClientFinalityUpdateTopicV1 = protocolPrefix + LightClientFinalityUpdateName + SchemaVersionV1
// RPCLightClientOptimisticUpdateTopicV1 is a topic for requesting a light client Optimistic update.
RPCLightClientOptimisticUpdateTopicV1 = protocolPrefix + LightClientOptimisticUpdateName + SchemaVersionV1
// RPCDataColumnSidecarsByRootTopicV1 is a topic for requesting data column sidecars by their block root.
// /eth2/beacon_chain/req/data_column_sidecars_by_root/1 - New in Fulu.
RPCDataColumnSidecarsByRootTopicV1 = protocolPrefix + DataColumnSidecarsByRootName + SchemaVersionV1
// V2 RPC Topics
// RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method.
@@ -139,6 +148,9 @@ var RPCTopicMappings = map[string]interface{}{
RPCLightClientUpdatesByRangeTopicV1: new(pb.LightClientUpdatesByRangeRequest),
RPCLightClientFinalityUpdateTopicV1: new(interface{}),
RPCLightClientOptimisticUpdateTopicV1: new(interface{}),
// DataColumnSidecarsByRoot v1 Message
RPCDataColumnSidecarsByRootTopicV1: new(p2ptypes.DataColumnsByRootIdentifiers),
}
// Maps all registered protocol prefixes.
@@ -161,6 +173,7 @@ var messageMapping = map[string]bool{
LightClientUpdatesByRangeName: true,
LightClientFinalityUpdateName: true,
LightClientOptimisticUpdateName: true,
DataColumnSidecarsByRootName: true,
}
// Maps all the RPC messages which are to updated in altair.

View File

@@ -9,10 +9,13 @@ var (
ErrInvalidSequenceNum = errors.New("invalid sequence number provided")
ErrGeneric = errors.New("internal service error")
ErrRateLimited = errors.New("rate limited")
ErrIODeadline = errors.New("i/o deadline exceeded")
ErrInvalidRequest = errors.New("invalid range, step or count")
ErrBlobLTMinRequest = errors.New("blob epoch < minimum_request_epoch")
ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS")
ErrRateLimited = errors.New("rate limited")
ErrIODeadline = errors.New("i/o deadline exceeded")
ErrInvalidRequest = errors.New("invalid range, step or count")
ErrBlobLTMinRequest = errors.New("blob epoch < minimum_request_epoch")
ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS")
ErrMaxDataColumnReqExceeded = errors.New("requested more than MAX_REQUEST_DATA_COLUMN_SIDECARS")
ErrResourceUnavailable = errors.New("resource requested unavailable")
)

View File

@@ -25,6 +25,7 @@ go_library(
"rpc_blob_sidecars_by_range.go",
"rpc_blob_sidecars_by_root.go",
"rpc_chunked_response.go",
"rpc_data_column_sidecars_by_root.go",
"rpc_goodbye.go",
"rpc_light_client.go",
"rpc_metadata.go",
@@ -170,6 +171,7 @@ go_test(
"rpc_beacon_blocks_by_root_test.go",
"rpc_blob_sidecars_by_range_test.go",
"rpc_blob_sidecars_by_root_test.go",
"rpc_data_column_sidecars_by_root_test.go",
"rpc_goodbye_test.go",
"rpc_handler_test.go",
"rpc_light_client_test.go",

View File

@@ -15,13 +15,17 @@ import (
"github.com/sirupsen/logrus"
)
var ErrNoValidDigest = errors.New("no valid digest matched")
var ErrUnrecognizedVersion = errors.New("cannot determine context bytes for unrecognized object")
var (
ErrNoValidDigest = errors.New("no valid digest matched")
ErrUnrecognizedVersion = errors.New("cannot determine context bytes for unrecognized object")
)
var responseCodeSuccess = byte(0x00)
var responseCodeInvalidRequest = byte(0x01)
var responseCodeServerError = byte(0x02)
var responseCodeResourceUnavailable = byte(0x03)
var (
responseCodeSuccess = byte(0x00)
responseCodeInvalidRequest = byte(0x01)
responseCodeServerError = byte(0x02)
responseCodeResourceUnavailable = byte(0x03)
)
func (s *Service) generateErrorResponse(code byte, reason string) ([]byte, error) {
return createErrorResponse(code, reason, s.cfg.p2p)

View File

@@ -173,6 +173,14 @@ func WithBlobStorage(b *filesystem.BlobStorage) Option {
}
}
// WithDataColumnStorage gives the sync package direct access to DataColumnStorage.
func WithDataColumnStorage(b *filesystem.DataColumnStorage) Option {
return func(s *Service) error {
s.cfg.dataColumnStorage = b
return nil
}
}
// WithVerifierWaiter gives the sync package direct access to the verifier waiter.
func WithVerifierWaiter(v *verification.InitializerWaiter) Option {
return func(s *Service) error {

View File

@@ -39,6 +39,21 @@ type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
// Fulu: https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#messages
if forkIndex >= version.Fulu {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler,
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler,
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV3: s.metaDataHandler, // Modified in Fulu
p2p.RPCBlobSidecarsByRootTopicV1: s.blobSidecarByRootRPCHandler,
p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler,
p2p.RPCDataColumnSidecarsByRootTopicV1: s.dataColumnSidecarByRootRPCHandler, // Added in Fulu
}, nil
}
// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages
if forkIndex >= version.Electra {
return map[string]rpcHandler{
@@ -258,9 +273,15 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
// since some requests do not have any data in the payload, we
// do not decode anything.
if baseTopic == p2p.RPCMetaDataTopicV1 || baseTopic == p2p.RPCMetaDataTopicV2 ||
baseTopic == p2p.RPCLightClientOptimisticUpdateTopicV1 ||
baseTopic == p2p.RPCLightClientFinalityUpdateTopicV1 {
topics := map[string]bool{
p2p.RPCMetaDataTopicV1: true,
p2p.RPCMetaDataTopicV2: true,
p2p.RPCMetaDataTopicV3: true,
p2p.RPCLightClientOptimisticUpdateTopicV1: true,
p2p.RPCLightClientFinalityUpdateTopicV1: true,
}
if topics[baseTopic] {
if err := handle(ctx, base, stream); err != nil {
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
if !errors.Is(err, p2ptypes.ErrWrongForkDigestVersion) {

View File

@@ -9,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/network/forks"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
@@ -238,3 +239,30 @@ func WriteLightClientFinalityUpdateChunk(stream libp2pcore.Stream, tor blockchai
_, err = encoding.EncodeWithMaxLength(stream, update)
return err
}
// WriteDataColumnSidecarChunk writes data column chunk object to stream.
// response_chunk ::= <result> | <context-bytes> | <encoding-dependent-header> | <encoded-payload>
func WriteDataColumnSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, sidecar *ethpb.DataColumnSidecar) error {
// Success response code.
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return errors.Wrap(err, "stream write")
}
// Fork digest.
genesisValidatorsRoot := tor.GenesisValidatorsRoot()
ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(sidecar.SignedBlockHeader.Header.Slot), genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "fork digest from epoch")
}
if err := writeContextToStream(ctxBytes[:], stream); err != nil {
return errors.Wrap(err, "write context to stream")
}
// Sidecar.
if _, err = encoding.EncodeWithMaxLength(stream, sidecar); err != nil {
return errors.Wrap(err, "encode with max length")
}
return nil
}

View File

@@ -0,0 +1,174 @@
package sync
import (
"context"
"fmt"
"math"
"slices"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
"github.com/OffchainLabs/prysm/v6/time/slots"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var (
notDataColumnsByRootIdentifiersError = errors.New("not data columns by root identifiers")
tickerDelay = time.Second
)
// dataColumnSidecarByRootRPCHandler handles the data column sidecars by root RPC request.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1
func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler")
defer span.End()
batchSize := flags.Get().DataColumnBatchLimit
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Check if the message type is the one expected.
ref, ok := msg.(*types.DataColumnsByRootIdentifiers)
if !ok {
return notDataColumnsByRootIdentifiersError
}
requestedColumnIdents := *ref
remotePeerId := stream.Conn().RemotePeer()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
// Penalize peers that send invalid requests.
if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeerId)
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return errors.Wrap(err, "validate data columns by root request")
}
requestedColumnsByRoot := make(map[[fieldparams.RootLength]byte][]uint64)
for _, columnIdent := range requestedColumnIdents {
var root [fieldparams.RootLength]byte
copy(root[:], columnIdent.BlockRoot)
requestedColumnsByRoot[root] = append(requestedColumnsByRoot[root], columnIdent.Columns...)
}
// Sort by column index for each root.
for _, columns := range requestedColumnsByRoot {
slices.Sort(columns)
}
// Format nice logs.
requestedColumnsByRootLog := make(map[string]interface{})
for root, columns := range requestedColumnsByRoot {
rootStr := fmt.Sprintf("%#x", root)
requestedColumnsByRootLog[rootStr] = "all"
if uint64(len(columns)) != numberOfColumns {
requestedColumnsByRootLog[rootStr] = columns
}
}
// Compute the oldest slot we'll allow a peer to request, based on the current slot.
minReqSlot, err := dataColumnsRPCMinValidSlot(s.cfg.clock.CurrentSlot())
if err != nil {
return errors.Wrapf(err, "data columns RPC min valid slot")
}
log := log.WithFields(logrus.Fields{
"peer": remotePeerId,
"columns": requestedColumnsByRootLog,
})
defer closeStream(stream, log)
var ticker *time.Ticker
if len(requestedColumnIdents) > batchSize {
ticker = time.NewTicker(tickerDelay)
}
log.Debug("Serving data column sidecar by root request")
count := 0
for root, columns := range requestedColumnsByRoot {
if err := ctx.Err(); err != nil {
closeStream(stream, log)
return errors.Wrap(err, "context error")
}
// Throttle request processing to no more than batchSize/sec.
for range columns {
if ticker != nil && count != 0 && count%batchSize == 0 {
<-ticker.C
}
count++
}
s.rateLimiter.add(stream, int64(len(columns)))
// Retrieve the requested sidecars from the store.
verifiedRODataColumns, err := s.cfg.dataColumnStorage.Get(root, columns)
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return errors.Wrap(err, "get data column sidecars")
}
for _, verifiedRODataColumn := range verifiedRODataColumns {
// Filter out data column sidecars that are too old.
if verifiedRODataColumn.SignedBlockHeader.Header.Slot < minReqSlot {
continue
}
SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), verifiedRODataColumn.DataColumnSidecar); chunkErr != nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
return chunkErr
}
}
}
return nil
}
// validateDataColumnsByRootRequest checks if the request for data column sidecars is valid.
func validateDataColumnsByRootRequest(colIdents types.DataColumnsByRootIdentifiers) error {
total := uint64(0)
for _, id := range colIdents {
total += uint64(len(id.Columns))
}
if total > params.BeaconConfig().MaxRequestDataColumnSidecars {
return types.ErrMaxDataColumnReqExceeded
}
return nil
}
// dataColumnsRPCMinValidSlot returns the minimum slot that a peer can request data column sidecars for.
func dataColumnsRPCMinValidSlot(currentSlot primitives.Slot) (primitives.Slot, error) {
// Avoid overflow if we're running on a config where fulu is set to far future epoch.
if !params.FuluEnabled() {
return primitives.Slot(math.MaxUint64), nil
}
beaconConfig := params.BeaconConfig()
minReqEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest
minStartEpoch := beaconConfig.FuluForkEpoch
currEpoch := slots.ToEpoch(currentSlot)
if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStartEpoch {
minStartEpoch = currEpoch - minReqEpochs
}
return slots.EpochStart(minStartEpoch)
}

View File

@@ -0,0 +1,314 @@
package sync
import (
"context"
"io"
"math"
"sync"
"testing"
"time"
chainMock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/pkg/errors"
)
func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) {
ctx := context.Background()
t.Run("wrong message type", func(t *testing.T) {
service := &Service{}
err := service.dataColumnSidecarByRootRPCHandler(ctx, nil, nil)
require.ErrorIs(t, err, notDataColumnsByRootIdentifiersError)
})
t.Run("invalid request", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.MaxRequestDataColumnSidecars = 1
params.OverrideBeaconConfig(beaconConfig)
localP2P := p2ptest.NewTestP2P(t)
service := &Service{cfg: &config{p2p: localP2P}}
protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1)
remoteP2P := p2ptest.NewTestP2P(t)
var wg sync.WaitGroup
wg.Add(1)
remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
defer wg.Done()
code, errMsg, err := readStatusCodeNoDeadline(stream, localP2P.Encoding())
require.NoError(t, err)
require.Equal(t, responseCodeInvalidRequest, code)
require.Equal(t, types.ErrMaxDataColumnReqExceeded.Error(), errMsg)
})
localP2P.Connect(remoteP2P)
stream, err := localP2P.BHost.NewStream(ctx, remoteP2P.BHost.ID(), protocolID)
require.NoError(t, err)
msg := &types.DataColumnsByRootIdentifiers{{Columns: []uint64{1, 2, 3}}}
require.Equal(t, true, localP2P.Peers().Scorers().BadResponsesScorer().Score(remoteP2P.PeerID()) >= 0)
err = service.dataColumnSidecarByRootRPCHandler(ctx, msg, stream)
require.NotNil(t, err)
require.Equal(t, true, localP2P.Peers().Scorers().BadResponsesScorer().Score(remoteP2P.PeerID()) < 0)
if util.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
})
t.Run("nominal", func(t *testing.T) {
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.DataColumnBatchLimit = 2
flags.Init(gFlags)
defer flags.Init(resetFlags)
// Setting the ticker to 0 will cause the ticker to panic.
// Setting it to the minimum value instead.
refTickerDelay := tickerDelay
tickerDelay = time.Nanosecond
defer func() {
tickerDelay = refTickerDelay
}()
params.SetupTestConfigCleanup(t)
beaconConfig := params.BeaconConfig()
beaconConfig.FuluForkEpoch = 1
params.OverrideBeaconConfig(beaconConfig)
localP2P := p2ptest.NewTestP2P(t)
clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{})
params := []util.DataColumnParam{
{Slot: 10, Index: 1}, {Slot: 10, Index: 2}, {Slot: 10, Index: 3},
{Slot: 40, Index: 4}, {Slot: 40, Index: 6},
{Slot: 45, Index: 7}, {Slot: 45, Index: 8}, {Slot: 45, Index: 9},
}
_, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, params)
storage := filesystem.NewEphemeralDataColumnStorage(t)
err := storage.Save(verifiedRODataColumns)
require.NoError(t, err)
service := &Service{
cfg: &config{
p2p: localP2P,
clock: clock,
dataColumnStorage: storage,
chain: &chainMock.ChainService{},
},
rateLimiter: newRateLimiter(localP2P),
}
protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1)
remoteP2P := p2ptest.NewTestP2P(t)
var wg sync.WaitGroup
wg.Add(1)
ctxMap := ContextByteVersions{
[4]byte{245, 165, 253, 66}: version.Fulu,
}
root0 := verifiedRODataColumns[0].BlockRoot()
root3 := verifiedRODataColumns[3].BlockRoot()
root5 := verifiedRODataColumns[5].BlockRoot()
remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
defer wg.Done()
sidecars := make([]*blocks.RODataColumn, 0, 5)
for i := uint64(0); ; /* no stop condition */ i++ {
sidecar, err := readChunkedDataColumnSideCar(stream, remoteP2P, ctxMap)
if errors.Is(err, io.EOF) {
// End of stream.
break
}
require.NoError(t, err)
sidecars = append(sidecars, sidecar)
}
require.Equal(t, 5, len(sidecars))
require.Equal(t, root3, sidecars[0].BlockRoot())
require.Equal(t, root3, sidecars[1].BlockRoot())
require.Equal(t, root5, sidecars[2].BlockRoot())
require.Equal(t, root5, sidecars[3].BlockRoot())
require.Equal(t, root5, sidecars[4].BlockRoot())
require.Equal(t, uint64(4), sidecars[0].Index)
require.Equal(t, uint64(6), sidecars[1].Index)
require.Equal(t, uint64(7), sidecars[2].Index)
require.Equal(t, uint64(8), sidecars[3].Index)
require.Equal(t, uint64(9), sidecars[4].Index)
})
localP2P.Connect(remoteP2P)
stream, err := localP2P.BHost.NewStream(ctx, remoteP2P.BHost.ID(), protocolID)
require.NoError(t, err)
msg := &types.DataColumnsByRootIdentifiers{
{
BlockRoot: root0[:],
Columns: []uint64{1, 2, 3},
},
{
BlockRoot: root3[:],
Columns: []uint64{4, 5, 6},
},
{
BlockRoot: root5[:],
Columns: []uint64{7, 8, 9},
},
}
err = service.dataColumnSidecarByRootRPCHandler(ctx, msg, stream)
require.NoError(t, err)
require.Equal(t, true, localP2P.Peers().Scorers().BadResponsesScorer().Score(remoteP2P.PeerID()) >= 0)
if util.WaitTimeout(&wg, 1*time.Minute) {
t.Fatal("Did not receive stream within 1 sec")
}
})
}
func TestValidateDataColumnsByRootRequest(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
maxCols := uint64(10) // Set a small value for testing
config.MaxRequestDataColumnSidecars = maxCols
params.OverrideBeaconConfig(config)
tests := []struct {
name string
colIdents types.DataColumnsByRootIdentifiers
expectedErr error
}{
{
name: "Invalid request - multiple identifiers exceed max",
colIdents: types.DataColumnsByRootIdentifiers{
{
BlockRoot: make([]byte, fieldparams.RootLength),
Columns: make([]uint64, maxCols/2+1),
},
{
BlockRoot: make([]byte, fieldparams.RootLength),
Columns: make([]uint64, maxCols/2+1),
},
},
expectedErr: types.ErrMaxDataColumnReqExceeded,
},
{
name: "Valid request - less than max",
colIdents: types.DataColumnsByRootIdentifiers{
{
BlockRoot: make([]byte, fieldparams.RootLength),
Columns: make([]uint64, maxCols-1),
},
},
expectedErr: nil,
},
{
name: "Valid request - multiple identifiers sum to max",
colIdents: types.DataColumnsByRootIdentifiers{
{
BlockRoot: make([]byte, fieldparams.RootLength),
Columns: make([]uint64, maxCols/2),
},
{
BlockRoot: make([]byte, fieldparams.RootLength),
Columns: make([]uint64, maxCols/2),
},
},
expectedErr: nil,
},
}
// Run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateDataColumnsByRootRequest(tt.colIdents)
if tt.expectedErr == nil {
require.NoError(t, err)
} else {
require.ErrorIs(t, err, tt.expectedErr)
}
})
}
}
func TestDataColumnsRPCMinValidSlot(t *testing.T) {
type testCase struct {
name string
fuluForkEpoch primitives.Epoch
minReqEpochs primitives.Epoch
currentSlot primitives.Slot
expected primitives.Slot
}
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
testCases := []testCase{
{
name: "Fulu not enabled",
fuluForkEpoch: math.MaxUint64, // Disable Fulu
minReqEpochs: 5,
currentSlot: 0,
expected: primitives.Slot(math.MaxUint64),
},
{
name: "Current epoch equals fork epoch",
fuluForkEpoch: 10,
minReqEpochs: 5,
currentSlot: primitives.Slot(10 * slotsPerEpoch),
expected: primitives.Slot(10 * slotsPerEpoch),
},
{
name: "Current epoch less than minReqEpochs",
fuluForkEpoch: 10,
minReqEpochs: 20,
currentSlot: primitives.Slot(15 * slotsPerEpoch),
expected: primitives.Slot(10 * slotsPerEpoch),
},
{
name: "Current epoch greater than minReqEpochs + fork epoch",
fuluForkEpoch: 10,
minReqEpochs: 5,
currentSlot: primitives.Slot(20 * slotsPerEpoch),
expected: primitives.Slot(15 * slotsPerEpoch),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
config.FuluForkEpoch = tc.fuluForkEpoch
config.MinEpochsForDataColumnSidecarsRequest = tc.minReqEpochs
params.OverrideBeaconConfig(config)
actual, err := dataColumnsRPCMinValidSlot(tc.currentSlot)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
})
}
}

View File

@@ -20,8 +20,8 @@ type rpcHandlerTest struct {
s *Service
}
func (rt *rpcHandlerTest) testHandler(nh network.StreamHandler, rh rpcHandler, rhi interface{}) {
ctx, cancel := context.WithTimeout(rt.t.Context(), rt.timeout)
func (rt *rpcHandlerTest) testHandler(streamHandler network.StreamHandler, rpcHandler rpcHandler, message interface{}) {
ctx, cancel := context.WithTimeout(context.Background(), rt.timeout)
defer func() {
cancel()
}()
@@ -36,16 +36,18 @@ func (rt *rpcHandlerTest) testHandler(nh network.StreamHandler, rh rpcHandler, r
defer func() {
require.NoError(rt.t, client.Disconnect(server.PeerID()))
}()
require.Equal(rt.t, 1, len(client.BHost.Network().Peers()), "Expected peers to be connected")
h := func(stream network.Stream) {
handler := func(stream network.Stream) {
defer w.Done()
nh(stream)
streamHandler(stream)
}
server.BHost.SetStreamHandler(rt.topic, h)
server.BHost.SetStreamHandler(rt.topic, handler)
stream, err := client.BHost.NewStream(ctx, server.BHost.ID(), rt.topic)
require.NoError(rt.t, err)
err = rh(ctx, rhi, stream)
err = rpcHandler(ctx, message, stream)
if rt.err == nil {
require.NoError(rt.t, err)
} else {

View File

@@ -39,6 +39,7 @@ var (
errBlobResponseOutOfBounds = errors.Wrap(verification.ErrBlobInvalid, "received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
errChunkResponseBlockMismatch = errors.Wrap(verification.ErrBlobInvalid, "blob block details do not match")
errChunkResponseParentMismatch = errors.Wrap(verification.ErrBlobInvalid, "parent root for response element doesn't match previous element root")
errDataColumnChunkedReadFailure = errors.New("failed to read stream of chunk-encoded data columns")
)
// BeaconBlockProcessor defines a block processing function, which allows to start utilizing
@@ -383,3 +384,53 @@ func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncod
return rob, nil
}
func readChunkedDataColumnSideCar(
stream network.Stream,
p2pApi p2p.P2P,
ctxMap ContextByteVersions,
) (*blocks.RODataColumn, error) {
// Read the status code from the stream.
statusCode, errMessage, err := ReadStatusCode(stream, p2pApi.Encoding())
if err != nil {
return nil, errors.Wrap(err, "read status code")
}
if statusCode != 0 {
return nil, errors.Wrap(errDataColumnChunkedReadFailure, errMessage)
}
// Retrieve the fork digest.
ctxBytes, err := readContextFromStream(stream)
if err != nil {
return nil, errors.Wrap(err, "read context from stream")
}
// Check if the fork digest is recognized.
msgVersion, ok := ctxMap[bytesutil.ToBytes4(ctxBytes)]
if !ok {
return nil, errors.Errorf("unrecognized fork digest %#x", ctxBytes)
}
// Check if we are on Fulu.
if msgVersion < version.Fulu {
return nil, errors.Errorf(
"unexpected context bytes for DataColumnSidecar, ctx=%#x, msgVersion=%v, minimalSupportedVersion=%v",
ctxBytes, version.String(msgVersion), version.String(version.Fulu),
)
}
// Decode the data column sidecar from the stream.
dataColumnSidecar := new(ethpb.DataColumnSidecar)
if err := p2pApi.Encoding().DecodeWithMaxLength(stream, dataColumnSidecar); err != nil {
return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream")
}
// Create a read-only data column from the data column sidecar.
roDataColumn, err := blocks.NewRODataColumn(dataColumnSidecar)
if err != nil {
return nil, errors.Wrap(err, "new read only data column")
}
return &roDataColumn, nil
}

View File

@@ -102,6 +102,7 @@ type config struct {
clock *startup.Clock
stateNotifier statefeed.Notifier
blobStorage *filesystem.BlobStorage
dataColumnStorage *filesystem.DataColumnStorage
}
// This defines the interface for interacting with block chain service

View File

@@ -0,0 +1,2 @@
### Added
- Implement `dataColumnSidecarByRootRPCHandler`.

View File

@@ -212,6 +212,11 @@ var (
Usage: "The factor by which blob batch limit may increase on burst.",
Value: 3,
}
DataColumnBatchLimit = &cli.IntFlag{
Name: "data-column-batch-limit",
Usage: "The amount of data columns the local peer is bounded to request and respond to in a batch.",
Value: 4096,
}
// DisableDebugRPCEndpoints disables the debug Beacon API namespace.
DisableDebugRPCEndpoints = &cli.BoolFlag{
Name: "disable-debug-rpc-endpoints",

View File

@@ -16,6 +16,7 @@ type GlobalFlags struct {
BlockBatchLimit int
BlockBatchLimitBurstFactor int
BlobBatchLimit int
DataColumnBatchLimit int
BlobBatchLimitBurstFactor int
}
@@ -53,6 +54,7 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name)
cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
cfg.DataColumnBatchLimit = ctx.Int(DataColumnBatchLimit.Name)
cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name)
configureMinimumPeers(ctx, cfg)

View File

@@ -14,7 +14,6 @@ import (
)
var (
// BlobStoragePathFlag defines a flag to start the beacon chain from a give genesis state file.
BlobStoragePathFlag = &cli.PathFlag{
Name: "blob-path",
Usage: "Location for blob storage. Default location will be a 'blobs' directory next to the beacon db.",
@@ -30,6 +29,10 @@ var (
Usage: layoutFlagUsage(),
Value: filesystem.LayoutNameFlat,
}
DataColumnStoragePathFlag = &cli.PathFlag{
Name: "data-column-path",
Usage: "Location for data column storage. Default location will be a 'data-columns' directory next to the beacon db.",
}
)
func layoutOptions() string {
@@ -54,15 +57,23 @@ func validateLayoutFlag(_ *cli.Context, v string) error {
// create a cancellable context. If we switch to using App.RunContext, we can set up this cancellation in the cmd
// package instead, and allow the functional options to tap into context cancellation.
func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
e, err := blobRetentionEpoch(c)
blobRetentionEpoch, err := blobRetentionEpoch(c)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "blob retention epoch")
}
opts := []node.Option{node.WithBlobStorageOptions(
filesystem.WithBlobRetentionEpochs(e),
blobStorageOptions := node.WithBlobStorageOptions(
filesystem.WithBlobRetentionEpochs(blobRetentionEpoch),
filesystem.WithBasePath(blobStoragePath(c)),
filesystem.WithLayout(c.String(BlobStorageLayout.Name)), // This is validated in the Action func for BlobStorageLayout.
)}
)
dataColumnStorageOption := node.WithDataColumnStorageOptions(
filesystem.WithDataColumnRetentionEpochs(blobRetentionEpoch),
filesystem.WithDataColumnBasePath(dataColumnStoragePath(c)),
)
opts := []node.Option{blobStorageOptions, dataColumnStorageOption}
return opts, nil
}
@@ -75,6 +86,16 @@ func blobStoragePath(c *cli.Context) string {
return blobsPath
}
func dataColumnStoragePath(c *cli.Context) string {
dataColumnsPath := c.Path(DataColumnStoragePathFlag.Name)
if dataColumnsPath == "" {
// append a "data-columns" subdir to the end of the data dir path
dataColumnsPath = path.Join(c.String(cmd.DataDirFlag.Name), "data-columns")
}
return dataColumnsPath
}
var errInvalidBlobRetentionEpochs = errors.New("value is smaller than spec minimum")
// blobRetentionEpoch returns the spec default MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUEST

View File

@@ -70,7 +70,6 @@ go_library(
"//testing/require:go_default_library",
"//time/slots:go_default_library",
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
"@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",

View File

@@ -3,80 +3,91 @@ package util
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
ckzg4844 "github.com/ethereum/c-kzg-4844/v2/bindings/go"
)
type (
DataColumnParams struct {
Slot primitives.Slot
ColumnIndex uint64
KzgCommitments [][]byte
DataColumn []byte // A whole data cell will be filled with the content of one item of this slice.
}
// DataColumnParam is a struct that holds parameters for creating test RODataColumn and VerifiedRODataColumn sidecars.
DataColumnParam struct {
Index uint64
Column [][]byte
KzgCommitments [][]byte
KzgProofs [][]byte
KzgCommitmentsInclusionProof [][]byte
DataColumnsParamsByRoot map[[fieldparams.RootLength]byte][]DataColumnParams
// Part of the beacon block header.
Slot primitives.Slot
ProposerIndex primitives.ValidatorIndex
ParentRoot []byte
StateRoot []byte
BodyRoot []byte
}
)
func CreateTestVerifiedRoDataColumnSidecars(t *testing.T, dataColumnParamsByBlockRoot DataColumnsParamsByRoot) ([]blocks.RODataColumn, []blocks.VerifiedRODataColumn) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
cfg.FuluForkEpoch = 0
params.OverrideBeaconConfig(cfg)
count := 0
for _, indices := range dataColumnParamsByBlockRoot {
count += len(indices)
}
// CreateTestVerifiedRoDataColumnSidecars creates test RODataColumn and VerifiedRODataColumn sidecars for testing purposes.
func CreateTestVerifiedRoDataColumnSidecars(t *testing.T, params []DataColumnParam) ([]blocks.RODataColumn, []blocks.VerifiedRODataColumn) {
const (
kzgCommitmentsInclusionProofSize = 4
proofSize = 32
)
count := len(params)
verifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, count)
rodataColumnSidecars := make([]blocks.RODataColumn, 0, count)
for blockRoot, params := range dataColumnParamsByBlockRoot {
for _, param := range params {
dataColumn := make([][]byte, 0, len(param.DataColumn))
for _, value := range param.DataColumn {
cell := make([]byte, ckzg4844.BytesPerCell)
for i := range ckzg4844.BytesPerCell {
cell[i] = value
}
dataColumn = append(dataColumn, cell)
}
kzgCommitmentsInclusionProof := make([][]byte, 4)
for i := range kzgCommitmentsInclusionProof {
kzgCommitmentsInclusionProof[i] = make([]byte, 32)
}
for _, param := range params {
var parentRoot, stateRoot, bodyRoot [fieldparams.RootLength]byte
copy(parentRoot[:], param.ParentRoot)
copy(stateRoot[:], param.StateRoot)
copy(bodyRoot[:], param.BodyRoot)
dataColumnSidecar := &ethpb.DataColumnSidecar{
Index: param.ColumnIndex,
KzgCommitments: param.KzgCommitments,
Column: dataColumn,
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: param.Slot,
ParentRoot: make([]byte, fieldparams.RootLength),
StateRoot: make([]byte, fieldparams.RootLength),
BodyRoot: make([]byte, fieldparams.RootLength),
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
}
roDataColumnSidecar, err := blocks.NewRODataColumnWithRoot(dataColumnSidecar, blockRoot)
if err != nil {
t.Fatal(err)
}
rodataColumnSidecars = append(rodataColumnSidecars, roDataColumnSidecar)
verifiedRoDataColumnSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar)
verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumnSidecar)
column := make([][]byte, 0, len(param.Column))
for _, cell := range param.Column {
var completeCell [kzg.BytesPerCell]byte
copy(completeCell[:], cell)
column = append(column, completeCell[:])
}
kzgCommitmentsInclusionProof := make([][]byte, 0, kzgCommitmentsInclusionProofSize)
for range kzgCommitmentsInclusionProofSize {
kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, proofSize))
}
for i, proof := range param.KzgCommitmentsInclusionProof {
copy(kzgCommitmentsInclusionProof[i], proof)
}
dataColumnSidecar := &ethpb.DataColumnSidecar{
Index: param.Index,
Column: column,
KzgCommitments: param.KzgCommitments,
KzgProofs: param.KzgProofs,
SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: param.Slot,
ProposerIndex: param.ProposerIndex,
ParentRoot: parentRoot[:],
StateRoot: stateRoot[:],
BodyRoot: bodyRoot[:],
},
Signature: make([]byte, fieldparams.BLSSignatureLength),
},
KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
}
roDataColumnSidecar, err := blocks.NewRODataColumn(dataColumnSidecar)
if err != nil {
t.Fatal(err)
}
rodataColumnSidecars = append(rodataColumnSidecars, roDataColumnSidecar)
verifiedRoDataColumnSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar)
verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumnSidecar)
}
return rodataColumnSidecars, verifiedRoDataColumnSidecars