From 559d02bf4db837f2863848056f2261f4e98e6b20 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 19 Jun 2025 14:12:45 +0200 Subject: [PATCH] peerDAS: Implement `dataColumnSidecarByRootRPCHandler`. (#15405) * `CreateTestVerifiedRoDataColumnSidecars`: Use consistent block root. * peerDAS: Implement `dataColumnSidecarByRootRPCHandler`. * Fix James' comment. * Fix James' comment. --- beacon-chain/blockchain/options.go | 9 + beacon-chain/blockchain/process_block.go | 16 +- beacon-chain/blockchain/process_block_test.go | 102 ++++-- beacon-chain/blockchain/service.go | 43 +-- beacon-chain/das/availability_columns_test.go | 52 ++- beacon-chain/das/data_column_cache_test.go | 20 +- beacon-chain/db/filesystem/BUILD.bazel | 1 + beacon-chain/db/filesystem/blob.go | 4 +- beacon-chain/db/filesystem/blob_test.go | 2 +- beacon-chain/db/filesystem/data_column.go | 3 +- .../db/filesystem/data_column_test.go | 312 ++++++++--------- beacon-chain/node/node.go | 91 ++--- beacon-chain/node/node_test.go | 30 +- beacon-chain/node/options.go | 17 + beacon-chain/p2p/broadcaster_test.go | 2 +- beacon-chain/p2p/rpc_topic_mappings.go | 13 + beacon-chain/p2p/types/rpc_errors.go | 13 +- beacon-chain/sync/BUILD.bazel | 2 + beacon-chain/sync/error.go | 16 +- beacon-chain/sync/options.go | 8 + beacon-chain/sync/rpc.go | 27 +- beacon-chain/sync/rpc_chunked_response.go | 28 ++ .../sync/rpc_data_column_sidecars_by_root.go | 174 ++++++++++ .../rpc_data_column_sidecars_by_root_test.go | 314 ++++++++++++++++++ beacon-chain/sync/rpc_handler_test.go | 14 +- beacon-chain/sync/rpc_send_request.go | 51 +++ beacon-chain/sync/service.go | 1 + .../manu-peerdas-columns-by-root-handler.md | 2 + cmd/beacon-chain/flags/base.go | 5 + cmd/beacon-chain/flags/config.go | 2 + cmd/beacon-chain/storage/options.go | 33 +- testing/util/BUILD.bazel | 1 - testing/util/data_column.go | 127 +++---- 33 files changed, 1143 insertions(+), 392 deletions(-) create mode 100644 beacon-chain/sync/rpc_data_column_sidecars_by_root.go create mode 100644 beacon-chain/sync/rpc_data_column_sidecars_by_root_test.go create mode 100644 changelog/manu-peerdas-columns-by-root-handler.md diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index ea483b338b..5d15f94b0d 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -258,3 +258,12 @@ func WithLightClientStore(lcs *lightclient.Store) Option { return nil } } + +// WithStartWaitingDataColumnSidecars sets a channel that the `areDataColumnsAvailable` function will fill +// in when starting to wait for additional data columns. +func WithStartWaitingDataColumnSidecars(c chan bool) Option { + return func(s *Service) error { + s.startWaitingDataColumnSidecars = c + return nil + } +} diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 656ce21265..2257107782 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -637,7 +637,11 @@ func missingDataColumnIndices(bs *filesystem.DataColumnStorage, root [fieldparam // The function will first check the database to see if all sidecars have been persisted. If any // sidecars are missing, it will then read from the sidecar notifier channel for the given root until the channel is // closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars. 
-func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signedBlock interfaces.ReadOnlySignedBeaconBlock) error { +func (s *Service) isDataAvailable( + ctx context.Context, + root [fieldparams.RootLength]byte, + signedBlock interfaces.ReadOnlySignedBeaconBlock, +) error { block := signedBlock.Block() if block == nil { return errors.New("invalid nil beacon block") @@ -657,7 +661,11 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signedBloc // areDataColumnsAvailable blocks until all data columns committed to in the block are available, // or an error or context cancellation occurs. A nil result means that the data availability check is successful. -func (s *Service) areDataColumnsAvailable(ctx context.Context, root [fieldparams.RootLength]byte, block interfaces.ReadOnlyBeaconBlock) error { +func (s *Service) areDataColumnsAvailable( + ctx context.Context, + root [fieldparams.RootLength]byte, + block interfaces.ReadOnlyBeaconBlock, +) error { // We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS blockSlot, currentSlot := block.Slot(), s.CurrentSlot() blockEpoch, currentEpoch := slots.ToEpoch(blockSlot), slots.ToEpoch(currentSlot) @@ -724,6 +732,10 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [fieldparams return nil } + if s.startWaitingDataColumnSidecars != nil { + s.startWaitingDataColumnSidecars <- true + } + // Log for DA checks that cross over into the next slot; helpful for debugging. nextSlot := slots.BeginsAt(block.Slot()+1, s.genesisTime) diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index baee94cbb4..21303f45d9 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -3332,17 +3332,27 @@ func testIsAvailableSetup(t *testing.T, params testIsAvailableParams) (context.C signedBeaconBlock, err := util.GenerateFullBlockFulu(genesisState, secretKeys, conf, 10 /*block slot*/) require.NoError(t, err) - root, err := signedBeaconBlock.Block.HashTreeRoot() + block := signedBeaconBlock.Block + bodyRoot, err := block.Body.HashTreeRoot() require.NoError(t, err) - dataColumnsParams := make([]util.DataColumnParams, 0, len(params.columnsToSave)) + root, err := block.HashTreeRoot() + require.NoError(t, err) + + dataColumnsParams := make([]util.DataColumnParam, 0, len(params.columnsToSave)) for _, i := range params.columnsToSave { - dataColumnParam := util.DataColumnParams{ColumnIndex: i} + dataColumnParam := util.DataColumnParam{ + Index: i, + Slot: block.Slot, + ProposerIndex: block.ProposerIndex, + ParentRoot: block.ParentRoot, + StateRoot: block.StateRoot, + BodyRoot: bodyRoot[:], + } dataColumnsParams = append(dataColumnsParams, dataColumnParam) } - dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: dataColumnsParams} - _, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot) + _, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnsParams) err = dataColumnStorage.Save(verifiedRODataColumns) require.NoError(t, err) @@ -3402,38 +3412,47 @@ func TestIsDataAvailable(t *testing.T) { }) t.Run("Fulu - some initially missing data columns (no reconstruction)", func(t *testing.T) { + startWaiting := make(chan bool) + testParams := testIsAvailableParams{ - options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{})}, + options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{}), 
WithStartWaitingDataColumnSidecars(startWaiting)}, columnsToSave: []uint64{1, 17, 19, 75, 102, 117, 119}, // 119 is not needed, 42 and 87 are missing blobKzgCommitmentsCount: 3, } ctx, _, service, root, signed := testIsAvailableSetup(t, testParams) + block := signed.Block() + slot := block.Slot() + proposerIndex := block.ProposerIndex() + parentRoot := block.ParentRoot() + stateRoot := block.StateRoot() + bodyRoot, err := block.Body().HashTreeRoot() + require.NoError(t, err) - var wrongRoot [fieldparams.RootLength]byte - copy(wrongRoot[:], root[:]) - wrongRoot[0]++ // change the root to simulate a wrong root + _, verifiedSidecarsWrongRoot := util.CreateTestVerifiedRoDataColumnSidecars( + t, + []util.DataColumnParam{ + {Index: 42, Slot: slot + 1}, // Needed index, but not for this slot. + }) - _, verifiedSidecarsWrongRoot := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{wrongRoot: { - {ColumnIndex: 42}, // needed - }}) + _, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{ + {Index: 87, Slot: slot, ProposerIndex: proposerIndex, ParentRoot: parentRoot[:], StateRoot: stateRoot[:], BodyRoot: bodyRoot[:]}, // Needed index + {Index: 1, Slot: slot, ProposerIndex: proposerIndex, ParentRoot: parentRoot[:], StateRoot: stateRoot[:], BodyRoot: bodyRoot[:]}, // Not needed index + {Index: 42, Slot: slot, ProposerIndex: proposerIndex, ParentRoot: parentRoot[:], StateRoot: stateRoot[:], BodyRoot: bodyRoot[:]}, // Needed index + }) - _, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{root: { - {ColumnIndex: 87}, // needed - {ColumnIndex: 1}, // not needed - {ColumnIndex: 42}, // needed - }}) + go func() { + <-startWaiting - time.AfterFunc(10*time.Millisecond, func() { err := service.dataColumnStorage.Save(verifiedSidecarsWrongRoot) require.NoError(t, err) err = service.dataColumnStorage.Save(verifiedSidecars) require.NoError(t, err) - }) + }() - err := service.isDataAvailable(ctx, root, signed) + err = service.isDataAvailable(ctx, root, signed) require.NoError(t, err) }) @@ -3442,6 +3461,9 @@ func TestIsDataAvailable(t *testing.T) { missingColumns = uint64(2) cgc = 128 ) + + startWaiting := make(chan bool) + var custodyInfo peerdas.CustodyInfo custodyInfo.TargetGroupCount.SetValidatorsCustodyRequirement(cgc) custodyInfo.ToAdvertiseGroupCount.Set(cgc) @@ -3454,41 +3476,61 @@ func TestIsDataAvailable(t *testing.T) { } testParams := testIsAvailableParams{ - options: []Option{WithCustodyInfo(&custodyInfo)}, + options: []Option{WithCustodyInfo(&custodyInfo), WithStartWaitingDataColumnSidecars(startWaiting)}, columnsToSave: indices, blobKzgCommitmentsCount: 3, } ctx, _, service, root, signed := testIsAvailableSetup(t, testParams) + block := signed.Block() + slot := block.Slot() + proposerIndex := block.ProposerIndex() + parentRoot := block.ParentRoot() + stateRoot := block.StateRoot() + bodyRoot, err := block.Body().HashTreeRoot() + require.NoError(t, err) - dataColumnParams := make([]util.DataColumnParams, 0, missingColumns) + dataColumnParams := make([]util.DataColumnParam, 0, missingColumns) for i := minimumColumnsCountToReconstruct - missingColumns; i < minimumColumnsCountToReconstruct; i++ { - dataColumnParam := util.DataColumnParams{ColumnIndex: i} + dataColumnParam := util.DataColumnParam{ + Index: i, + Slot: slot, + ProposerIndex: proposerIndex, + ParentRoot: parentRoot[:], + StateRoot: stateRoot[:], + BodyRoot: bodyRoot[:], + } + dataColumnParams = append(dataColumnParams, 
dataColumnParam) } - _, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{root: dataColumnParams}) + _, verifiedSidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParams) + + go func() { + <-startWaiting - time.AfterFunc(10*time.Millisecond, func() { err := service.dataColumnStorage.Save(verifiedSidecars) require.NoError(t, err) - }) + }() - err := service.isDataAvailable(ctx, root, signed) + err = service.isDataAvailable(ctx, root, signed) require.NoError(t, err) }) t.Run("Fulu - some columns are definitively missing", func(t *testing.T) { + startWaiting := make(chan bool) + params := testIsAvailableParams{ - options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{})}, + options: []Option{WithCustodyInfo(&peerdas.CustodyInfo{}), WithStartWaitingDataColumnSidecars(startWaiting)}, blobKzgCommitmentsCount: 3, } ctx, cancel, service, root, signed := testIsAvailableSetup(t, params) - time.AfterFunc(10*time.Millisecond, func() { + go func() { + <-startWaiting cancel() - }) + }() err := service.isDataAvailable(ctx, root, signed) require.NotNil(t, err) diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index b370f8f474..e5de0b2525 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -47,27 +47,28 @@ import ( // Service represents a service that handles the internal // logic of managing the full PoS beacon chain. type Service struct { - cfg *config - ctx context.Context - cancel context.CancelFunc - genesisTime time.Time - head *head - headLock sync.RWMutex - originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized - boundaryRoots [][32]byte - checkpointStateCache *cache.CheckpointStateCache - initSyncBlocks map[[32]byte]interfaces.ReadOnlySignedBeaconBlock - initSyncBlocksLock sync.RWMutex - wsVerifier *WeakSubjectivityVerifier - clockSetter startup.ClockSetter - clockWaiter startup.ClockWaiter - syncComplete chan struct{} - blobNotifiers *blobNotifierMap - blockBeingSynced *currentlySyncingBlock - blobStorage *filesystem.BlobStorage - dataColumnStorage *filesystem.DataColumnStorage - slasherEnabled bool - lcStore *lightClient.Store + cfg *config + ctx context.Context + cancel context.CancelFunc + genesisTime time.Time + head *head + headLock sync.RWMutex + originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized + boundaryRoots [][32]byte + checkpointStateCache *cache.CheckpointStateCache + initSyncBlocks map[[32]byte]interfaces.ReadOnlySignedBeaconBlock + initSyncBlocksLock sync.RWMutex + wsVerifier *WeakSubjectivityVerifier + clockSetter startup.ClockSetter + clockWaiter startup.ClockWaiter + syncComplete chan struct{} + blobNotifiers *blobNotifierMap + blockBeingSynced *currentlySyncingBlock + blobStorage *filesystem.BlobStorage + dataColumnStorage *filesystem.DataColumnStorage + slasherEnabled bool + lcStore *lightClient.Store + startWaitingDataColumnSidecars chan bool // for testing purposes only } // config options for the service. 
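The `startWaitingDataColumnSidecars` channel added above exists purely so tests can synchronize with `areDataColumnsAvailable`: the service sends on the channel immediately before it starts waiting for missing columns, and the test reacts to that signal instead of sleeping (the test changes in this patch replace the old `time.AfterFunc(10*time.Millisecond, ...)` pattern with a goroutine that blocks on the channel). Below is a minimal, self-contained Go sketch of that pattern; the `service` type, `waitForColumns`, and `columnSaved` names are simplified stand-ins for the real `blockchain.Service`, `areDataColumnsAvailable`, and the storage notifier, not code from this patch.

package main

import (
	"context"
	"errors"
	"fmt"
)

// service is a simplified stand-in for blockchain.Service: it owns a set of
// stored column indices and an optional test-only channel that is signalled
// right before the availability check starts waiting.
type service struct {
	stored                         map[uint64]bool
	startWaitingDataColumnSidecars chan bool // test-only, may be nil
	columnSaved                    chan uint64
}

// waitForColumns mimics the shape of areDataColumnsAvailable: if some required
// columns are missing, it signals startWaitingDataColumnSidecars (when set) and
// then waits for save notifications or context cancellation.
func (s *service) waitForColumns(ctx context.Context, required []uint64) error {
	missing := map[uint64]bool{}
	for _, idx := range required {
		if !s.stored[idx] {
			missing[idx] = true
		}
	}
	if len(missing) == 0 {
		return nil
	}
	if s.startWaitingDataColumnSidecars != nil {
		s.startWaitingDataColumnSidecars <- true
	}
	for len(missing) > 0 {
		select {
		case idx := <-s.columnSaved:
			delete(missing, idx)
		case <-ctx.Done():
			return errors.New("context canceled while columns still missing")
		}
	}
	return nil
}

func main() {
	startWaiting := make(chan bool)
	svc := &service{
		stored:                         map[uint64]bool{1: true},
		startWaitingDataColumnSidecars: startWaiting,
		columnSaved:                    make(chan uint64),
	}

	// The test goroutine acts only once the check has actually started waiting,
	// removing the need for time.AfterFunc-style sleeps.
	go func() {
		<-startWaiting
		svc.columnSaved <- 42
		svc.columnSaved <- 87
	}()

	err := svc.waitForColumns(context.Background(), []uint64{1, 42, 87})
	fmt.Println("availability check error:", err)
}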
diff --git a/beacon-chain/das/availability_columns_test.go b/beacon-chain/das/availability_columns_test.go index 87f4d2eb9d..1cf26d5423 100644 --- a/beacon-chain/das/availability_columns_test.go +++ b/beacon-chain/das/availability_columns_test.go @@ -38,9 +38,9 @@ func TestPersist(t *testing.T) { t.Run("mixed roots", func(t *testing.T) { dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t) - dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{ - {1}: {{ColumnIndex: 1}}, - {2}: {{ColumnIndex: 2}}, + dataColumnParamsByBlockRoot := []util.DataColumnParam{ + {Slot: 1, Index: 1}, + {Slot: 2, Index: 2}, } roSidecars, _ := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot) @@ -54,8 +54,8 @@ func TestPersist(t *testing.T) { t.Run("outside DA period", func(t *testing.T) { dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t) - dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{ - {1}: {{ColumnIndex: 1}}, + dataColumnParamsByBlockRoot := []util.DataColumnParam{ + {Slot: 1, Index: 1}, } roSidecars, _ := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot) @@ -67,21 +67,24 @@ func TestPersist(t *testing.T) { }) t.Run("nominal", func(t *testing.T) { + const slot = 42 dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t) - dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{ - {}: {{ColumnIndex: 1}, {ColumnIndex: 5}}, + dataColumnParamsByBlockRoot := []util.DataColumnParam{ + {Slot: slot, Index: 1}, + {Slot: slot, Index: 5}, } roSidecars, roDataColumns := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot) lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, &peerdas.CustodyInfo{}) - err := lazilyPersistentStoreColumns.Persist(0, roSidecars...) + err := lazilyPersistentStoreColumns.Persist(slot, roSidecars...) require.NoError(t, err) require.Equal(t, 1, len(lazilyPersistentStoreColumns.cache.entries)) - key := cacheKey{slot: 0, root: [fieldparams.RootLength]byte{}} - entry := lazilyPersistentStoreColumns.cache.entries[key] + key := cacheKey{slot: slot, root: roDataColumns[0].BlockRoot()} + entry, ok := lazilyPersistentStoreColumns.cache.entries[key] + require.Equal(t, true, ok) // A call to Persist does NOT save the sidecars to disk. 
require.Equal(t, uint64(0), entry.diskSummary.Count()) @@ -121,24 +124,37 @@ func TestIsDataAvailable(t *testing.T) { signedBeaconBlockFulu := util.NewBeaconBlockFulu() signedBeaconBlockFulu.Block.Body.BlobKzgCommitments = commitments signedRoBlock := newSignedRoBlock(t, signedBeaconBlockFulu) + block := signedRoBlock.Block() + slot := block.Slot() + proposerIndex := block.ProposerIndex() + parentRoot := block.ParentRoot() + stateRoot := block.StateRoot() + bodyRoot, err := block.Body().HashTreeRoot() + require.NoError(t, err) + root := signedRoBlock.Root() dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t) lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, newDataColumnsVerifier, &peerdas.CustodyInfo{}) indices := [...]uint64{1, 17, 87, 102} - dataColumnsParams := make([]util.DataColumnParams, 0, len(indices)) + dataColumnsParams := make([]util.DataColumnParam, 0, len(indices)) for _, index := range indices { - dataColumnParams := util.DataColumnParams{ - ColumnIndex: index, + dataColumnParams := util.DataColumnParam{ + Index: index, KzgCommitments: commitments, + + Slot: slot, + ProposerIndex: proposerIndex, + ParentRoot: parentRoot[:], + StateRoot: stateRoot[:], + BodyRoot: bodyRoot[:], } dataColumnsParams = append(dataColumnsParams, dataColumnParams) } - dataColumnsParamsByBlockRoot := util.DataColumnsParamsByRoot{root: dataColumnsParams} - _, verifiedRoDataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnsParamsByBlockRoot) + _, verifiedRoDataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnsParams) key := cacheKey{root: root} entry := lazilyPersistentStoreColumns.cache.ensure(key) @@ -149,7 +165,7 @@ func TestIsDataAvailable(t *testing.T) { require.NoError(t, err) } - err := lazilyPersistentStoreColumns.IsDataAvailable(ctx, 0 /*current slot*/, signedRoBlock) + err = lazilyPersistentStoreColumns.IsDataAvailable(ctx, slot, signedRoBlock) require.NoError(t, err) actual, err := dataColumnStorage.Get(root, indices[:]) @@ -224,8 +240,8 @@ func TestFullCommitmentsToCheck(t *testing.T) { } } -func roSidecarsFromDataColumnParamsByBlockRoot(t *testing.T, dataColumnParamsByBlockRoot util.DataColumnsParamsByRoot) ([]blocks.ROSidecar, []blocks.RODataColumn) { - roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot) +func roSidecarsFromDataColumnParamsByBlockRoot(t *testing.T, parameters []util.DataColumnParam) ([]blocks.ROSidecar, []blocks.RODataColumn) { + roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, parameters) roSidecars := make([]blocks.ROSidecar, 0, len(roDataColumns)) for _, roDataColumn := range roDataColumns { diff --git a/beacon-chain/das/data_column_cache_test.go b/beacon-chain/das/data_column_cache_test.go index ebac11f2ac..1fefd4a105 100644 --- a/beacon-chain/das/data_column_cache_test.go +++ b/beacon-chain/das/data_column_cache_test.go @@ -28,8 +28,7 @@ func TestEnsureDeleteSetDiskSummary(t *testing.T) { func TestStash(t *testing.T) { t.Run("Index too high", func(t *testing.T) { - dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 10_000}}} - roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot) + roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: 10_000}}) var entry dataColumnCacheEntry err := entry.stash(&roDataColumns[0]) @@ -37,8 +36,7 @@ func TestStash(t *testing.T) { }) t.Run("Nominal and already existing", 
func(t *testing.T) { - dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 1}}} - roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot) + roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: 1}}) var entry dataColumnCacheEntry err := entry.stash(&roDataColumns[0]) @@ -76,36 +74,30 @@ func TestFilterDataColumns(t *testing.T) { }) t.Run("Commitments not equal", func(t *testing.T) { - root := [fieldparams.RootLength]byte{} commitmentsArray := safeCommitmentsArray{nil, [][]byte{[]byte{1}}} - dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: {{ColumnIndex: 1}}} - roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot) + roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: 1}}) var scs [fieldparams.NumberOfColumns]*blocks.RODataColumn scs[1] = &roDataColumns[0] dataColumnCacheEntry := dataColumnCacheEntry{scs: scs} - _, err := dataColumnCacheEntry.filter(root, &commitmentsArray) + _, err := dataColumnCacheEntry.filter(roDataColumns[0].BlockRoot(), &commitmentsArray) require.NotNil(t, err) }) t.Run("Nominal", func(t *testing.T) { - root := [fieldparams.RootLength]byte{} commitmentsArray := safeCommitmentsArray{nil, [][]byte{[]byte{1}}, nil, [][]byte{[]byte{3}}} - diskSummary := filesystem.NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{false, true}) - - dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: {{ColumnIndex: 3, KzgCommitments: [][]byte{[]byte{3}}}}} - expected, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot) + expected, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: 3, KzgCommitments: [][]byte{[]byte{3}}}}) var scs [fieldparams.NumberOfColumns]*blocks.RODataColumn scs[3] = &expected[0] dataColumnCacheEntry := dataColumnCacheEntry{scs: scs, diskSummary: diskSummary} - actual, err := dataColumnCacheEntry.filter(root, &commitmentsArray) + actual, err := dataColumnCacheEntry.filter(expected[0].BlockRoot(), &commitmentsArray) require.NoError(t, err) require.DeepEqual(t, expected, actual) diff --git a/beacon-chain/db/filesystem/BUILD.bazel b/beacon-chain/db/filesystem/BUILD.bazel index 1deadce105..d3d2f3a261 100644 --- a/beacon-chain/db/filesystem/BUILD.bazel +++ b/beacon-chain/db/filesystem/BUILD.bazel @@ -59,6 +59,7 @@ go_test( "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", + "//consensus-types/blocks:go_default_library", "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", "//proto/prysm/v1alpha1:go_default_library", diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 552918fe41..f57be212a0 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -25,7 +25,7 @@ func directoryPermissions() os.FileMode { var ( errIndexOutOfBounds = errors.New("blob index in file name >= MAX_BLOBS_PER_BLOCK") errSidecarEmptySSZData = errors.New("sidecar marshalled to an empty ssz byte slice") - errNoBasePath = errors.New("BlobStorage base path not specified in init") + errNoBlobBasePath = errors.New("BlobStorage base path not specified in init") ) // BlobStorageOption is a functional option for configuring a BlobStorage. 
@@ -85,7 +85,7 @@ func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) { // Allow tests to set up a different fs using WithFs. if b.fs == nil { if b.base == "" { - return nil, errNoBasePath + return nil, errNoBlobBasePath } b.base = path.Clean(b.base) if err := file.MkdirAll(b.base); err != nil { diff --git a/beacon-chain/db/filesystem/blob_test.go b/beacon-chain/db/filesystem/blob_test.go index e442f7a265..0079bd8884 100644 --- a/beacon-chain/db/filesystem/blob_test.go +++ b/beacon-chain/db/filesystem/blob_test.go @@ -160,7 +160,7 @@ func writeFakeSSZ(t *testing.T, fs afero.Fs, root [32]byte, slot primitives.Slot func TestNewBlobStorage(t *testing.T) { _, err := NewBlobStorage() - require.ErrorIs(t, err, errNoBasePath) + require.ErrorIs(t, err, errNoBlobBasePath) _, err = NewBlobStorage(WithBasePath(path.Join(t.TempDir(), "good"))) require.NoError(t, err) } diff --git a/beacon-chain/db/filesystem/data_column.go b/beacon-chain/db/filesystem/data_column.go index 73f9a0d3fc..09f4895eb1 100644 --- a/beacon-chain/db/filesystem/data_column.go +++ b/beacon-chain/db/filesystem/data_column.go @@ -50,6 +50,7 @@ var ( errTooManyDataColumns = errors.New("too many data columns") errWrongSszEncodedDataColumnSidecarSize = errors.New("wrong SSZ encoded data column sidecar size") errDataColumnSidecarsFromDifferentSlots = errors.New("data column sidecars from different slots") + errNoDataColumnBasePath = errors.New("DataColumnStorage base path not specified in init") ) type ( @@ -142,7 +143,7 @@ func NewDataColumnStorage(ctx context.Context, opts ...DataColumnStorageOption) // Allow tests to set up a different fs using WithFs. if storage.fs == nil { if storage.base == "" { - return nil, errNoBasePath + return nil, errNoDataColumnBasePath } storage.base = path.Clean(storage.base) diff --git a/beacon-chain/db/filesystem/data_column_test.go b/beacon-chain/db/filesystem/data_column_test.go index 947bcc9d4c..86989408f4 100644 --- a/beacon-chain/db/filesystem/data_column_test.go +++ b/beacon-chain/db/filesystem/data_column_test.go @@ -7,6 +7,7 @@ import ( fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/testing/require" "github.com/OffchainLabs/prysm/v6/testing/util" @@ -18,7 +19,7 @@ func TestNewDataColumnStorage(t *testing.T) { t.Run("No base path", func(t *testing.T) { _, err := NewDataColumnStorage(ctx) - require.ErrorIs(t, err, errNoBasePath) + require.ErrorIs(t, err, errNoDataColumnBasePath) }) t.Run("Nominal", func(t *testing.T) { @@ -40,32 +41,18 @@ func TestWarmCache(t *testing.T) { _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {0}: { - {Slot: 33, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 1 - {Slot: 33, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 1 - }, - {1}: { - {Slot: 128_002, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000 - {Slot: 128_002, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000 - }, - {2}: { - {Slot: 128_003, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000 - {Slot: 128_003, ColumnIndex: 3, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000 - }, - {3}: { - {Slot: 128_034, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4001 - {Slot: 128_034, 
ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4001 - }, - {4}: { - {Slot: 131_138, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098 - }, - {5}: { - {Slot: 131_138, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098 - }, - {6}: { - {Slot: 131_168, ColumnIndex: 0, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099 - }, + []util.DataColumnParam{ + {Slot: 33, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 1 + {Slot: 33, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 1 + {Slot: 128_002, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4000 + {Slot: 128_002, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4000 + {Slot: 128_003, Index: 1, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4000 + {Slot: 128_003, Index: 3, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4000 + {Slot: 128_034, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4001 + {Slot: 128_034, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4001 + {Slot: 131_138, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4098 + {Slot: 131_138, Index: 1, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4098 + {Slot: 131_168, Index: 0, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4099 }, ) @@ -76,29 +63,25 @@ func TestWarmCache(t *testing.T) { storage.WarmCache() require.Equal(t, primitives.Epoch(4_000), storage.cache.lowestCachedEpoch) - require.Equal(t, 6, len(storage.cache.cache)) + require.Equal(t, 5, len(storage.cache.cache)) - summary, ok := storage.cache.get([fieldparams.RootLength]byte{1}) + summary, ok := storage.cache.get(verifiedRoDataColumnSidecars[2].BlockRoot()) require.Equal(t, true, ok) require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_000, mask: [fieldparams.NumberOfColumns]bool{false, false, true, false, true}}, summary) - summary, ok = storage.cache.get([fieldparams.RootLength]byte{2}) + summary, ok = storage.cache.get(verifiedRoDataColumnSidecars[4].BlockRoot()) require.Equal(t, true, ok) require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_000, mask: [fieldparams.NumberOfColumns]bool{false, true, false, true}}, summary) - summary, ok = storage.cache.get([fieldparams.RootLength]byte{3}) + summary, ok = storage.cache.get(verifiedRoDataColumnSidecars[6].BlockRoot()) require.Equal(t, true, ok) require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_001, mask: [fieldparams.NumberOfColumns]bool{false, false, true, false, true}}, summary) - summary, ok = storage.cache.get([fieldparams.RootLength]byte{4}) + summary, ok = storage.cache.get(verifiedRoDataColumnSidecars[8].BlockRoot()) require.Equal(t, true, ok) - require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_098, mask: [fieldparams.NumberOfColumns]bool{false, false, true}}, summary) + require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_098, mask: [fieldparams.NumberOfColumns]bool{false, true, true}}, summary) - summary, ok = storage.cache.get([fieldparams.RootLength]byte{5}) - require.Equal(t, true, ok) - require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_098, mask: [fieldparams.NumberOfColumns]bool{false, true}}, summary) - - summary, ok = storage.cache.get([fieldparams.RootLength]byte{6}) + summary, ok = storage.cache.get(verifiedRoDataColumnSidecars[10].BlockRoot()) require.Equal(t, true, ok) require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_099, mask: [fieldparams.NumberOfColumns]bool{true}}, summary) } @@ -112,9 +95,7 @@ func 
TestSaveDataColumnsSidecars(t *testing.T) { _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {}: {{ColumnIndex: 12}, {ColumnIndex: 1_000_000}, {ColumnIndex: 48}}, - }, + []util.DataColumnParam{{Index: 12}, {Index: 1_000_000}, {Index: 48}}, ) _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) @@ -125,7 +106,7 @@ func TestSaveDataColumnsSidecars(t *testing.T) { t.Run("one of the column index is too large", func(t *testing.T) { _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{{}: {{ColumnIndex: 12}, {ColumnIndex: 1_000_000}, {ColumnIndex: 48}}}, + []util.DataColumnParam{{Index: 12}, {Index: 1_000_000}, {Index: 48}}, ) _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) @@ -136,23 +117,34 @@ func TestSaveDataColumnsSidecars(t *testing.T) { t.Run("different slots", func(t *testing.T) { _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {}: { - {Slot: 1, ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, - {Slot: 2, ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, - }, + []util.DataColumnParam{ + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, + {Slot: 2, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, }, ) + // Create a sidecar with a different slot but the same root. + alteredVerifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, 2) + alteredVerifiedRoDataColumnSidecars = append(alteredVerifiedRoDataColumnSidecars, verifiedRoDataColumnSidecars[0]) + + altered, err := blocks.NewRODataColumnWithRoot( + verifiedRoDataColumnSidecars[1].RODataColumn.DataColumnSidecar, + verifiedRoDataColumnSidecars[0].BlockRoot(), + ) + require.NoError(t, err) + + verifiedAltered := blocks.NewVerifiedRODataColumn(altered) + alteredVerifiedRoDataColumnSidecars = append(alteredVerifiedRoDataColumnSidecars, verifiedAltered) + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) - err := dataColumnStorage.Save(verifiedRoDataColumnSidecars) + err = dataColumnStorage.Save(alteredVerifiedRoDataColumnSidecars) require.ErrorIs(t, err, errDataColumnSidecarsFromDifferentSlots) }) t.Run("new file - no data columns to save", func(t *testing.T) { _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{{}: {}}, + []util.DataColumnParam{}, ) _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) @@ -163,11 +155,9 @@ func TestSaveDataColumnsSidecars(t *testing.T) { t.Run("new file - different data column size", func(t *testing.T) { _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {}: { - {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, - {ColumnIndex: 11, DataColumn: []byte{1, 2, 3, 4}}, - }, + []util.DataColumnParam{ + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, + {Slot: 1, Index: 13, Column: [][]byte{{1}, {2}, {3}, {4}}}, }, ) @@ -179,7 +169,9 @@ func TestSaveDataColumnsSidecars(t *testing.T) { t.Run("existing file - wrong incoming SSZ encoded size", func(t *testing.T) { _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}}}, + []util.DataColumnParam{ + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, + }, ) // Save data columns into a file. 
@@ -191,7 +183,9 @@ func TestSaveDataColumnsSidecars(t *testing.T) { // column index and an different SSZ encoded size. _, verifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 13, DataColumn: []byte{1, 2, 3, 4}}}}, + []util.DataColumnParam{ + {Slot: 1, Index: 13, Column: [][]byte{{1}, {2}, {3}, {4}}}, + }, ) // Try to rewrite the file. @@ -202,17 +196,13 @@ func TestSaveDataColumnsSidecars(t *testing.T) { t.Run("nominal", func(t *testing.T) { _, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {1}: { - {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, - {ColumnIndex: 11, DataColumn: []byte{3, 4, 5}}, - {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, // OK if duplicate - {ColumnIndex: 13, DataColumn: []byte{6, 7, 8}}, - }, - {2}: { - {ColumnIndex: 12, DataColumn: []byte{3, 4, 5}}, - {ColumnIndex: 13, DataColumn: []byte{6, 7, 8}}, - }, + []util.DataColumnParam{ + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, + {Slot: 1, Index: 11, Column: [][]byte{{3}, {4}, {5}}}, + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, // OK if duplicate + {Slot: 1, Index: 13, Column: [][]byte{{6}, {7}, {8}}}, + {Slot: 2, Index: 12, Column: [][]byte{{3}, {4}, {5}}}, + {Slot: 2, Index: 13, Column: [][]byte{{6}, {7}, {8}}}, }, ) @@ -222,16 +212,12 @@ func TestSaveDataColumnsSidecars(t *testing.T) { _, inputVerifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {1}: { - {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, // OK if duplicate - {ColumnIndex: 15, DataColumn: []byte{2, 3, 4}}, - {ColumnIndex: 1, DataColumn: []byte{2, 3, 4}}, - }, - {3}: { - {ColumnIndex: 6, DataColumn: []byte{3, 4, 5}}, - {ColumnIndex: 2, DataColumn: []byte{6, 7, 8}}, - }, + []util.DataColumnParam{ + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, // OK if duplicate + {Slot: 1, Index: 15, Column: [][]byte{{2}, {3}, {4}}}, + {Slot: 1, Index: 1, Column: [][]byte{{2}, {3}, {4}}}, + {Slot: 3, Index: 6, Column: [][]byte{{3}, {4}, {5}}}, + {Slot: 3, Index: 2, Column: [][]byte{{6}, {7}, {8}}}, }, ) @@ -240,51 +226,47 @@ func TestSaveDataColumnsSidecars(t *testing.T) { type fixture struct { fileName string - blockRoot [fieldparams.RootLength]byte expectedIndices [mandatoryNumberOfColumns]byte - dataColumnParams []util.DataColumnParams + dataColumnParams []util.DataColumnParam } fixtures := []fixture{ { - fileName: "0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs", - blockRoot: [fieldparams.RootLength]byte{1}, + fileName: "0/0/0x8bb2f09de48c102635622dc27e6de03ae2b22639df7c33edbc8222b2ec423746.sszs", expectedIndices: [mandatoryNumberOfColumns]byte{ 0, nonZeroOffset + 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, nonZeroOffset + 1, nonZeroOffset, nonZeroOffset + 2, 0, nonZeroOffset + 3, // The rest is filled with zeroes. 
}, - dataColumnParams: []util.DataColumnParams{ - {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, - {ColumnIndex: 11, DataColumn: []byte{3, 4, 5}}, - {ColumnIndex: 13, DataColumn: []byte{6, 7, 8}}, - {ColumnIndex: 15, DataColumn: []byte{2, 3, 4}}, - {ColumnIndex: 1, DataColumn: []byte{2, 3, 4}}, + dataColumnParams: []util.DataColumnParam{ + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, + {Slot: 1, Index: 11, Column: [][]byte{{3}, {4}, {5}}}, + {Slot: 1, Index: 13, Column: [][]byte{{6}, {7}, {8}}}, + {Slot: 1, Index: 15, Column: [][]byte{{2}, {3}, {4}}}, + {Slot: 1, Index: 1, Column: [][]byte{{2}, {3}, {4}}}, }, }, { - fileName: "0/0/0x0200000000000000000000000000000000000000000000000000000000000000.sszs", - blockRoot: [fieldparams.RootLength]byte{2}, + fileName: "0/0/0x221f88cae2219050d4e9d8c2d0d83cb4c8ce4c84ab1bb3e0b89f3dec36077c4f.sszs", expectedIndices: [mandatoryNumberOfColumns]byte{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, nonZeroOffset, nonZeroOffset + 1, 0, 0, // The rest is filled with zeroes. }, - dataColumnParams: []util.DataColumnParams{ - {ColumnIndex: 12, DataColumn: []byte{3, 4, 5}}, - {ColumnIndex: 13, DataColumn: []byte{6, 7, 8}}, + dataColumnParams: []util.DataColumnParam{ + {Slot: 2, Index: 12, Column: [][]byte{{3}, {4}, {5}}}, + {Slot: 2, Index: 13, Column: [][]byte{{6}, {7}, {8}}}, }, }, { - fileName: "0/0/0x0300000000000000000000000000000000000000000000000000000000000000.sszs", - blockRoot: [fieldparams.RootLength]byte{3}, + fileName: "0/0/0x7b163bd57e1c4c8b5048c5389698098f4c957d62d7ce86f4ffa9bdc75c16a18b.sszs", expectedIndices: [mandatoryNumberOfColumns]byte{ 0, 0, nonZeroOffset + 1, 0, 0, 0, nonZeroOffset, 0, // The rest is filled with zeroes. }, - dataColumnParams: []util.DataColumnParams{ - {ColumnIndex: 6, DataColumn: []byte{3, 4, 5}}, - {ColumnIndex: 2, DataColumn: []byte{6, 7, 8}}, + dataColumnParams: []util.DataColumnParam{ + {Slot: 3, Index: 6, Column: [][]byte{{3}, {4}, {5}}}, + {Slot: 3, Index: 2, Column: [][]byte{{6}, {7}, {8}}}, }, }, } @@ -293,7 +275,7 @@ func TestSaveDataColumnsSidecars(t *testing.T) { // Build expected data column sidecars. _, expectedDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{fixture.blockRoot: fixture.dataColumnParams}, + fixture.dataColumnParams, ) // Build expected bytes. @@ -320,6 +302,8 @@ func TestSaveDataColumnsSidecars(t *testing.T) { expectedBytes = append(expectedBytes, fixture.expectedIndices[:]...) expectedBytes = append(expectedBytes, sszEncodedDataColumnSidecars...) + blockRoot := expectedDataColumnSidecars[0].BlockRoot() + // Check the actual content of the file. actualBytes, err := afero.ReadFile(dataColumnStorage.fs, fixture.fileName) require.NoError(t, err) @@ -328,18 +312,18 @@ func TestSaveDataColumnsSidecars(t *testing.T) { // Check the summary. 
indices := map[uint64]bool{} for _, dataColumnParam := range fixture.dataColumnParams { - indices[dataColumnParam.ColumnIndex] = true + indices[dataColumnParam.Index] = true } - summary := dataColumnStorage.Summary(fixture.blockRoot) + summary := dataColumnStorage.Summary(blockRoot) for index := range uint64(mandatoryNumberOfColumns) { require.Equal(t, indices[index], summary.HasIndex(index)) } - err = dataColumnStorage.Remove(fixture.blockRoot) + err = dataColumnStorage.Remove(blockRoot) require.NoError(t, err) - summary = dataColumnStorage.Summary(fixture.blockRoot) + summary = dataColumnStorage.Summary(blockRoot) for index := range uint64(mandatoryNumberOfColumns) { require.Equal(t, false, summary.HasIndex(index)) } @@ -362,11 +346,9 @@ func TestGetDataColumnSidecars(t *testing.T) { t.Run("indices not found", func(t *testing.T) { _, savedVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {1}: { - {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, - {ColumnIndex: 14, DataColumn: []byte{2, 3, 4}}, - }, + []util.DataColumnParam{ + {Index: 12, Column: [][]byte{{1}, {2}, {3}}}, + {Index: 14, Column: [][]byte{{2}, {3}, {4}}}, }, ) @@ -374,7 +356,7 @@ func TestGetDataColumnSidecars(t *testing.T) { err := dataColumnStorage.Save(savedVerifiedRoDataColumnSidecars) require.NoError(t, err) - verifiedRODataColumnSidecars, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, []uint64{3, 1, 2}) + verifiedRODataColumnSidecars, err := dataColumnStorage.Get(savedVerifiedRoDataColumnSidecars[0].BlockRoot(), []uint64{3, 1, 2}) require.NoError(t, err) require.Equal(t, 0, len(verifiedRODataColumnSidecars)) }) @@ -382,11 +364,9 @@ func TestGetDataColumnSidecars(t *testing.T) { t.Run("nominal", func(t *testing.T) { _, expectedVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {1}: { - {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, - {ColumnIndex: 14, DataColumn: []byte{2, 3, 4}}, - }, + []util.DataColumnParam{ + {Index: 12, Column: [][]byte{{1}, {2}, {3}}}, + {Index: 14, Column: [][]byte{{2}, {3}, {4}}}, }, ) @@ -394,11 +374,13 @@ func TestGetDataColumnSidecars(t *testing.T) { err := dataColumnStorage.Save(expectedVerifiedRoDataColumnSidecars) require.NoError(t, err) - verifiedRODataColumnSidecars, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, nil) + root := expectedVerifiedRoDataColumnSidecars[0].BlockRoot() + + verifiedRODataColumnSidecars, err := dataColumnStorage.Get(root, nil) require.NoError(t, err) require.DeepSSZEqual(t, expectedVerifiedRoDataColumnSidecars, verifiedRODataColumnSidecars) - verifiedRODataColumnSidecars, err = dataColumnStorage.Get([fieldparams.RootLength]byte{1}, []uint64{12, 13, 14}) + verifiedRODataColumnSidecars, err = dataColumnStorage.Get(root, []uint64{12, 13, 14}) require.NoError(t, err) require.DeepSSZEqual(t, expectedVerifiedRoDataColumnSidecars, verifiedRODataColumnSidecars) }) @@ -414,15 +396,11 @@ func TestRemove(t *testing.T) { t.Run("nominal", func(t *testing.T) { _, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {1}: { - {Slot: 32, ColumnIndex: 10, DataColumn: []byte{1, 2, 3}}, - {Slot: 32, ColumnIndex: 11, DataColumn: []byte{2, 3, 4}}, - }, - {2}: { - {Slot: 33, ColumnIndex: 10, DataColumn: []byte{1, 2, 3}}, - {Slot: 33, ColumnIndex: 11, DataColumn: []byte{2, 3, 4}}, - }, + []util.DataColumnParam{ + {Slot: 32, Index: 10, Column: [][]byte{{1}, {2}, {3}}}, 
+ {Slot: 32, Index: 11, Column: [][]byte{{2}, {3}, {4}}}, + {Slot: 33, Index: 10, Column: [][]byte{{1}, {2}, {3}}}, + {Slot: 33, Index: 11, Column: [][]byte{{2}, {3}, {4}}}, }, ) @@ -430,22 +408,22 @@ func TestRemove(t *testing.T) { err := dataColumnStorage.Save(inputVerifiedRoDataColumnSidecars) require.NoError(t, err) - err = dataColumnStorage.Remove([fieldparams.RootLength]byte{1}) + err = dataColumnStorage.Remove(inputVerifiedRoDataColumnSidecars[0].BlockRoot()) require.NoError(t, err) - summary := dataColumnStorage.Summary([fieldparams.RootLength]byte{1}) + summary := dataColumnStorage.Summary(inputVerifiedRoDataColumnSidecars[0].BlockRoot()) require.Equal(t, primitives.Epoch(0), summary.epoch) require.Equal(t, uint64(0), summary.Count()) - summary = dataColumnStorage.Summary([fieldparams.RootLength]byte{2}) + summary = dataColumnStorage.Summary(inputVerifiedRoDataColumnSidecars[3].BlockRoot()) require.Equal(t, primitives.Epoch(1), summary.epoch) require.Equal(t, uint64(2), summary.Count()) - actual, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, nil) + actual, err := dataColumnStorage.Get(inputVerifiedRoDataColumnSidecars[0].BlockRoot(), nil) require.NoError(t, err) require.Equal(t, 0, len(actual)) - actual, err = dataColumnStorage.Get([fieldparams.RootLength]byte{2}, nil) + actual, err = dataColumnStorage.Get(inputVerifiedRoDataColumnSidecars[3].BlockRoot(), nil) require.NoError(t, err) require.Equal(t, 2, len(actual)) }) @@ -454,9 +432,9 @@ func TestRemove(t *testing.T) { func TestClear(t *testing.T) { _, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}}, - {2}: {{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}}}, + []util.DataColumnParam{ + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, + {Slot: 2, Index: 13, Column: [][]byte{{6}, {7}, {8}}}, }, ) @@ -465,8 +443,8 @@ func TestClear(t *testing.T) { require.NoError(t, err) filePaths := []string{ - "0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs", - "0/0/0x0200000000000000000000000000000000000000000000000000000000000000.sszs", + "0/0/0x8bb2f09de48c102635622dc27e6de03ae2b22639df7c33edbc8222b2ec423746.sszs", + "0/0/0x221f88cae2219050d4e9d8c2d0d83cb4c8ce4c84ab1bb3e0b89f3dec36077c4f.sszs", } for _, filePath := range filePaths { @@ -492,8 +470,8 @@ func TestMetadata(t *testing.T) { t.Run("wrong version", func(t *testing.T) { _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}}, + []util.DataColumnParam{ + {Slot: 1, Index: 12, Column: [][]byte{{1}, {2}, {3}}}, }, ) @@ -503,7 +481,7 @@ func TestMetadata(t *testing.T) { require.NoError(t, err) // Alter the version. 
- const filePath = "0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs" + const filePath = "0/0/0x8bb2f09de48c102635622dc27e6de03ae2b22639df7c33edbc8222b2ec423746.sszs" file, err := dataColumnStorage.fs.OpenFile(filePath, os.O_WRONLY, os.FileMode(0600)) require.NoError(t, err) @@ -643,31 +621,19 @@ func TestPrune(t *testing.T) { } _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {0}: { - {Slot: 33, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 1 - {Slot: 33, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 1 - }, - {1}: { - {Slot: 128_002, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000 - {Slot: 128_002, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000 - }, - {2}: { - {Slot: 128_003, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000 - {Slot: 128_003, ColumnIndex: 3, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000 - }, - {3}: { - {Slot: 131_138, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098 - {Slot: 131_138, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098 - }, - {4}: { - {Slot: 131_169, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099 - {Slot: 131_169, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099 - }, - {5}: { - {Slot: 262_144, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 2 - Epoch 8192 - {Slot: 262_144, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 2 - Epoch 8292 - }, + []util.DataColumnParam{ + {Slot: 33, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 1 + {Slot: 33, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 1 + {Slot: 128_002, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4000 + {Slot: 128_002, Index: 4, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4000 + {Slot: 128_003, Index: 1, Column: [][]byte{{1}, {2}, {3}}}, // Period 0 - Epoch 4000 + {Slot: 128_003, Index: 3, Column: [][]byte{{2}, {3}, {4}}}, // Period 0 - Epoch 4000 + {Slot: 131_138, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4098 + {Slot: 131_138, Index: 3, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4098 + {Slot: 131_169, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4099 + {Slot: 131_169, Index: 3, Column: [][]byte{{1}, {2}, {3}}}, // Period 1 - Epoch 4099 + {Slot: 262_144, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 2 - Epoch 8192 + {Slot: 262_144, Index: 3, Column: [][]byte{{1}, {2}, {3}}}, // Period 2 - Epoch 8292 }, ) @@ -696,31 +662,31 @@ func TestPrune(t *testing.T) { dirs, err = listDir(dataColumnStorage.fs, "0/1") require.NoError(t, err) - require.Equal(t, true, compareSlices([]string{"0x0000000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs)) + require.Equal(t, true, compareSlices([]string{"0x775283f428813c949b7e8af07f01fef9790137f021b3597ad2d0d81e8be8f0f0.sszs"}, dirs)) dirs, err = listDir(dataColumnStorage.fs, "0/4000") require.NoError(t, err) require.Equal(t, true, compareSlices([]string{ - "0x0200000000000000000000000000000000000000000000000000000000000000.sszs", - "0x0100000000000000000000000000000000000000000000000000000000000000.sszs", + "0x9977031132157ebb9c81bce952003ce07a4f54e921ca63b7693d1562483fdf9f.sszs", + "0xb2b14d9d858fa99b70f0405e4e39f38e51e36dd9a70343c109e24eeb5f77e369.sszs", }, dirs)) dirs, err = listDir(dataColumnStorage.fs, "1/4098") 
require.NoError(t, err) - require.Equal(t, true, compareSlices([]string{"0x0300000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs)) + require.Equal(t, true, compareSlices([]string{"0x5106745cdd6b1aa3602ef4d000ef373af672019264c167fa4bd641a1094aa5c5.sszs"}, dirs)) dirs, err = listDir(dataColumnStorage.fs, "1/4099") require.NoError(t, err) - require.Equal(t, true, compareSlices([]string{"0x0400000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs)) + require.Equal(t, true, compareSlices([]string{"0x4e5f2bd5bb84bf0422af8edd1cc5a52cc6cea85baf3d66d172fe41831ac1239c.sszs"}, dirs)) dirs, err = listDir(dataColumnStorage.fs, "2/8192") require.NoError(t, err) - require.Equal(t, true, compareSlices([]string{"0x0500000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs)) + require.Equal(t, true, compareSlices([]string{"0xa8adba7446eb56a01a9dd6d55e9c3990b10c91d43afb77847b4a21ac4ee62527.sszs"}, dirs)) _, verifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars( t, - util.DataColumnsParamsByRoot{ - {6}: {{Slot: 451_141, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}}, // Period 3 - Epoch 14_098 + []util.DataColumnParam{ + {Slot: 451_141, Index: 2, Column: [][]byte{{1}, {2}, {3}}}, // Period 3 - Epoch 14_098 }, ) @@ -748,14 +714,14 @@ func TestPrune(t *testing.T) { dirs, err = listDir(dataColumnStorage.fs, "1/4099") require.NoError(t, err) - require.Equal(t, true, compareSlices([]string{"0x0400000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs)) + require.Equal(t, true, compareSlices([]string{"0x4e5f2bd5bb84bf0422af8edd1cc5a52cc6cea85baf3d66d172fe41831ac1239c.sszs"}, dirs)) dirs, err = listDir(dataColumnStorage.fs, "2/8192") require.NoError(t, err) - require.Equal(t, true, compareSlices([]string{"0x0500000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs)) + require.Equal(t, true, compareSlices([]string{"0xa8adba7446eb56a01a9dd6d55e9c3990b10c91d43afb77847b4a21ac4ee62527.sszs"}, dirs)) dirs, err = listDir(dataColumnStorage.fs, "3/14098") require.NoError(t, err) - require.Equal(t, true, compareSlices([]string{"0x0600000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs)) + require.Equal(t, true, compareSlices([]string{"0x0de28a18cae63cbc6f0b20dc1afb0b1df38da40824a5f09f92d485ade04de97f.sszs"}, dirs)) }) } diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index d7998618b7..cc99461f19 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -87,45 +87,47 @@ type serviceFlagOpts struct { // full PoS node. It handles the lifecycle of the entire system and registers // services to a service registry. type BeaconNode struct { - cliCtx *cli.Context - ctx context.Context - cancel context.CancelFunc - services *runtime.ServiceRegistry - lock sync.RWMutex - stop chan struct{} // Channel to wait for termination notifications. 
- db db.Database - slasherDB db.SlasherDatabase - attestationCache *cache.AttestationCache - attestationPool attestations.Pool - exitPool voluntaryexits.PoolManager - slashingsPool slashings.PoolManager - syncCommitteePool synccommittee.Pool - blsToExecPool blstoexec.PoolManager - depositCache cache.DepositCache - trackedValidatorsCache *cache.TrackedValidatorsCache - payloadIDCache *cache.PayloadIDCache - stateFeed *event.Feed - blockFeed *event.Feed - opFeed *event.Feed - stateGen *stategen.State - collector *bcnodeCollector - slasherBlockHeadersFeed *event.Feed - slasherAttestationsFeed *event.Feed - finalizedStateAtStartUp state.BeaconState - serviceFlagOpts *serviceFlagOpts - GenesisInitializer genesis.Initializer - CheckpointInitializer checkpoint.Initializer - forkChoicer forkchoice.ForkChoicer - clockWaiter startup.ClockWaiter - BackfillOpts []backfill.ServiceOption - initialSyncComplete chan struct{} - BlobStorage *filesystem.BlobStorage - BlobStorageOptions []filesystem.BlobStorageOption - custodyInfo *peerdas.CustodyInfo - verifyInitWaiter *verification.InitializerWaiter - syncChecker *initialsync.SyncChecker - slasherEnabled bool - lcStore *lightclient.Store + cliCtx *cli.Context + ctx context.Context + cancel context.CancelFunc + services *runtime.ServiceRegistry + lock sync.RWMutex + stop chan struct{} // Channel to wait for termination notifications. + db db.Database + slasherDB db.SlasherDatabase + attestationCache *cache.AttestationCache + attestationPool attestations.Pool + exitPool voluntaryexits.PoolManager + slashingsPool slashings.PoolManager + syncCommitteePool synccommittee.Pool + blsToExecPool blstoexec.PoolManager + depositCache cache.DepositCache + trackedValidatorsCache *cache.TrackedValidatorsCache + payloadIDCache *cache.PayloadIDCache + stateFeed *event.Feed + blockFeed *event.Feed + opFeed *event.Feed + stateGen *stategen.State + collector *bcnodeCollector + slasherBlockHeadersFeed *event.Feed + slasherAttestationsFeed *event.Feed + finalizedStateAtStartUp state.BeaconState + serviceFlagOpts *serviceFlagOpts + GenesisInitializer genesis.Initializer + CheckpointInitializer checkpoint.Initializer + forkChoicer forkchoice.ForkChoicer + clockWaiter startup.ClockWaiter + BackfillOpts []backfill.ServiceOption + initialSyncComplete chan struct{} + BlobStorage *filesystem.BlobStorage + BlobStorageOptions []filesystem.BlobStorageOption + DataColumnStorage *filesystem.DataColumnStorage + DataColumnStorageOptions []filesystem.DataColumnStorageOption + custodyInfo *peerdas.CustodyInfo + verifyInitWaiter *verification.InitializerWaiter + syncChecker *initialsync.SyncChecker + slasherEnabled bool + lcStore *lightclient.Store } // New creates a new node instance, sets up configuration options, and registers @@ -193,6 +195,15 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco beacon.BlobStorage = blobs } + if beacon.DataColumnStorage == nil { + dataColumnStorage, err := filesystem.NewDataColumnStorage(cliCtx.Context, beacon.DataColumnStorageOptions...) 
+ if err != nil { + return nil, errors.Wrap(err, "new data column storage") + } + + beacon.DataColumnStorage = dataColumnStorage + } + bfs, err := startBaseServices(cliCtx, beacon, depositAddress) if err != nil { return nil, errors.Wrap(err, "could not start modules") @@ -780,6 +791,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st blockchain.WithClockSynchronizer(gs), blockchain.WithSyncComplete(syncComplete), blockchain.WithBlobStorage(b.BlobStorage), + blockchain.WithDataColumnStorage(b.DataColumnStorage), blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache), blockchain.WithPayloadIDCache(b.payloadIDCache), blockchain.WithSyncChecker(b.syncChecker), @@ -868,6 +880,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil regularsync.WithInitialSyncComplete(initialSyncComplete), regularsync.WithStateNotifier(b), regularsync.WithBlobStorage(b.BlobStorage), + regularsync.WithDataColumnStorage(b.DataColumnStorage), regularsync.WithVerifierWaiter(b.verifyInitWaiter), regularsync.WithAvailableBlocker(bFillStore), regularsync.WithSlasherEnabled(b.slasherEnabled), diff --git a/beacon-chain/node/node_test.go b/beacon-chain/node/node_test.go index 139b96f95b..384f004162 100644 --- a/beacon-chain/node/node_test.go +++ b/beacon-chain/node/node_test.go @@ -54,7 +54,12 @@ func TestNodeClose_OK(t *testing.T) { cmd.ValidatorMonitorIndicesFlag.Value.SetInt(1) ctx, cancel := newCliContextWithCancel(&app, set) - node, err := New(ctx, cancel, WithBlobStorage(filesystem.NewEphemeralBlobStorage(t))) + options := []Option{ + WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), + WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)), + } + + node, err := New(ctx, cancel, options...) require.NoError(t, err) node.Close() @@ -72,10 +77,16 @@ func TestNodeStart_Ok(t *testing.T) { require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A")) ctx, cancel := newCliContextWithCancel(&app, set) - node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}), + + options := []Option{ + WithBlockchainFlagOptions([]blockchain.Option{}), WithBuilderFlagOptions([]builder.Option{}), WithExecutionChainOptions([]execution.Option{}), - WithBlobStorage(filesystem.NewEphemeralBlobStorage(t))) + WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), + WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)), + } + + node, err := New(ctx, cancel, options...) require.NoError(t, err) node.services = &runtime.ServiceRegistry{} go func() { @@ -96,10 +107,16 @@ func TestNodeStart_SyncChecker(t *testing.T) { require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A")) ctx, cancel := newCliContextWithCancel(&app, set) - node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}), + + options := []Option{ + WithBlockchainFlagOptions([]blockchain.Option{}), WithBuilderFlagOptions([]builder.Option{}), WithExecutionChainOptions([]execution.Option{}), - WithBlobStorage(filesystem.NewEphemeralBlobStorage(t))) + WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), + WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)), + } + + node, err := New(ctx, cancel, options...) 
require.NoError(t, err) go func() { node.Start() @@ -128,10 +145,13 @@ func TestClearDB(t *testing.T) { set.String("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A", "fee recipient") require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A")) context, cancel := newCliContextWithCancel(&app, set) + options := []Option{ WithExecutionChainOptions([]execution.Option{execution.WithHttpEndpoint(endpoint)}), WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), + WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)), } + _, err = New(context, cancel, options...) require.NoError(t, err) require.LogsContain(t, hook, "Removing database") diff --git a/beacon-chain/node/options.go b/beacon-chain/node/options.go index e324ccbcc9..052d759bb1 100644 --- a/beacon-chain/node/options.go +++ b/beacon-chain/node/options.go @@ -50,3 +50,20 @@ func WithBlobStorageOptions(opt ...filesystem.BlobStorageOption) Option { return nil } } + +// WithDataColumnStorage sets the DataColumnStorage backend for the BeaconNode +func WithDataColumnStorage(bs *filesystem.DataColumnStorage) Option { + return func(bn *BeaconNode) error { + bn.DataColumnStorage = bs + return nil + } +} + +// WithDataColumnStorageOptions appends 1 or more filesystem.DataColumnStorageOption on the beacon node, +// to be used when initializing data column storage. +func WithDataColumnStorageOptions(opt ...filesystem.DataColumnStorageOption) Option { + return func(bn *BeaconNode) error { + bn.DataColumnStorageOptions = append(bn.DataColumnStorageOptions, opt...) + return nil + } +} diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index c255d3529e..058b5e489d 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -712,7 +712,7 @@ func TestService_BroadcastDataColumn(t *testing.T) { subnet := peerdas.ComputeSubnetForDataColumnSidecar(columnIndex) topic := fmt.Sprintf(topicFormat, digest, subnet) - roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{{}: {{ColumnIndex: columnIndex}}}) + roSidecars, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, []util.DataColumnParam{{Index: columnIndex}}) sidecar := roSidecars[0].DataColumnSidecar // Async listen for the pubsub, must be before the broadcast. diff --git a/beacon-chain/p2p/rpc_topic_mappings.go b/beacon-chain/p2p/rpc_topic_mappings.go index d4bef88b5c..d533b84e96 100644 --- a/beacon-chain/p2p/rpc_topic_mappings.go +++ b/beacon-chain/p2p/rpc_topic_mappings.go @@ -61,6 +61,12 @@ const LightClientFinalityUpdateName = "/light_client_finality_update" // LightClientOptimisticUpdateName is the name for the LightClientOptimisticUpdate topic. const LightClientOptimisticUpdateName = "/light_client_optimistic_update" +// DataColumnSidecarsByRootName is the name for the DataColumnSidecarsByRoot v1 message topic. +const DataColumnSidecarsByRootName = "/data_column_sidecars_by_root" + +// DataColumnSidecarsByRangeName is the name for the DataColumnSidecarsByRange v1 message topic. +const DataColumnSidecarsByRangeName = "/data_column_sidecars_by_range" + const ( // V1 RPC Topics // RPCStatusTopicV1 defines the v1 topic for the status rpc method. @@ -92,6 +98,9 @@ const ( RPCLightClientFinalityUpdateTopicV1 = protocolPrefix + LightClientFinalityUpdateName + SchemaVersionV1 // RPCLightClientOptimisticUpdateTopicV1 is a topic for requesting a light client Optimistic update. 
RPCLightClientOptimisticUpdateTopicV1 = protocolPrefix + LightClientOptimisticUpdateName + SchemaVersionV1 + // RPCDataColumnSidecarsByRootTopicV1 is a topic for requesting data column sidecars by their block root. + // /eth2/beacon_chain/req/data_column_sidecars_by_root/1 - New in Fulu. + RPCDataColumnSidecarsByRootTopicV1 = protocolPrefix + DataColumnSidecarsByRootName + SchemaVersionV1 // V2 RPC Topics // RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method. @@ -139,6 +148,9 @@ var RPCTopicMappings = map[string]interface{}{ RPCLightClientUpdatesByRangeTopicV1: new(pb.LightClientUpdatesByRangeRequest), RPCLightClientFinalityUpdateTopicV1: new(interface{}), RPCLightClientOptimisticUpdateTopicV1: new(interface{}), + + // DataColumnSidecarsByRoot v1 Message + RPCDataColumnSidecarsByRootTopicV1: new(p2ptypes.DataColumnsByRootIdentifiers), } // Maps all registered protocol prefixes. @@ -161,6 +173,7 @@ var messageMapping = map[string]bool{ LightClientUpdatesByRangeName: true, LightClientFinalityUpdateName: true, LightClientOptimisticUpdateName: true, + DataColumnSidecarsByRootName: true, } // Maps all the RPC messages which are to updated in altair. diff --git a/beacon-chain/p2p/types/rpc_errors.go b/beacon-chain/p2p/types/rpc_errors.go index de99e0ecba..bb641ec1f4 100644 --- a/beacon-chain/p2p/types/rpc_errors.go +++ b/beacon-chain/p2p/types/rpc_errors.go @@ -9,10 +9,13 @@ var ( ErrInvalidSequenceNum = errors.New("invalid sequence number provided") ErrGeneric = errors.New("internal service error") - ErrRateLimited = errors.New("rate limited") - ErrIODeadline = errors.New("i/o deadline exceeded") - ErrInvalidRequest = errors.New("invalid range, step or count") - ErrBlobLTMinRequest = errors.New("blob epoch < minimum_request_epoch") - ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS") + ErrRateLimited = errors.New("rate limited") + ErrIODeadline = errors.New("i/o deadline exceeded") + ErrInvalidRequest = errors.New("invalid range, step or count") + ErrBlobLTMinRequest = errors.New("blob epoch < minimum_request_epoch") + + ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS") + ErrMaxDataColumnReqExceeded = errors.New("requested more than MAX_REQUEST_DATA_COLUMN_SIDECARS") + ErrResourceUnavailable = errors.New("resource requested unavailable") ) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 42666faf02..d5323ba128 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "rpc_blob_sidecars_by_range.go", "rpc_blob_sidecars_by_root.go", "rpc_chunked_response.go", + "rpc_data_column_sidecars_by_root.go", "rpc_goodbye.go", "rpc_light_client.go", "rpc_metadata.go", @@ -170,6 +171,7 @@ go_test( "rpc_beacon_blocks_by_root_test.go", "rpc_blob_sidecars_by_range_test.go", "rpc_blob_sidecars_by_root_test.go", + "rpc_data_column_sidecars_by_root_test.go", "rpc_goodbye_test.go", "rpc_handler_test.go", "rpc_light_client_test.go", diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index b2992ba62a..301cdb9b49 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -15,13 +15,17 @@ import ( "github.com/sirupsen/logrus" ) -var ErrNoValidDigest = errors.New("no valid digest matched") -var ErrUnrecognizedVersion = errors.New("cannot determine context bytes for unrecognized object") +var ( + ErrNoValidDigest = errors.New("no valid digest matched") + ErrUnrecognizedVersion = errors.New("cannot determine 
context bytes for unrecognized object") +) -var responseCodeSuccess = byte(0x00) -var responseCodeInvalidRequest = byte(0x01) -var responseCodeServerError = byte(0x02) -var responseCodeResourceUnavailable = byte(0x03) +var ( + responseCodeSuccess = byte(0x00) + responseCodeInvalidRequest = byte(0x01) + responseCodeServerError = byte(0x02) + responseCodeResourceUnavailable = byte(0x03) +) func (s *Service) generateErrorResponse(code byte, reason string) ([]byte, error) { return createErrorResponse(code, reason, s.cfg.p2p) diff --git a/beacon-chain/sync/options.go b/beacon-chain/sync/options.go index d599f08307..ac0fa2e354 100644 --- a/beacon-chain/sync/options.go +++ b/beacon-chain/sync/options.go @@ -173,6 +173,14 @@ func WithBlobStorage(b *filesystem.BlobStorage) Option { } } +// WithDataColumnStorage gives the sync package direct access to DataColumnStorage. +func WithDataColumnStorage(b *filesystem.DataColumnStorage) Option { + return func(s *Service) error { + s.cfg.dataColumnStorage = b + return nil + } +} + // WithVerifierWaiter gives the sync package direct access to the verifier waiter. func WithVerifierWaiter(v *verification.InitializerWaiter) Option { return func(s *Service) error { diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index a25125bd80..b6259a6719 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -39,6 +39,21 @@ type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error // rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index. func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) { + // Fulu: https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#messages + if forkIndex >= version.Fulu { + return map[string]rpcHandler{ + p2p.RPCStatusTopicV1: s.statusRPCHandler, + p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler, + p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler, + p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler, + p2p.RPCPingTopicV1: s.pingHandler, + p2p.RPCMetaDataTopicV3: s.metaDataHandler, // Modified in Fulu + p2p.RPCBlobSidecarsByRootTopicV1: s.blobSidecarByRootRPCHandler, + p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler, + p2p.RPCDataColumnSidecarsByRootTopicV1: s.dataColumnSidecarByRootRPCHandler, // Added in Fulu + }, nil + } + // Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages if forkIndex >= version.Electra { return map[string]rpcHandler{ @@ -258,9 +273,15 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { // since some requests do not have any data in the payload, we // do not decode anything. 
- if baseTopic == p2p.RPCMetaDataTopicV1 || baseTopic == p2p.RPCMetaDataTopicV2 ||
- baseTopic == p2p.RPCLightClientOptimisticUpdateTopicV1 ||
- baseTopic == p2p.RPCLightClientFinalityUpdateTopicV1 {
+ topics := map[string]bool{
+ p2p.RPCMetaDataTopicV1: true,
+ p2p.RPCMetaDataTopicV2: true,
+ p2p.RPCMetaDataTopicV3: true,
+ p2p.RPCLightClientOptimisticUpdateTopicV1: true,
+ p2p.RPCLightClientFinalityUpdateTopicV1: true,
+ }
+
+ if topics[baseTopic] {
 if err := handle(ctx, base, stream); err != nil {
 messageFailedProcessingCounter.WithLabelValues(topic).Inc()
 if !errors.Is(err, p2ptypes.ErrWrongForkDigestVersion) {
diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go
index 49d16593a0..4dc8c6f467 100644
--- a/beacon-chain/sync/rpc_chunked_response.go
+++ b/beacon-chain/sync/rpc_chunked_response.go
@@ -9,6 +9,7 @@ import (
 "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
 "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
 "github.com/OffchainLabs/prysm/v6/network/forks"
+ ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
 "github.com/OffchainLabs/prysm/v6/runtime/version"
 "github.com/OffchainLabs/prysm/v6/time/slots"
 libp2pcore "github.com/libp2p/go-libp2p/core"
@@ -238,3 +239,30 @@ func WriteLightClientFinalityUpdateChunk(stream libp2pcore.Stream, tor blockchai
 _, err = encoding.EncodeWithMaxLength(stream, update)
 return err
 }
+
+// WriteDataColumnSidecarChunk writes data column chunk object to stream.
+// response_chunk ::= <result> | <context-bytes> | <encoded-payload>
+func WriteDataColumnSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, sidecar *ethpb.DataColumnSidecar) error {
+ // Success response code.
+ if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
+ return errors.Wrap(err, "stream write")
+ }
+
+ // Fork digest.
+ genesisValidatorsRoot := tor.GenesisValidatorsRoot()
+ ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(sidecar.SignedBlockHeader.Header.Slot), genesisValidatorsRoot[:])
+ if err != nil {
+ return errors.Wrap(err, "fork digest from epoch")
+ }
+
+ if err := writeContextToStream(ctxBytes[:], stream); err != nil {
+ return errors.Wrap(err, "write context to stream")
+ }
+
+ // Sidecar.
+ if _, err = encoding.EncodeWithMaxLength(stream, sidecar); err != nil {
+ return errors.Wrap(err, "encode with max length")
+ }
+
+ return nil
+}
diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
new file mode 100644
index 0000000000..ecfc0d0761
--- /dev/null
+++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
@@ -0,0 +1,174 @@
+package sync
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "slices"
+ "time"
+
+ "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
+ "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
+ fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
+ "github.com/OffchainLabs/prysm/v6/config/params"
+ "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
+ "github.com/OffchainLabs/prysm/v6/monitoring/tracing"
+ "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
+ "github.com/OffchainLabs/prysm/v6/time/slots"
+ libp2pcore "github.com/libp2p/go-libp2p/core"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+)
+
+var (
+ notDataColumnsByRootIdentifiersError = errors.New("not data columns by root identifiers")
+ tickerDelay = time.Second
+)
+
+// dataColumnSidecarByRootRPCHandler handles the data column sidecars by root RPC request.
+// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyroot-v1 +func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler") + defer span.End() + + batchSize := flags.Get().DataColumnBatchLimit + numberOfColumns := params.BeaconConfig().NumberOfColumns + + // Check if the message type is the one expected. + ref, ok := msg.(*types.DataColumnsByRootIdentifiers) + if !ok { + return notDataColumnsByRootIdentifiersError + } + + requestedColumnIdents := *ref + remotePeerId := stream.Conn().RemotePeer() + + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + + SetRPCStreamDeadlines(stream) + + // Penalize peers that send invalid requests. + if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil { + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeerId) + s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) + return errors.Wrap(err, "validate data columns by root request") + } + + requestedColumnsByRoot := make(map[[fieldparams.RootLength]byte][]uint64) + for _, columnIdent := range requestedColumnIdents { + var root [fieldparams.RootLength]byte + copy(root[:], columnIdent.BlockRoot) + requestedColumnsByRoot[root] = append(requestedColumnsByRoot[root], columnIdent.Columns...) + } + + // Sort by column index for each root. + for _, columns := range requestedColumnsByRoot { + slices.Sort(columns) + } + + // Format nice logs. + requestedColumnsByRootLog := make(map[string]interface{}) + for root, columns := range requestedColumnsByRoot { + rootStr := fmt.Sprintf("%#x", root) + requestedColumnsByRootLog[rootStr] = "all" + if uint64(len(columns)) != numberOfColumns { + requestedColumnsByRootLog[rootStr] = columns + } + } + + // Compute the oldest slot we'll allow a peer to request, based on the current slot. + minReqSlot, err := dataColumnsRPCMinValidSlot(s.cfg.clock.CurrentSlot()) + if err != nil { + return errors.Wrapf(err, "data columns RPC min valid slot") + } + + log := log.WithFields(logrus.Fields{ + "peer": remotePeerId, + "columns": requestedColumnsByRootLog, + }) + + defer closeStream(stream, log) + + var ticker *time.Ticker + if len(requestedColumnIdents) > batchSize { + ticker = time.NewTicker(tickerDelay) + } + + log.Debug("Serving data column sidecar by root request") + + count := 0 + for root, columns := range requestedColumnsByRoot { + if err := ctx.Err(); err != nil { + closeStream(stream, log) + return errors.Wrap(err, "context error") + } + + // Throttle request processing to no more than batchSize/sec. + for range columns { + if ticker != nil && count != 0 && count%batchSize == 0 { + <-ticker.C + } + + count++ + } + + s.rateLimiter.add(stream, int64(len(columns))) + + // Retrieve the requested sidecars from the store. + verifiedRODataColumns, err := s.cfg.dataColumnStorage.Get(root, columns) + if err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + return errors.Wrap(err, "get data column sidecars") + } + + for _, verifiedRODataColumn := range verifiedRODataColumns { + // Filter out data column sidecars that are too old. 
+ if verifiedRODataColumn.SignedBlockHeader.Header.Slot < minReqSlot { + continue + } + + SetStreamWriteDeadline(stream, defaultWriteDuration) + if chunkErr := WriteDataColumnSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), verifiedRODataColumn.DataColumnSidecar); chunkErr != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, chunkErr) + return chunkErr + } + } + } + + return nil +} + +// validateDataColumnsByRootRequest checks if the request for data column sidecars is valid. +func validateDataColumnsByRootRequest(colIdents types.DataColumnsByRootIdentifiers) error { + total := uint64(0) + for _, id := range colIdents { + total += uint64(len(id.Columns)) + } + + if total > params.BeaconConfig().MaxRequestDataColumnSidecars { + return types.ErrMaxDataColumnReqExceeded + } + + return nil +} + +// dataColumnsRPCMinValidSlot returns the minimum slot that a peer can request data column sidecars for. +func dataColumnsRPCMinValidSlot(currentSlot primitives.Slot) (primitives.Slot, error) { + // Avoid overflow if we're running on a config where fulu is set to far future epoch. + if !params.FuluEnabled() { + return primitives.Slot(math.MaxUint64), nil + } + + beaconConfig := params.BeaconConfig() + minReqEpochs := beaconConfig.MinEpochsForDataColumnSidecarsRequest + minStartEpoch := beaconConfig.FuluForkEpoch + + currEpoch := slots.ToEpoch(currentSlot) + if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStartEpoch { + minStartEpoch = currEpoch - minReqEpochs + } + + return slots.EpochStart(minStartEpoch) +} diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root_test.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root_test.go new file mode 100644 index 0000000000..aa4ceb477d --- /dev/null +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root_test.go @@ -0,0 +1,314 @@ +package sync + +import ( + "context" + "io" + "math" + "sync" + "testing" + "time" + + chainMock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" + p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types" + "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" + "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + "github.com/OffchainLabs/prysm/v6/runtime/version" + "github.com/OffchainLabs/prysm/v6/testing/require" + "github.com/OffchainLabs/prysm/v6/testing/util" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/pkg/errors" +) + +func TestDataColumnSidecarsByRootRPCHandler(t *testing.T) { + ctx := context.Background() + t.Run("wrong message type", func(t *testing.T) { + service := &Service{} + err := service.dataColumnSidecarByRootRPCHandler(ctx, nil, nil) + require.ErrorIs(t, err, notDataColumnsByRootIdentifiersError) + }) + + t.Run("invalid request", func(t *testing.T) { + params.SetupTestConfigCleanup(t) + beaconConfig := params.BeaconConfig() + beaconConfig.MaxRequestDataColumnSidecars = 1 + params.OverrideBeaconConfig(beaconConfig) + + localP2P := p2ptest.NewTestP2P(t) + service := &Service{cfg: &config{p2p: localP2P}} 
+ + protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1) + remoteP2P := p2ptest.NewTestP2P(t) + + var wg sync.WaitGroup + wg.Add(1) + + remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) { + defer wg.Done() + code, errMsg, err := readStatusCodeNoDeadline(stream, localP2P.Encoding()) + require.NoError(t, err) + require.Equal(t, responseCodeInvalidRequest, code) + require.Equal(t, types.ErrMaxDataColumnReqExceeded.Error(), errMsg) + }) + + localP2P.Connect(remoteP2P) + stream, err := localP2P.BHost.NewStream(ctx, remoteP2P.BHost.ID(), protocolID) + require.NoError(t, err) + + msg := &types.DataColumnsByRootIdentifiers{{Columns: []uint64{1, 2, 3}}} + require.Equal(t, true, localP2P.Peers().Scorers().BadResponsesScorer().Score(remoteP2P.PeerID()) >= 0) + + err = service.dataColumnSidecarByRootRPCHandler(ctx, msg, stream) + require.NotNil(t, err) + require.Equal(t, true, localP2P.Peers().Scorers().BadResponsesScorer().Score(remoteP2P.PeerID()) < 0) + + if util.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + }) + + t.Run("nominal", func(t *testing.T) { + resetFlags := flags.Get() + gFlags := new(flags.GlobalFlags) + gFlags.DataColumnBatchLimit = 2 + flags.Init(gFlags) + defer flags.Init(resetFlags) + + // Setting the ticker to 0 will cause the ticker to panic. + // Setting it to the minimum value instead. + refTickerDelay := tickerDelay + tickerDelay = time.Nanosecond + defer func() { + tickerDelay = refTickerDelay + }() + + params.SetupTestConfigCleanup(t) + beaconConfig := params.BeaconConfig() + beaconConfig.FuluForkEpoch = 1 + params.OverrideBeaconConfig(beaconConfig) + + localP2P := p2ptest.NewTestP2P(t) + clock := startup.NewClock(time.Now(), [fieldparams.RootLength]byte{}) + + params := []util.DataColumnParam{ + {Slot: 10, Index: 1}, {Slot: 10, Index: 2}, {Slot: 10, Index: 3}, + {Slot: 40, Index: 4}, {Slot: 40, Index: 6}, + {Slot: 45, Index: 7}, {Slot: 45, Index: 8}, {Slot: 45, Index: 9}, + } + + _, verifiedRODataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, params) + + storage := filesystem.NewEphemeralDataColumnStorage(t) + err := storage.Save(verifiedRODataColumns) + require.NoError(t, err) + + service := &Service{ + cfg: &config{ + p2p: localP2P, + clock: clock, + dataColumnStorage: storage, + chain: &chainMock.ChainService{}, + }, + rateLimiter: newRateLimiter(localP2P), + } + + protocolID := protocol.ID(p2p.RPCDataColumnSidecarsByRootTopicV1) + remoteP2P := p2ptest.NewTestP2P(t) + + var wg sync.WaitGroup + wg.Add(1) + + ctxMap := ContextByteVersions{ + [4]byte{245, 165, 253, 66}: version.Fulu, + } + + root0 := verifiedRODataColumns[0].BlockRoot() + root3 := verifiedRODataColumns[3].BlockRoot() + root5 := verifiedRODataColumns[5].BlockRoot() + + remoteP2P.BHost.SetStreamHandler(protocolID, func(stream network.Stream) { + defer wg.Done() + + sidecars := make([]*blocks.RODataColumn, 0, 5) + + for i := uint64(0); ; /* no stop condition */ i++ { + sidecar, err := readChunkedDataColumnSideCar(stream, remoteP2P, ctxMap) + if errors.Is(err, io.EOF) { + // End of stream. 
+ break
+ }
+
+ require.NoError(t, err)
+ sidecars = append(sidecars, sidecar)
+ }
+
+ require.Equal(t, 5, len(sidecars))
+ require.Equal(t, root3, sidecars[0].BlockRoot())
+ require.Equal(t, root3, sidecars[1].BlockRoot())
+ require.Equal(t, root5, sidecars[2].BlockRoot())
+ require.Equal(t, root5, sidecars[3].BlockRoot())
+ require.Equal(t, root5, sidecars[4].BlockRoot())
+
+ require.Equal(t, uint64(4), sidecars[0].Index)
+ require.Equal(t, uint64(6), sidecars[1].Index)
+ require.Equal(t, uint64(7), sidecars[2].Index)
+ require.Equal(t, uint64(8), sidecars[3].Index)
+ require.Equal(t, uint64(9), sidecars[4].Index)
+ })
+
+ localP2P.Connect(remoteP2P)
+ stream, err := localP2P.BHost.NewStream(ctx, remoteP2P.BHost.ID(), protocolID)
+ require.NoError(t, err)
+
+ msg := &types.DataColumnsByRootIdentifiers{
+ {
+ BlockRoot: root0[:],
+ Columns: []uint64{1, 2, 3},
+ },
+ {
+ BlockRoot: root3[:],
+ Columns: []uint64{4, 5, 6},
+ },
+ {
+ BlockRoot: root5[:],
+ Columns: []uint64{7, 8, 9},
+ },
+ }
+
+ err = service.dataColumnSidecarByRootRPCHandler(ctx, msg, stream)
+ require.NoError(t, err)
+ require.Equal(t, true, localP2P.Peers().Scorers().BadResponsesScorer().Score(remoteP2P.PeerID()) >= 0)
+
+ if util.WaitTimeout(&wg, 1*time.Minute) {
+ t.Fatal("Did not receive stream within 1 minute")
+ }
+ })
+}
+
+func TestValidateDataColumnsByRootRequest(t *testing.T) {
+ params.SetupTestConfigCleanup(t)
+ config := params.BeaconConfig()
+ maxCols := uint64(10) // Set a small value for testing
+ config.MaxRequestDataColumnSidecars = maxCols
+ params.OverrideBeaconConfig(config)
+
+ tests := []struct {
+ name string
+ colIdents types.DataColumnsByRootIdentifiers
+ expectedErr error
+ }{
+ {
+ name: "Invalid request - multiple identifiers exceed max",
+ colIdents: types.DataColumnsByRootIdentifiers{
+ {
+ BlockRoot: make([]byte, fieldparams.RootLength),
+ Columns: make([]uint64, maxCols/2+1),
+ },
+ {
+ BlockRoot: make([]byte, fieldparams.RootLength),
+ Columns: make([]uint64, maxCols/2+1),
+ },
+ },
+ expectedErr: types.ErrMaxDataColumnReqExceeded,
+ },
+ {
+ name: "Valid request - less than max",
+ colIdents: types.DataColumnsByRootIdentifiers{
+ {
+ BlockRoot: make([]byte, fieldparams.RootLength),
+ Columns: make([]uint64, maxCols-1),
+ },
+ },
+ expectedErr: nil,
+ },
+ {
+ name: "Valid request - multiple identifiers sum to max",
+ colIdents: types.DataColumnsByRootIdentifiers{
+ {
+ BlockRoot: make([]byte, fieldparams.RootLength),
+ Columns: make([]uint64, maxCols/2),
+ },
+ {
+ BlockRoot: make([]byte, fieldparams.RootLength),
+ Columns: make([]uint64, maxCols/2),
+ },
+ },
+ expectedErr: nil,
+ },
+ }
+
+ // Run tests
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := validateDataColumnsByRootRequest(tt.colIdents)
+ if tt.expectedErr == nil {
+ require.NoError(t, err)
+ } else {
+ require.ErrorIs(t, err, tt.expectedErr)
+ }
+ })
+ }
+}
+
+func TestDataColumnsRPCMinValidSlot(t *testing.T) {
+ type testCase struct {
+ name string
+ fuluForkEpoch primitives.Epoch
+ minReqEpochs primitives.Epoch
+ currentSlot primitives.Slot
+ expected primitives.Slot
+ }
+
+ slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
+ testCases := []testCase{
+ {
+ name: "Fulu not enabled",
+ fuluForkEpoch: math.MaxUint64, // Disable Fulu
+ minReqEpochs: 5,
+ currentSlot: 0,
+ expected: primitives.Slot(math.MaxUint64),
+ },
+ {
+ name: "Current epoch equals fork epoch",
+ fuluForkEpoch: 10,
+ minReqEpochs: 5,
+ currentSlot: primitives.Slot(10 * slotsPerEpoch),
+ expected: primitives.Slot(10 * 
slotsPerEpoch), + }, + { + name: "Current epoch less than minReqEpochs", + fuluForkEpoch: 10, + minReqEpochs: 20, + currentSlot: primitives.Slot(15 * slotsPerEpoch), + expected: primitives.Slot(10 * slotsPerEpoch), + }, + { + name: "Current epoch greater than minReqEpochs + fork epoch", + fuluForkEpoch: 10, + minReqEpochs: 5, + currentSlot: primitives.Slot(20 * slotsPerEpoch), + expected: primitives.Slot(15 * slotsPerEpoch), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + params.SetupTestConfigCleanup(t) + config := params.BeaconConfig() + config.FuluForkEpoch = tc.fuluForkEpoch + config.MinEpochsForDataColumnSidecarsRequest = tc.minReqEpochs + params.OverrideBeaconConfig(config) + + actual, err := dataColumnsRPCMinValidSlot(tc.currentSlot) + require.NoError(t, err) + require.Equal(t, tc.expected, actual) + }) + } +} diff --git a/beacon-chain/sync/rpc_handler_test.go b/beacon-chain/sync/rpc_handler_test.go index b08f478eec..28b52021a6 100644 --- a/beacon-chain/sync/rpc_handler_test.go +++ b/beacon-chain/sync/rpc_handler_test.go @@ -20,8 +20,8 @@ type rpcHandlerTest struct { s *Service } -func (rt *rpcHandlerTest) testHandler(nh network.StreamHandler, rh rpcHandler, rhi interface{}) { - ctx, cancel := context.WithTimeout(rt.t.Context(), rt.timeout) +func (rt *rpcHandlerTest) testHandler(streamHandler network.StreamHandler, rpcHandler rpcHandler, message interface{}) { + ctx, cancel := context.WithTimeout(context.Background(), rt.timeout) defer func() { cancel() }() @@ -36,16 +36,18 @@ func (rt *rpcHandlerTest) testHandler(nh network.StreamHandler, rh rpcHandler, r defer func() { require.NoError(rt.t, client.Disconnect(server.PeerID())) }() + require.Equal(rt.t, 1, len(client.BHost.Network().Peers()), "Expected peers to be connected") - h := func(stream network.Stream) { + handler := func(stream network.Stream) { defer w.Done() - nh(stream) + streamHandler(stream) } - server.BHost.SetStreamHandler(rt.topic, h) + + server.BHost.SetStreamHandler(rt.topic, handler) stream, err := client.BHost.NewStream(ctx, server.BHost.ID(), rt.topic) require.NoError(rt.t, err) - err = rh(ctx, rhi, stream) + err = rpcHandler(ctx, message, stream) if rt.err == nil { require.NoError(rt.t, err) } else { diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index d10ad856ed..d7a7e82b53 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -39,6 +39,7 @@ var ( errBlobResponseOutOfBounds = errors.Wrap(verification.ErrBlobInvalid, "received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds") errChunkResponseBlockMismatch = errors.Wrap(verification.ErrBlobInvalid, "blob block details do not match") errChunkResponseParentMismatch = errors.Wrap(verification.ErrBlobInvalid, "parent root for response element doesn't match previous element root") + errDataColumnChunkedReadFailure = errors.New("failed to read stream of chunk-encoded data columns") ) // BeaconBlockProcessor defines a block processing function, which allows to start utilizing @@ -383,3 +384,53 @@ func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncod return rob, nil } + +func readChunkedDataColumnSideCar( + stream network.Stream, + p2pApi p2p.P2P, + ctxMap ContextByteVersions, +) (*blocks.RODataColumn, error) { + // Read the status code from the stream. 
+ statusCode, errMessage, err := ReadStatusCode(stream, p2pApi.Encoding())
+ if err != nil {
+ return nil, errors.Wrap(err, "read status code")
+ }
+
+ if statusCode != 0 {
+ return nil, errors.Wrap(errDataColumnChunkedReadFailure, errMessage)
+ }
+
+ // Retrieve the fork digest.
+ ctxBytes, err := readContextFromStream(stream)
+ if err != nil {
+ return nil, errors.Wrap(err, "read context from stream")
+ }
+
+ // Check if the fork digest is recognized.
+ msgVersion, ok := ctxMap[bytesutil.ToBytes4(ctxBytes)]
+ if !ok {
+ return nil, errors.Errorf("unrecognized fork digest %#x", ctxBytes)
+ }
+
+ // Check if we are on Fulu.
+ if msgVersion < version.Fulu {
+ return nil, errors.Errorf(
+ "unexpected context bytes for DataColumnSidecar, ctx=%#x, msgVersion=%v, minimalSupportedVersion=%v",
+ ctxBytes, version.String(msgVersion), version.String(version.Fulu),
+ )
+ }
+
+ // Decode the data column sidecar from the stream.
+ dataColumnSidecar := new(ethpb.DataColumnSidecar)
+ if err := p2pApi.Encoding().DecodeWithMaxLength(stream, dataColumnSidecar); err != nil {
+ return nil, errors.Wrap(err, "failed to decode the protobuf-encoded DataColumnSidecar message from RPC chunk stream")
+ }
+
+ // Create a read-only data column from the data column sidecar.
+ roDataColumn, err := blocks.NewRODataColumn(dataColumnSidecar)
+ if err != nil {
+ return nil, errors.Wrap(err, "new read only data column")
+ }
+
+ return &roDataColumn, nil
+}
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index 6aa4e52fc3..f6ec74f71b 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -102,6 +102,7 @@ type config struct {
 clock *startup.Clock
 stateNotifier statefeed.Notifier
 blobStorage *filesystem.BlobStorage
+ dataColumnStorage *filesystem.DataColumnStorage
 }
 
 // This defines the interface for interacting with block chain service
diff --git a/changelog/manu-peerdas-columns-by-root-handler.md b/changelog/manu-peerdas-columns-by-root-handler.md
new file mode 100644
index 0000000000..e8055305b9
--- /dev/null
+++ b/changelog/manu-peerdas-columns-by-root-handler.md
@@ -0,0 +1,2 @@
+### Added
+- Implement `dataColumnSidecarByRootRPCHandler`.
diff --git a/cmd/beacon-chain/flags/base.go b/cmd/beacon-chain/flags/base.go
index 7c863dd872..cd383425f1 100644
--- a/cmd/beacon-chain/flags/base.go
+++ b/cmd/beacon-chain/flags/base.go
@@ -212,6 +212,11 @@ var (
 Usage: "The factor by which blob batch limit may increase on burst.",
 Value: 3,
 }
+ DataColumnBatchLimit = &cli.IntFlag{
+ Name: "data-column-batch-limit",
+ Usage: "The amount of data columns the local peer is bounded to request and respond to in a batch.",
+ Value: 4096,
+ }
 
 // DisableDebugRPCEndpoints disables the debug Beacon API namespace.
DisableDebugRPCEndpoints = &cli.BoolFlag{ Name: "disable-debug-rpc-endpoints", diff --git a/cmd/beacon-chain/flags/config.go b/cmd/beacon-chain/flags/config.go index 72aef4104d..1b9720938a 100644 --- a/cmd/beacon-chain/flags/config.go +++ b/cmd/beacon-chain/flags/config.go @@ -16,6 +16,7 @@ type GlobalFlags struct { BlockBatchLimit int BlockBatchLimitBurstFactor int BlobBatchLimit int + DataColumnBatchLimit int BlobBatchLimitBurstFactor int } @@ -53,6 +54,7 @@ func ConfigureGlobalFlags(ctx *cli.Context) { cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name) cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name) cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name) + cfg.DataColumnBatchLimit = ctx.Int(DataColumnBatchLimit.Name) cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name) cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name) configureMinimumPeers(ctx, cfg) diff --git a/cmd/beacon-chain/storage/options.go b/cmd/beacon-chain/storage/options.go index 85fe6aeebb..bb2b96473a 100644 --- a/cmd/beacon-chain/storage/options.go +++ b/cmd/beacon-chain/storage/options.go @@ -14,7 +14,6 @@ import ( ) var ( - // BlobStoragePathFlag defines a flag to start the beacon chain from a give genesis state file. BlobStoragePathFlag = &cli.PathFlag{ Name: "blob-path", Usage: "Location for blob storage. Default location will be a 'blobs' directory next to the beacon db.", @@ -30,6 +29,10 @@ var ( Usage: layoutFlagUsage(), Value: filesystem.LayoutNameFlat, } + DataColumnStoragePathFlag = &cli.PathFlag{ + Name: "data-column-path", + Usage: "Location for data column storage. Default location will be a 'data-columns' directory next to the beacon db.", + } ) func layoutOptions() string { @@ -54,15 +57,23 @@ func validateLayoutFlag(_ *cli.Context, v string) error { // create a cancellable context. If we switch to using App.RunContext, we can set up this cancellation in the cmd // package instead, and allow the functional options to tap into context cancellation. func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) { - e, err := blobRetentionEpoch(c) + blobRetentionEpoch, err := blobRetentionEpoch(c) if err != nil { - return nil, err + return nil, errors.Wrap(err, "blob retention epoch") } - opts := []node.Option{node.WithBlobStorageOptions( - filesystem.WithBlobRetentionEpochs(e), + + blobStorageOptions := node.WithBlobStorageOptions( + filesystem.WithBlobRetentionEpochs(blobRetentionEpoch), filesystem.WithBasePath(blobStoragePath(c)), filesystem.WithLayout(c.String(BlobStorageLayout.Name)), // This is validated in the Action func for BlobStorageLayout. 
- )} + ) + + dataColumnStorageOption := node.WithDataColumnStorageOptions( + filesystem.WithDataColumnRetentionEpochs(blobRetentionEpoch), + filesystem.WithDataColumnBasePath(dataColumnStoragePath(c)), + ) + + opts := []node.Option{blobStorageOptions, dataColumnStorageOption} return opts, nil } @@ -75,6 +86,16 @@ func blobStoragePath(c *cli.Context) string { return blobsPath } +func dataColumnStoragePath(c *cli.Context) string { + dataColumnsPath := c.Path(DataColumnStoragePathFlag.Name) + if dataColumnsPath == "" { + // append a "data-columns" subdir to the end of the data dir path + dataColumnsPath = path.Join(c.String(cmd.DataDirFlag.Name), "data-columns") + } + + return dataColumnsPath +} + var errInvalidBlobRetentionEpochs = errors.New("value is smaller than spec minimum") // blobRetentionEpoch returns the spec default MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUEST diff --git a/testing/util/BUILD.bazel b/testing/util/BUILD.bazel index a5da3dbbc2..87218c6aa6 100644 --- a/testing/util/BUILD.bazel +++ b/testing/util/BUILD.bazel @@ -70,7 +70,6 @@ go_library( "//testing/require:go_default_library", "//time/slots:go_default_library", "@com_github_crate_crypto_go_kzg_4844//:go_default_library", - "@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_ethereum_go_ethereum//core/types:go_default_library", diff --git a/testing/util/data_column.go b/testing/util/data_column.go index b92a04fdb9..a3a75079d5 100644 --- a/testing/util/data_column.go +++ b/testing/util/data_column.go @@ -3,80 +3,91 @@ package util import ( "testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" - "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" - ckzg4844 "github.com/ethereum/c-kzg-4844/v2/bindings/go" ) type ( - DataColumnParams struct { - Slot primitives.Slot - ColumnIndex uint64 - KzgCommitments [][]byte - DataColumn []byte // A whole data cell will be filled with the content of one item of this slice. - } + // DataColumnParam is a struct that holds parameters for creating test RODataColumn and VerifiedRODataColumn sidecars. + DataColumnParam struct { + Index uint64 + Column [][]byte + KzgCommitments [][]byte + KzgProofs [][]byte + KzgCommitmentsInclusionProof [][]byte - DataColumnsParamsByRoot map[[fieldparams.RootLength]byte][]DataColumnParams + // Part of the beacon block header. + Slot primitives.Slot + ProposerIndex primitives.ValidatorIndex + ParentRoot []byte + StateRoot []byte + BodyRoot []byte + } ) -func CreateTestVerifiedRoDataColumnSidecars(t *testing.T, dataColumnParamsByBlockRoot DataColumnsParamsByRoot) ([]blocks.RODataColumn, []blocks.VerifiedRODataColumn) { - params.SetupTestConfigCleanup(t) - cfg := params.BeaconConfig().Copy() - cfg.FuluForkEpoch = 0 - params.OverrideBeaconConfig(cfg) - - count := 0 - for _, indices := range dataColumnParamsByBlockRoot { - count += len(indices) - } +// CreateTestVerifiedRoDataColumnSidecars creates test RODataColumn and VerifiedRODataColumn sidecars for testing purposes. 
+func CreateTestVerifiedRoDataColumnSidecars(t *testing.T, params []DataColumnParam) ([]blocks.RODataColumn, []blocks.VerifiedRODataColumn) { + const ( + kzgCommitmentsInclusionProofSize = 4 + proofSize = 32 + ) + count := len(params) verifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, count) rodataColumnSidecars := make([]blocks.RODataColumn, 0, count) - for blockRoot, params := range dataColumnParamsByBlockRoot { - for _, param := range params { - dataColumn := make([][]byte, 0, len(param.DataColumn)) - for _, value := range param.DataColumn { - cell := make([]byte, ckzg4844.BytesPerCell) - for i := range ckzg4844.BytesPerCell { - cell[i] = value - } - dataColumn = append(dataColumn, cell) - } - kzgCommitmentsInclusionProof := make([][]byte, 4) - for i := range kzgCommitmentsInclusionProof { - kzgCommitmentsInclusionProof[i] = make([]byte, 32) - } + for _, param := range params { + var parentRoot, stateRoot, bodyRoot [fieldparams.RootLength]byte + copy(parentRoot[:], param.ParentRoot) + copy(stateRoot[:], param.StateRoot) + copy(bodyRoot[:], param.BodyRoot) - dataColumnSidecar := ðpb.DataColumnSidecar{ - Index: param.ColumnIndex, - KzgCommitments: param.KzgCommitments, - Column: dataColumn, - KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof, - SignedBlockHeader: ðpb.SignedBeaconBlockHeader{ - Header: ðpb.BeaconBlockHeader{ - Slot: param.Slot, - ParentRoot: make([]byte, fieldparams.RootLength), - StateRoot: make([]byte, fieldparams.RootLength), - BodyRoot: make([]byte, fieldparams.RootLength), - }, - Signature: make([]byte, fieldparams.BLSSignatureLength), - }, - } - - roDataColumnSidecar, err := blocks.NewRODataColumnWithRoot(dataColumnSidecar, blockRoot) - if err != nil { - t.Fatal(err) - } - - rodataColumnSidecars = append(rodataColumnSidecars, roDataColumnSidecar) - - verifiedRoDataColumnSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar) - verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumnSidecar) + column := make([][]byte, 0, len(param.Column)) + for _, cell := range param.Column { + var completeCell [kzg.BytesPerCell]byte + copy(completeCell[:], cell) + column = append(column, completeCell[:]) } + + kzgCommitmentsInclusionProof := make([][]byte, 0, kzgCommitmentsInclusionProofSize) + for range kzgCommitmentsInclusionProofSize { + kzgCommitmentsInclusionProof = append(kzgCommitmentsInclusionProof, make([]byte, proofSize)) + } + + for i, proof := range param.KzgCommitmentsInclusionProof { + copy(kzgCommitmentsInclusionProof[i], proof) + } + + dataColumnSidecar := ðpb.DataColumnSidecar{ + Index: param.Index, + Column: column, + KzgCommitments: param.KzgCommitments, + KzgProofs: param.KzgProofs, + SignedBlockHeader: ðpb.SignedBeaconBlockHeader{ + Header: ðpb.BeaconBlockHeader{ + Slot: param.Slot, + ProposerIndex: param.ProposerIndex, + ParentRoot: parentRoot[:], + StateRoot: stateRoot[:], + BodyRoot: bodyRoot[:], + }, + Signature: make([]byte, fieldparams.BLSSignatureLength), + }, + KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof, + } + + roDataColumnSidecar, err := blocks.NewRODataColumn(dataColumnSidecar) + if err != nil { + t.Fatal(err) + } + + rodataColumnSidecars = append(rodataColumnSidecars, roDataColumnSidecar) + + verifiedRoDataColumnSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar) + verifiedRoDataColumnSidecars = append(verifiedRoDataColumnSidecars, verifiedRoDataColumnSidecar) } return rodataColumnSidecars, verifiedRoDataColumnSidecars