PeerDAS: Implement DAS (#15367)

* PeerDAS: Implement DAS

* Fix Terence's comment.

* Fix Terence comment.

* Fix Terence's comment.

* Fix James' comment.

* Fix James' comment.

* Rename some variable/files with blobs.

* Fix James' comment.

* Fix James' comment.

* Fix James' comment.
This commit is contained in:
Manu NALEPA
2025-06-06 12:06:52 +02:00
committed by GitHub
parent 543ebe857e
commit f1288a18ec
19 changed files with 1108 additions and 67 deletions

View File

@@ -3,22 +3,27 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"availability.go", "availability_blobs.go",
"cache.go", "availability_columns.go",
"blob_cache.go",
"data_column_cache.go",
"iface.go", "iface.go",
"mock.go", "mock.go",
], ],
importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/das", importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/das",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library", "//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library", "//config/params:go_default_library",
"//consensus-types/blocks:go_default_library", "//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library", "//consensus-types/primitives:go_default_library",
"//runtime/logging:go_default_library", "//runtime/logging:go_default_library",
"//runtime/version:go_default_library", "//runtime/version:go_default_library",
"//time/slots:go_default_library", "//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_pkg_errors//:go_default_library", "@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library",
], ],
@@ -27,13 +32,18 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"availability_test.go", "availability_blobs_test.go",
"cache_test.go", "availability_columns_test.go",
"blob_cache_test.go",
"data_column_cache_test.go",
], ],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/verification:go_default_library", "//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library", "//config/params:go_default_library",
"//consensus-types/blocks:go_default_library", "//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library", "//consensus-types/primitives:go_default_library",
@@ -41,6 +51,7 @@ go_test(
"//testing/require:go_default_library", "//testing/require:go_default_library",
"//testing/util:go_default_library", "//testing/util:go_default_library",
"//time/slots:go_default_library", "//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_pkg_errors//:go_default_library", "@com_github_pkg_errors//:go_default_library",
], ],
) )

View File

@@ -20,16 +20,16 @@ var (
errMixedRoots = errors.New("BlobSidecars must all be for the same block") errMixedRoots = errors.New("BlobSidecars must all be for the same block")
) )
// LazilyPersistentStore is an implementation of AvailabilityStore to be used when batch syncing. // LazilyPersistentStoreBlob is an implementation of AvailabilityStore to be used when batch syncing.
// This implementation will hold any blobs passed to Persist until the IsDataAvailable is called for their // This implementation will hold any blobs passed to Persist until the IsDataAvailable is called for their
// block, at which time they will undergo full verification and be saved to the disk. // block, at which time they will undergo full verification and be saved to the disk.
type LazilyPersistentStore struct { type LazilyPersistentStoreBlob struct {
store *filesystem.BlobStorage store *filesystem.BlobStorage
cache *cache cache *blobCache
verifier BlobBatchVerifier verifier BlobBatchVerifier
} }
var _ AvailabilityStore = &LazilyPersistentStore{} var _ AvailabilityStore = &LazilyPersistentStoreBlob{}
// BlobBatchVerifier enables LazyAvailabilityStore to manage the verification process // BlobBatchVerifier enables LazyAvailabilityStore to manage the verification process
// going from ROBlob->VerifiedROBlob, while avoiding the decision of which individual verifications // going from ROBlob->VerifiedROBlob, while avoiding the decision of which individual verifications
@@ -42,10 +42,10 @@ type BlobBatchVerifier interface {
// NewLazilyPersistentStore creates a new LazilyPersistentStore. This constructor should always be used // NewLazilyPersistentStore creates a new LazilyPersistentStore. This constructor should always be used
// when creating a LazilyPersistentStore because it needs to initialize the cache under the hood. // when creating a LazilyPersistentStore because it needs to initialize the cache under the hood.
func NewLazilyPersistentStore(store *filesystem.BlobStorage, verifier BlobBatchVerifier) *LazilyPersistentStore { func NewLazilyPersistentStore(store *filesystem.BlobStorage, verifier BlobBatchVerifier) *LazilyPersistentStoreBlob {
return &LazilyPersistentStore{ return &LazilyPersistentStoreBlob{
store: store, store: store,
cache: newCache(), cache: newBlobCache(),
verifier: verifier, verifier: verifier,
} }
} }
@@ -53,25 +53,31 @@ func NewLazilyPersistentStore(store *filesystem.BlobStorage, verifier BlobBatchV
// Persist adds blobs to the working blob cache. Blobs stored in this cache will be persisted // Persist adds blobs to the working blob cache. Blobs stored in this cache will be persisted
// for at least as long as the node is running. Once IsDataAvailable succeeds, all blobs referenced // for at least as long as the node is running. Once IsDataAvailable succeeds, all blobs referenced
// by the given block are guaranteed to be persisted for the remainder of the retention period. // by the given block are guaranteed to be persisted for the remainder of the retention period.
func (s *LazilyPersistentStore) Persist(current primitives.Slot, sc ...blocks.ROBlob) error { func (s *LazilyPersistentStoreBlob) Persist(current primitives.Slot, sidecars ...blocks.ROSidecar) error {
if len(sc) == 0 { if len(sidecars) == 0 {
return nil return nil
} }
if len(sc) > 1 {
first := sc[0].BlockRoot() blobSidecars, err := blocks.BlobSidecarsFromSidecars(sidecars)
for i := 1; i < len(sc); i++ { if err != nil {
if first != sc[i].BlockRoot() { return errors.Wrap(err, "blob sidecars from sidecars")
}
if len(blobSidecars) > 1 {
firstRoot := blobSidecars[0].BlockRoot()
for _, sidecar := range blobSidecars[1:] {
if sidecar.BlockRoot() != firstRoot {
return errMixedRoots return errMixedRoots
} }
} }
} }
if !params.WithinDAPeriod(slots.ToEpoch(sc[0].Slot()), slots.ToEpoch(current)) { if !params.WithinDAPeriod(slots.ToEpoch(blobSidecars[0].Slot()), slots.ToEpoch(current)) {
return nil return nil
} }
key := keyFromSidecar(sc[0]) key := keyFromSidecar(blobSidecars[0])
entry := s.cache.ensure(key) entry := s.cache.ensure(key)
for i := range sc { for _, blobSidecar := range blobSidecars {
if err := entry.stash(&sc[i]); err != nil { if err := entry.stash(&blobSidecar); err != nil {
return err return err
} }
} }
@@ -80,7 +86,7 @@ func (s *LazilyPersistentStore) Persist(current primitives.Slot, sc ...blocks.RO
// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified. // IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block. // BlobSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error { func (s *LazilyPersistentStoreBlob) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error {
blockCommitments, err := commitmentsToCheck(b, current) blockCommitments, err := commitmentsToCheck(b, current)
if err != nil { if err != nil {
return errors.Wrapf(err, "could not check data availability for block %#x", b.Root()) return errors.Wrapf(err, "could not check data availability for block %#x", b.Root())

View File

@@ -116,9 +116,11 @@ func TestLazilyPersistent_Missing(t *testing.T) {
ctx := context.Background() ctx := context.Background()
store := filesystem.NewEphemeralBlobStorage(t) store := filesystem.NewEphemeralBlobStorage(t)
blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3) blk, blobSidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)
mbv := &mockBlobBatchVerifier{t: t, scs: scs} scs := blocks.NewSidecarsFromBlobSidecars(blobSidecars)
mbv := &mockBlobBatchVerifier{t: t, scs: blobSidecars}
as := NewLazilyPersistentStore(store, mbv) as := NewLazilyPersistentStore(store, mbv)
// Only one commitment persisted, should return error with other indices // Only one commitment persisted, should return error with other indices
@@ -141,12 +143,14 @@ func TestLazilyPersistent_Mismatch(t *testing.T) {
ctx := context.Background() ctx := context.Background()
store := filesystem.NewEphemeralBlobStorage(t) store := filesystem.NewEphemeralBlobStorage(t)
blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3) blk, blobSidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3)
mbv := &mockBlobBatchVerifier{t: t, err: errors.New("kzg check should not run")} mbv := &mockBlobBatchVerifier{t: t, err: errors.New("kzg check should not run")}
scs[0].KzgCommitment = bytesutil.PadTo([]byte("nope"), 48) blobSidecars[0].KzgCommitment = bytesutil.PadTo([]byte("nope"), 48)
as := NewLazilyPersistentStore(store, mbv) as := NewLazilyPersistentStore(store, mbv)
scs := blocks.NewSidecarsFromBlobSidecars(blobSidecars)
// Only one commitment persisted, should return error with other indices // Only one commitment persisted, should return error with other indices
require.NoError(t, as.Persist(1, scs[0])) require.NoError(t, as.Persist(1, scs[0]))
err := as.IsDataAvailable(ctx, 1, blk) err := as.IsDataAvailable(ctx, 1, blk)
@@ -155,7 +159,10 @@ func TestLazilyPersistent_Mismatch(t *testing.T) {
} }
func TestLazyPersistOnceCommitted(t *testing.T) { func TestLazyPersistOnceCommitted(t *testing.T) {
_, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 6) _, blobSidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 6)
scs := blocks.NewSidecarsFromBlobSidecars(blobSidecars)
as := NewLazilyPersistentStore(filesystem.NewEphemeralBlobStorage(t), &mockBlobBatchVerifier{}) as := NewLazilyPersistentStore(filesystem.NewEphemeralBlobStorage(t), &mockBlobBatchVerifier{})
// stashes as expected // stashes as expected
require.NoError(t, as.Persist(1, scs...)) require.NoError(t, as.Persist(1, scs...))
@@ -163,10 +170,13 @@ func TestLazyPersistOnceCommitted(t *testing.T) {
require.ErrorIs(t, as.Persist(1, scs...), ErrDuplicateSidecar) require.ErrorIs(t, as.Persist(1, scs...), ErrDuplicateSidecar)
// ignores index out of bound // ignores index out of bound
scs[0].Index = 6 blobSidecars[0].Index = 6
require.ErrorIs(t, as.Persist(1, scs[0]), errIndexOutOfBounds) require.ErrorIs(t, as.Persist(1, blocks.NewSidecarFromBlobSidecar(blobSidecars[0])), errIndexOutOfBounds)
_, moreBlobSidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 4)
more := blocks.NewSidecarsFromBlobSidecars(moreBlobSidecars)
_, more := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 4)
// ignores sidecars before the retention period // ignores sidecars before the retention period
slotOOB, err := slots.EpochStart(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) slotOOB, err := slots.EpochStart(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -0,0 +1,208 @@
package das
import (
"context"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/ethereum/go-ethereum/p2p/enode"
errors "github.com/pkg/errors"
)
// LazilyPersistentStoreColumn is an implementation of AvailabilityStore to be used when batch syncing data columns.
// This implementation will hold any data columns passed to Persist until the IsDataAvailable is called for their
// block, at which time they will undergo full verification and be saved to the disk.
type LazilyPersistentStoreColumn struct {
store *filesystem.DataColumnStorage
nodeID enode.ID
cache *dataColumnCache
custodyInfo *peerdas.CustodyInfo
newDataColumnsVerifier verification.NewDataColumnsVerifier
}
var _ AvailabilityStore = &LazilyPersistentStoreColumn{}
// DataColumnsVerifier enables LazilyPersistentStoreColumn to manage the verification process
// going from RODataColumn->VerifiedRODataColumn, while avoiding the decision of which individual verifications
// to run and in what order. Since LazilyPersistentStoreColumn always tries to verify and save data columns only when
// they are all available, the interface takes a slice of data column sidecars.
type DataColumnsVerifier interface {
VerifiedRODataColumns(ctx context.Context, blk blocks.ROBlock, scs []blocks.RODataColumn) ([]blocks.VerifiedRODataColumn, error)
}
// NewLazilyPersistentStoreColumn creates a new LazilyPersistentStoreColumn.
// WARNING: The resulting LazilyPersistentStoreColumn is NOT thread-safe.
func NewLazilyPersistentStoreColumn(store *filesystem.DataColumnStorage, nodeID enode.ID, newDataColumnsVerifier verification.NewDataColumnsVerifier, custodyInfo *peerdas.CustodyInfo) *LazilyPersistentStoreColumn {
return &LazilyPersistentStoreColumn{
store: store,
nodeID: nodeID,
cache: newDataColumnCache(),
custodyInfo: custodyInfo,
newDataColumnsVerifier: newDataColumnsVerifier,
}
}
// PersistColumns adds columns to the working column cache. Columns stored in this cache will be persisted
// for at least as long as the node is running. Once IsDataAvailable succeeds, all columns referenced
// by the given block are guaranteed to be persisted for the remainder of the retention period.
func (s *LazilyPersistentStoreColumn) Persist(current primitives.Slot, sidecars ...blocks.ROSidecar) error {
if len(sidecars) == 0 {
return nil
}
dataColumnSidecars, err := blocks.DataColumnSidecarsFromSidecars(sidecars)
if err != nil {
return errors.Wrap(err, "blob sidecars from sidecars")
}
// It is safe to retrieve the first sidecar.
firstSidecar := dataColumnSidecars[0]
if len(sidecars) > 1 {
firstRoot := firstSidecar.BlockRoot()
for _, sidecar := range dataColumnSidecars[1:] {
if sidecar.BlockRoot() != firstRoot {
return errMixedRoots
}
}
}
firstSidecarEpoch, currentEpoch := slots.ToEpoch(firstSidecar.Slot()), slots.ToEpoch(current)
if !params.WithinDAPeriod(firstSidecarEpoch, currentEpoch) {
return nil
}
key := cacheKey{slot: firstSidecar.Slot(), root: firstSidecar.BlockRoot()}
entry := s.cache.ensure(key)
for _, sidecar := range dataColumnSidecars {
if err := entry.stash(&sidecar); err != nil {
return errors.Wrap(err, "stash DataColumnSidecar")
}
}
return nil
}
// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// DataColumnsSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, currentSlot primitives.Slot, block blocks.ROBlock) error {
blockCommitments, err := s.fullCommitmentsToCheck(s.nodeID, block, currentSlot)
if err != nil {
return errors.Wrapf(err, "full commitments to check with block root `%#x` and current slot `%d`", block.Root(), currentSlot)
}
// Return early for blocks that do not have any commitments.
if blockCommitments.count() == 0 {
return nil
}
// Get the root of the block.
blockRoot := block.Root()
// Build the cache key for the block.
key := cacheKey{slot: block.Block().Slot(), root: blockRoot}
// Retrieve the cache entry for the block, or create an empty one if it doesn't exist.
entry := s.cache.ensure(key)
// Delete the cache entry for the block at the end.
defer s.cache.delete(key)
// Set the disk summary for the block in the cache entry.
entry.setDiskSummary(s.store.Summary(blockRoot))
// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
// ignore their response and decrease their peer score.
roDataColumns, err := entry.filter(blockRoot, blockCommitments)
if err != nil {
return errors.Wrap(err, "entry filter")
}
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
verifier := s.newDataColumnsVerifier(roDataColumns, verification.ByRangeRequestDataColumnSidecarRequirements)
if err := verifier.ValidFields(); err != nil {
return errors.Wrap(err, "valid")
}
if err := verifier.SidecarInclusionProven(); err != nil {
return errors.Wrap(err, "sidecar inclusion proven")
}
if err := verifier.SidecarKzgProofVerified(); err != nil {
return errors.Wrap(err, "sidecar KZG proof verified")
}
verifiedRoDataColumns, err := verifier.VerifiedRODataColumns()
if err != nil {
return errors.Wrap(err, "verified RO data columns - should never happen")
}
if err := s.store.Save(verifiedRoDataColumns); err != nil {
return errors.Wrap(err, "save data column sidecars")
}
return nil
}
// fullCommitmentsToCheck returns the commitments to check for a given block.
func (s *LazilyPersistentStoreColumn) fullCommitmentsToCheck(nodeID enode.ID, block blocks.ROBlock, currentSlot primitives.Slot) (*safeCommitmentsArray, error) {
// Return early for blocks that are pre-Fulu.
if block.Version() < version.Fulu {
return &safeCommitmentsArray{}, nil
}
// Compute the block epoch.
blockSlot := block.Block().Slot()
blockEpoch := slots.ToEpoch(blockSlot)
// Compute the current spoch.
currentEpoch := slots.ToEpoch(currentSlot)
// Return early if the request is out of the MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS window.
if !params.WithinDAPeriod(blockEpoch, currentEpoch) {
return &safeCommitmentsArray{}, nil
}
// Retrieve the KZG commitments for the block.
kzgCommitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "blob KZG commitments")
}
// Return early if there are no commitments in the block.
if len(kzgCommitments) == 0 {
return &safeCommitmentsArray{}, nil
}
// Retrieve the groups count.
custodyGroupCount := s.custodyInfo.ActualGroupCount()
// Retrieve peer info.
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
return nil, errors.Wrap(err, "peer info")
}
// Create a safe commitments array for the custody columns.
commitmentsArray := &safeCommitmentsArray{}
commitmentsArraySize := uint64(len(commitmentsArray))
for column := range peerInfo.CustodyColumns {
if column >= commitmentsArraySize {
return nil, errors.Errorf("custody column index %d too high (max allowed %d) - should never happen", column, commitmentsArraySize)
}
commitmentsArray[column] = kzgCommitments
}
return commitmentsArray, nil
}

View File

@@ -0,0 +1,303 @@
package das
import (
"context"
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/ethereum/go-ethereum/p2p/enode"
)
var commitments = [][]byte{
bytesutil.PadTo([]byte("a"), 48),
bytesutil.PadTo([]byte("b"), 48),
bytesutil.PadTo([]byte("c"), 48),
bytesutil.PadTo([]byte("d"), 48),
}
func TestPersist(t *testing.T) {
t.Run("no sidecars", func(t *testing.T) {
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, &peerdas.CustodyInfo{})
err := lazilyPersistentStoreColumns.Persist(0)
require.NoError(t, err)
require.Equal(t, 0, len(lazilyPersistentStoreColumns.cache.entries))
})
t.Run("mixed roots", func(t *testing.T) {
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{
{1}: {{ColumnIndex: 1}},
{2}: {{ColumnIndex: 2}},
}
roSidecars, _ := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot)
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, &peerdas.CustodyInfo{})
err := lazilyPersistentStoreColumns.Persist(0, roSidecars...)
require.ErrorIs(t, err, errMixedRoots)
require.Equal(t, 0, len(lazilyPersistentStoreColumns.cache.entries))
})
t.Run("outside DA period", func(t *testing.T) {
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{
{1}: {{ColumnIndex: 1}},
}
roSidecars, _ := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot)
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, &peerdas.CustodyInfo{})
err := lazilyPersistentStoreColumns.Persist(1_000_000, roSidecars...)
require.NoError(t, err)
require.Equal(t, 0, len(lazilyPersistentStoreColumns.cache.entries))
})
t.Run("nominal", func(t *testing.T) {
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
dataColumnParamsByBlockRoot := map[[fieldparams.RootLength]byte][]util.DataColumnParams{
{}: {{ColumnIndex: 1}, {ColumnIndex: 5}},
}
roSidecars, roDataColumns := roSidecarsFromDataColumnParamsByBlockRoot(t, dataColumnParamsByBlockRoot)
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, nil, &peerdas.CustodyInfo{})
err := lazilyPersistentStoreColumns.Persist(0, roSidecars...)
require.NoError(t, err)
require.Equal(t, 1, len(lazilyPersistentStoreColumns.cache.entries))
key := cacheKey{slot: 0, root: [32]byte{}}
entry := lazilyPersistentStoreColumns.cache.entries[key]
// A call to Persist does NOT save the sidecars to disk.
require.Equal(t, uint64(0), entry.diskSummary.Count())
require.DeepSSZEqual(t, roDataColumns[0], *entry.scs[1])
require.DeepSSZEqual(t, roDataColumns[1], *entry.scs[5])
for i, roDataColumn := range entry.scs {
if map[int]bool{1: true, 5: true}[i] {
continue
}
require.IsNil(t, roDataColumn)
}
})
}
func TestIsDataAvailable(t *testing.T) {
newDataColumnsVerifier := func(dataColumnSidecars []blocks.RODataColumn, _ []verification.Requirement) verification.DataColumnsVerifier {
return &mockDataColumnsVerifier{t: t, dataColumnSidecars: dataColumnSidecars}
}
ctx := context.Background()
t.Run("without commitments", func(t *testing.T) {
signedBeaconBlockFulu := util.NewBeaconBlockFulu()
signedRoBlock := newSignedRoBlock(t, signedBeaconBlockFulu)
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, newDataColumnsVerifier, &peerdas.CustodyInfo{})
err := lazilyPersistentStoreColumns.IsDataAvailable(ctx, 0 /*current slot*/, signedRoBlock)
require.NoError(t, err)
})
t.Run("with commitments", func(t *testing.T) {
signedBeaconBlockFulu := util.NewBeaconBlockFulu()
signedBeaconBlockFulu.Block.Body.BlobKzgCommitments = commitments
signedRoBlock := newSignedRoBlock(t, signedBeaconBlockFulu)
root := signedRoBlock.Root()
dataColumnStorage := filesystem.NewEphemeralDataColumnStorage(t)
lazilyPersistentStoreColumns := NewLazilyPersistentStoreColumn(dataColumnStorage, enode.ID{}, newDataColumnsVerifier, &peerdas.CustodyInfo{})
indices := [...]uint64{1, 17, 87, 102}
dataColumnsParams := make([]util.DataColumnParams, 0, len(indices))
for _, index := range indices {
dataColumnParams := util.DataColumnParams{
ColumnIndex: index,
KzgCommitments: commitments,
}
dataColumnsParams = append(dataColumnsParams, dataColumnParams)
}
dataColumnsParamsByBlockRoot := util.DataColumnsParamsByRoot{root: dataColumnsParams}
_, verifiedRoDataColumns := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnsParamsByBlockRoot)
key := cacheKey{root: root}
entry := lazilyPersistentStoreColumns.cache.ensure(key)
defer lazilyPersistentStoreColumns.cache.delete(key)
for _, verifiedRoDataColumn := range verifiedRoDataColumns {
err := entry.stash(&verifiedRoDataColumn.RODataColumn)
require.NoError(t, err)
}
err := lazilyPersistentStoreColumns.IsDataAvailable(ctx, 0 /*current slot*/, signedRoBlock)
require.NoError(t, err)
actual, err := dataColumnStorage.Get(root, indices[:])
require.NoError(t, err)
summary := dataColumnStorage.Summary(root)
require.Equal(t, uint64(len(indices)), summary.Count())
require.DeepSSZEqual(t, verifiedRoDataColumns, actual)
})
}
func TestFullCommitmentsToCheck(t *testing.T) {
windowSlots, err := slots.EpochEnd(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
require.NoError(t, err)
testCases := []struct {
name string
commitments [][]byte
block func(*testing.T) blocks.ROBlock
slot primitives.Slot
}{
{
name: "Pre-Fulu block",
block: func(t *testing.T) blocks.ROBlock {
return newSignedRoBlock(t, util.NewBeaconBlockElectra())
},
},
{
name: "Commitments outside data availability window",
block: func(t *testing.T) blocks.ROBlock {
beaconBlockElectra := util.NewBeaconBlockElectra()
// Block is from slot 0, "current slot" is window size +1 (so outside the window)
beaconBlockElectra.Block.Body.BlobKzgCommitments = commitments
return newSignedRoBlock(t, beaconBlockElectra)
},
slot: windowSlots + 1,
},
{
name: "Commitments within data availability window",
block: func(t *testing.T) blocks.ROBlock {
signedBeaconBlockFulu := util.NewBeaconBlockFulu()
signedBeaconBlockFulu.Block.Body.BlobKzgCommitments = commitments
signedBeaconBlockFulu.Block.Slot = 100
return newSignedRoBlock(t, signedBeaconBlockFulu)
},
commitments: commitments,
slot: 100,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeAllDataSubnets = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
b := tc.block(t)
s := NewLazilyPersistentStoreColumn(nil, enode.ID{}, nil, &peerdas.CustodyInfo{})
commitmentsArray, err := s.fullCommitmentsToCheck(enode.ID{}, b, tc.slot)
require.NoError(t, err)
for _, commitments := range commitmentsArray {
require.DeepEqual(t, tc.commitments, commitments)
}
})
}
}
func roSidecarsFromDataColumnParamsByBlockRoot(t *testing.T, dataColumnParamsByBlockRoot util.DataColumnsParamsByRoot) ([]blocks.ROSidecar, []blocks.RODataColumn) {
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
roSidecars := make([]blocks.ROSidecar, 0, len(roDataColumns))
for _, roDataColumn := range roDataColumns {
roSidecars = append(roSidecars, blocks.NewSidecarFromDataColumnSidecar(roDataColumn))
}
return roSidecars, roDataColumns
}
func newSignedRoBlock(t *testing.T, signedBeaconBlock interface{}) blocks.ROBlock {
sb, err := blocks.NewSignedBeaconBlock(signedBeaconBlock)
require.NoError(t, err)
rb, err := blocks.NewROBlock(sb)
require.NoError(t, err)
return rb
}
type mockDataColumnsVerifier struct {
t *testing.T
dataColumnSidecars []blocks.RODataColumn
validCalled, SidecarInclusionProvenCalled, SidecarKzgProofVerifiedCalled bool
}
var _ verification.DataColumnsVerifier = &mockDataColumnsVerifier{}
func (m *mockDataColumnsVerifier) VerifiedRODataColumns() ([]blocks.VerifiedRODataColumn, error) {
require.Equal(m.t, true, m.validCalled && m.SidecarInclusionProvenCalled && m.SidecarKzgProofVerifiedCalled)
verifiedDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, len(m.dataColumnSidecars))
for _, dataColumnSidecar := range m.dataColumnSidecars {
verifiedDataColumnSidecar := blocks.NewVerifiedRODataColumn(dataColumnSidecar)
verifiedDataColumnSidecars = append(verifiedDataColumnSidecars, verifiedDataColumnSidecar)
}
return verifiedDataColumnSidecars, nil
}
func (m *mockDataColumnsVerifier) SatisfyRequirement(verification.Requirement) {}
func (m *mockDataColumnsVerifier) ValidFields() error {
m.validCalled = true
return nil
}
func (m *mockDataColumnsVerifier) CorrectSubnet(dataColumnSidecarSubTopic string, expectedTopics []string) error {
return nil
}
func (m *mockDataColumnsVerifier) NotFromFutureSlot() error { return nil }
func (m *mockDataColumnsVerifier) SlotAboveFinalized() error { return nil }
func (m *mockDataColumnsVerifier) ValidProposerSignature(ctx context.Context) error { return nil }
func (m *mockDataColumnsVerifier) SidecarParentSeen(parentSeen func([fieldparams.RootLength]byte) bool) error {
return nil
}
func (m *mockDataColumnsVerifier) SidecarParentValid(badParent func([fieldparams.RootLength]byte) bool) error {
return nil
}
func (m *mockDataColumnsVerifier) SidecarParentSlotLower() error { return nil }
func (m *mockDataColumnsVerifier) SidecarDescendsFromFinalized() error { return nil }
func (m *mockDataColumnsVerifier) SidecarInclusionProven() error {
m.SidecarInclusionProvenCalled = true
return nil
}
func (m *mockDataColumnsVerifier) SidecarKzgProofVerified() error {
m.SidecarKzgProofVerifiedCalled = true
return nil
}
func (m *mockDataColumnsVerifier) SidecarProposerExpected(ctx context.Context) error { return nil }

View File

@@ -4,33 +4,29 @@ import (
"bytes" "bytes"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem" "github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
var ( var errIndexOutOfBounds = errors.New("sidecar.index > MAX_BLOBS_PER_BLOCK")
ErrDuplicateSidecar = errors.New("duplicate sidecar stashed in AvailabilityStore")
errIndexOutOfBounds = errors.New("sidecar.index > MAX_BLOBS_PER_BLOCK")
errCommitmentMismatch = errors.New("KzgCommitment of sidecar in cache did not match block commitment")
errMissingSidecar = errors.New("no sidecar in cache for block commitment")
)
// cacheKey includes the slot so that we can easily iterate through the cache and compare // cacheKey includes the slot so that we can easily iterate through the cache and compare
// slots for eviction purposes. Whether the input is the block or the sidecar, we always have // slots for eviction purposes. Whether the input is the block or the sidecar, we always have
// the root+slot when interacting with the cache, so it isn't an inconvenience to use both. // the root+slot when interacting with the cache, so it isn't an inconvenience to use both.
type cacheKey struct { type cacheKey struct {
slot primitives.Slot slot primitives.Slot
root [32]byte root [fieldparams.RootLength]byte
} }
type cache struct { type blobCache struct {
entries map[cacheKey]*cacheEntry entries map[cacheKey]*blobCacheEntry
} }
func newCache() *cache { func newBlobCache() *blobCache {
return &cache{entries: make(map[cacheKey]*cacheEntry)} return &blobCache{entries: make(map[cacheKey]*blobCacheEntry)}
} }
// keyFromSidecar is a convenience method for constructing a cacheKey from a BlobSidecar value. // keyFromSidecar is a convenience method for constructing a cacheKey from a BlobSidecar value.
@@ -44,34 +40,34 @@ func keyFromBlock(b blocks.ROBlock) cacheKey {
} }
// ensure returns the entry for the given key, creating it if it isn't already present. // ensure returns the entry for the given key, creating it if it isn't already present.
func (c *cache) ensure(key cacheKey) *cacheEntry { func (c *blobCache) ensure(key cacheKey) *blobCacheEntry {
e, ok := c.entries[key] e, ok := c.entries[key]
if !ok { if !ok {
e = &cacheEntry{} e = &blobCacheEntry{}
c.entries[key] = e c.entries[key] = e
} }
return e return e
} }
// delete removes the cache entry from the cache. // delete removes the cache entry from the cache.
func (c *cache) delete(key cacheKey) { func (c *blobCache) delete(key cacheKey) {
delete(c.entries, key) delete(c.entries, key)
} }
// cacheEntry holds a fixed-length cache of BlobSidecars. // blobCacheEntry holds a fixed-length cache of BlobSidecars.
type cacheEntry struct { type blobCacheEntry struct {
scs []*blocks.ROBlob scs []*blocks.ROBlob
diskSummary filesystem.BlobStorageSummary diskSummary filesystem.BlobStorageSummary
} }
func (e *cacheEntry) setDiskSummary(sum filesystem.BlobStorageSummary) { func (e *blobCacheEntry) setDiskSummary(sum filesystem.BlobStorageSummary) {
e.diskSummary = sum e.diskSummary = sum
} }
// stash adds an item to the in-memory cache of BlobSidecars. // stash adds an item to the in-memory cache of BlobSidecars.
// Only the first BlobSidecar of a given Index will be kept in the cache. // Only the first BlobSidecar of a given Index will be kept in the cache.
// stash will return an error if the given blob is already in the cache, or if the Index is out of bounds. // stash will return an error if the given blob is already in the cache, or if the Index is out of bounds.
func (e *cacheEntry) stash(sc *blocks.ROBlob) error { func (e *blobCacheEntry) stash(sc *blocks.ROBlob) error {
maxBlobsPerBlock := params.BeaconConfig().MaxBlobsPerBlock(sc.Slot()) maxBlobsPerBlock := params.BeaconConfig().MaxBlobsPerBlock(sc.Slot())
if sc.Index >= uint64(maxBlobsPerBlock) { if sc.Index >= uint64(maxBlobsPerBlock) {
return errors.Wrapf(errIndexOutOfBounds, "index=%d", sc.Index) return errors.Wrapf(errIndexOutOfBounds, "index=%d", sc.Index)
@@ -92,7 +88,7 @@ func (e *cacheEntry) stash(sc *blocks.ROBlob) error {
// commitments were found in the cache and the sidecar slice return value can be used // commitments were found in the cache and the sidecar slice return value can be used
// to perform a DA check against the cached sidecars. // to perform a DA check against the cached sidecars.
// filter only returns blobs that need to be checked. Blobs already available on disk will be excluded. // filter only returns blobs that need to be checked. Blobs already available on disk will be excluded.
func (e *cacheEntry) filter(root [32]byte, kc [][]byte, slot primitives.Slot) ([]blocks.ROBlob, error) { func (e *blobCacheEntry) filter(root [32]byte, kc [][]byte, slot primitives.Slot) ([]blocks.ROBlob, error) {
count := len(kc) count := len(kc)
if e.diskSummary.AllAvailable(count) { if e.diskSummary.AllAvailable(count) {
return nil, nil return nil, nil

View File

@@ -14,7 +14,7 @@ import (
) )
func TestCacheEnsureDelete(t *testing.T) { func TestCacheEnsureDelete(t *testing.T) {
c := newCache() c := newBlobCache()
require.Equal(t, 0, len(c.entries)) require.Equal(t, 0, len(c.entries))
root := bytesutil.ToBytes32([]byte("root")) root := bytesutil.ToBytes32([]byte("root"))
slot := primitives.Slot(1234) slot := primitives.Slot(1234)
@@ -25,18 +25,18 @@ func TestCacheEnsureDelete(t *testing.T) {
c.delete(k) c.delete(k)
require.Equal(t, 0, len(c.entries)) require.Equal(t, 0, len(c.entries))
var nilEntry *cacheEntry var nilEntry *blobCacheEntry
require.Equal(t, nilEntry, c.entries[k]) require.Equal(t, nilEntry, c.entries[k])
} }
type filterTestCaseSetupFunc func(t *testing.T) (*cacheEntry, [][]byte, []blocks.ROBlob) type filterTestCaseSetupFunc func(t *testing.T) (*blobCacheEntry, [][]byte, []blocks.ROBlob)
func filterTestCaseSetup(slot primitives.Slot, nBlobs int, onDisk []int, numExpected int) filterTestCaseSetupFunc { func filterTestCaseSetup(slot primitives.Slot, nBlobs int, onDisk []int, numExpected int) filterTestCaseSetupFunc {
return func(t *testing.T) (*cacheEntry, [][]byte, []blocks.ROBlob) { return func(t *testing.T) (*blobCacheEntry, [][]byte, []blocks.ROBlob) {
blk, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, nBlobs) blk, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, nBlobs)
commits, err := commitmentsToCheck(blk, blk.Block().Slot()) commits, err := commitmentsToCheck(blk, blk.Block().Slot())
require.NoError(t, err) require.NoError(t, err)
entry := &cacheEntry{} entry := &blobCacheEntry{}
if len(onDisk) > 0 { if len(onDisk) > 0 {
od := map[[32]byte][]int{blk.Root(): onDisk} od := map[[32]byte][]int{blk.Root(): onDisk}
sumz := filesystem.NewMockBlobStorageSummarizer(t, od) sumz := filesystem.NewMockBlobStorageSummarizer(t, od)
@@ -125,12 +125,12 @@ func TestFilter(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
cases := []struct { cases := []struct {
name string name string
setup func(t *testing.T) (*cacheEntry, [][]byte, []blocks.ROBlob) setup func(t *testing.T) (*blobCacheEntry, [][]byte, []blocks.ROBlob)
err error err error
}{ }{
{ {
name: "commitments mismatch - extra sidecar", name: "commitments mismatch - extra sidecar",
setup: func(t *testing.T) (*cacheEntry, [][]byte, []blocks.ROBlob) { setup: func(t *testing.T) (*blobCacheEntry, [][]byte, []blocks.ROBlob) {
entry, commits, expected := filterTestCaseSetup(denebSlot, 6, []int{0, 1}, 4)(t) entry, commits, expected := filterTestCaseSetup(denebSlot, 6, []int{0, 1}, 4)(t)
commits[5] = nil commits[5] = nil
return entry, commits, expected return entry, commits, expected
@@ -139,7 +139,7 @@ func TestFilter(t *testing.T) {
}, },
{ {
name: "sidecar missing", name: "sidecar missing",
setup: func(t *testing.T) (*cacheEntry, [][]byte, []blocks.ROBlob) { setup: func(t *testing.T) (*blobCacheEntry, [][]byte, []blocks.ROBlob) {
entry, commits, expected := filterTestCaseSetup(denebSlot, 6, []int{0, 1}, 4)(t) entry, commits, expected := filterTestCaseSetup(denebSlot, 6, []int{0, 1}, 4)(t)
entry.scs[5] = nil entry.scs[5] = nil
return entry, commits, expected return entry, commits, expected
@@ -148,7 +148,7 @@ func TestFilter(t *testing.T) {
}, },
{ {
name: "commitments mismatch - different bytes", name: "commitments mismatch - different bytes",
setup: func(t *testing.T) (*cacheEntry, [][]byte, []blocks.ROBlob) { setup: func(t *testing.T) (*blobCacheEntry, [][]byte, []blocks.ROBlob) {
entry, commits, expected := filterTestCaseSetup(denebSlot, 6, []int{0, 1}, 4)(t) entry, commits, expected := filterTestCaseSetup(denebSlot, 6, []int{0, 1}, 4)(t)
entry.scs[5].KzgCommitment = []byte("nope") entry.scs[5].KzgCommitment = []byte("nope")
return entry, commits, expected return entry, commits, expected

View File

@@ -0,0 +1,131 @@
package das
import (
"bytes"
"slices"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/pkg/errors"
)
var (
ErrDuplicateSidecar = errors.New("duplicate sidecar stashed in AvailabilityStore")
errColumnIndexTooHigh = errors.New("column index too high")
errCommitmentMismatch = errors.New("KzgCommitment of sidecar in cache did not match block commitment")
errMissingSidecar = errors.New("no sidecar in cache for block commitment")
)
type dataColumnCache struct {
entries map[cacheKey]*dataColumnCacheEntry
}
func newDataColumnCache() *dataColumnCache {
return &dataColumnCache{entries: make(map[cacheKey]*dataColumnCacheEntry)}
}
// ensure returns the entry for the given key, creating it if it isn't already present.
func (c *dataColumnCache) ensure(key cacheKey) *dataColumnCacheEntry {
entry, ok := c.entries[key]
if !ok {
entry = &dataColumnCacheEntry{}
c.entries[key] = entry
}
return entry
}
// delete removes the cache entry from the cache.
func (c *dataColumnCache) delete(key cacheKey) {
delete(c.entries, key)
}
// dataColumnCacheEntry holds a fixed-length cache of BlobSidecars.
type dataColumnCacheEntry struct {
scs [fieldparams.NumberOfColumns]*blocks.RODataColumn
diskSummary filesystem.DataColumnStorageSummary
}
func (e *dataColumnCacheEntry) setDiskSummary(sum filesystem.DataColumnStorageSummary) {
e.diskSummary = sum
}
// stash adds an item to the in-memory cache of DataColumnSidecars.
// Only the first DataColumnSidecar of a given Index will be kept in the cache.
// stash will return an error if the given data colunn is already in the cache, or if the Index is out of bounds.
func (e *dataColumnCacheEntry) stash(sc *blocks.RODataColumn) error {
if sc.Index >= fieldparams.NumberOfColumns {
return errors.Wrapf(errColumnIndexTooHigh, "index=%d", sc.Index)
}
if e.scs[sc.Index] != nil {
return errors.Wrapf(ErrDuplicateSidecar, "root=%#x, index=%d, commitment=%#x", sc.BlockRoot(), sc.Index, sc.KzgCommitments)
}
e.scs[sc.Index] = sc
return nil
}
func (e *dataColumnCacheEntry) filter(root [32]byte, commitmentsArray *safeCommitmentsArray) ([]blocks.RODataColumn, error) {
nonEmptyIndices := commitmentsArray.nonEmptyIndices()
if e.diskSummary.AllAvailable(nonEmptyIndices) {
return nil, nil
}
commitmentsCount := commitmentsArray.count()
sidecars := make([]blocks.RODataColumn, 0, commitmentsCount)
for i := range nonEmptyIndices {
if e.diskSummary.HasIndex(i) {
continue
}
if e.scs[i] == nil {
return nil, errors.Wrapf(errMissingSidecar, "root=%#x, index=%#x", root, i)
}
if !sliceBytesEqual(commitmentsArray[i], e.scs[i].KzgCommitments) {
return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.scs[i].KzgCommitments, commitmentsArray[i])
}
sidecars = append(sidecars, *e.scs[i])
}
return sidecars, nil
}
// safeCommitmentsArray is a fixed size array of commitments.
// This is helpful for avoiding gratuitous bounds checks.
type safeCommitmentsArray [fieldparams.NumberOfColumns][][]byte
// count returns the number of commitments in the array.
func (s *safeCommitmentsArray) count() int {
count := 0
for i := range s {
if s[i] != nil {
count++
}
}
return count
}
// nonEmptyIndices returns a map of indices that are non-nil in the array.
func (s *safeCommitmentsArray) nonEmptyIndices() map[uint64]bool {
columns := make(map[uint64]bool)
for i := range s {
if s[i] != nil {
columns[uint64(i)] = true
}
}
return columns
}
func sliceBytesEqual(a, b [][]byte) bool {
return slices.EqualFunc(a, b, bytes.Equal)
}

View File

@@ -0,0 +1,144 @@
package das
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
)
func TestEnsureDeleteSetDiskSummary(t *testing.T) {
c := newDataColumnCache()
key := cacheKey{}
entry := c.ensure(key)
require.DeepEqual(t, dataColumnCacheEntry{}, *entry)
diskSummary := filesystem.NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{true})
entry.setDiskSummary(diskSummary)
entry = c.ensure(key)
require.DeepEqual(t, dataColumnCacheEntry{diskSummary: diskSummary}, *entry)
c.delete(key)
entry = c.ensure(key)
require.DeepEqual(t, dataColumnCacheEntry{}, *entry)
}
func TestStash(t *testing.T) {
t.Run("Index too high", func(t *testing.T) {
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 10_000}}}
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
var entry dataColumnCacheEntry
err := entry.stash(&roDataColumns[0])
require.NotNil(t, err)
})
t.Run("Nominal and already existing", func(t *testing.T) {
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 1}}}
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
var entry dataColumnCacheEntry
err := entry.stash(&roDataColumns[0])
require.NoError(t, err)
require.DeepEqual(t, roDataColumns[0], entry.scs[1])
err = entry.stash(&roDataColumns[0])
require.NotNil(t, err)
})
}
func TestFilterDataColumns(t *testing.T) {
t.Run("All available", func(t *testing.T) {
commitmentsArray := safeCommitmentsArray{nil, [][]byte{[]byte{1}}, nil, [][]byte{[]byte{3}}}
diskSummary := filesystem.NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{false, true, false, true})
dataColumnCacheEntry := dataColumnCacheEntry{diskSummary: diskSummary}
actual, err := dataColumnCacheEntry.filter([fieldparams.RootLength]byte{}, &commitmentsArray)
require.NoError(t, err)
require.IsNil(t, actual)
})
t.Run("Some scs missing", func(t *testing.T) {
commitmentsArray := safeCommitmentsArray{nil, [][]byte{[]byte{1}}}
diskSummary := filesystem.NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{})
dataColumnCacheEntry := dataColumnCacheEntry{diskSummary: diskSummary}
_, err := dataColumnCacheEntry.filter([fieldparams.RootLength]byte{}, &commitmentsArray)
require.NotNil(t, err)
})
t.Run("Commitments not equal", func(t *testing.T) {
root := [fieldparams.RootLength]byte{}
commitmentsArray := safeCommitmentsArray{nil, [][]byte{[]byte{1}}}
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: {{ColumnIndex: 1}}}
roDataColumns, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
var scs [fieldparams.NumberOfColumns]*blocks.RODataColumn
scs[1] = &roDataColumns[0]
dataColumnCacheEntry := dataColumnCacheEntry{scs: scs}
_, err := dataColumnCacheEntry.filter(root, &commitmentsArray)
require.NotNil(t, err)
})
t.Run("Nominal", func(t *testing.T) {
root := [fieldparams.RootLength]byte{}
commitmentsArray := safeCommitmentsArray{nil, [][]byte{[]byte{1}}, nil, [][]byte{[]byte{3}}}
diskSummary := filesystem.NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{false, true})
dataColumnParamsByBlockRoot := util.DataColumnsParamsByRoot{root: {{ColumnIndex: 3, KzgCommitments: [][]byte{[]byte{3}}}}}
expected, _ := util.CreateTestVerifiedRoDataColumnSidecars(t, dataColumnParamsByBlockRoot)
var scs [fieldparams.NumberOfColumns]*blocks.RODataColumn
scs[3] = &expected[0]
dataColumnCacheEntry := dataColumnCacheEntry{scs: scs, diskSummary: diskSummary}
actual, err := dataColumnCacheEntry.filter(root, &commitmentsArray)
require.NoError(t, err)
require.DeepEqual(t, expected, actual)
})
}
func TestCount(t *testing.T) {
s := safeCommitmentsArray{nil, [][]byte{[]byte{1}}, nil, [][]byte{[]byte{3}}}
require.Equal(t, 2, s.count())
}
func TestNonEmptyIndices(t *testing.T) {
s := safeCommitmentsArray{nil, [][]byte{[]byte{10}}, nil, [][]byte{[]byte{20}}}
actual := s.nonEmptyIndices()
require.DeepEqual(t, map[uint64]bool{1: true, 3: true}, actual)
}
func TestSliceBytesEqual(t *testing.T) {
t.Run("Different lengths", func(t *testing.T) {
a := [][]byte{[]byte{1, 2, 3}}
b := [][]byte{[]byte{1, 2, 3}, []byte{4, 5, 6}}
require.Equal(t, false, sliceBytesEqual(a, b))
})
t.Run("Same length but different content", func(t *testing.T) {
a := [][]byte{[]byte{1, 2, 3}, []byte{4, 5, 6}}
b := [][]byte{[]byte{1, 2, 3}, []byte{4, 5, 7}}
require.Equal(t, false, sliceBytesEqual(a, b))
})
t.Run("Equal slices", func(t *testing.T) {
a := [][]byte{[]byte{1, 2, 3}, []byte{4, 5, 6}}
b := [][]byte{[]byte{1, 2, 3}, []byte{4, 5, 6}}
require.Equal(t, true, sliceBytesEqual(a, b))
})
}

View File

@@ -15,5 +15,5 @@ import (
// durably persisted before returning a non-error value. // durably persisted before returning a non-error value.
type AvailabilityStore interface { type AvailabilityStore interface {
IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error
Persist(current primitives.Slot, sc ...blocks.ROBlob) error Persist(current primitives.Slot, sc ...blocks.ROSidecar) error
} }

View File

@@ -5,6 +5,7 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
errors "github.com/pkg/errors"
) )
// MockAvailabilityStore is an implementation of AvailabilityStore that can be used by other packages in tests. // MockAvailabilityStore is an implementation of AvailabilityStore that can be used by other packages in tests.
@@ -24,9 +25,13 @@ func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, current pri
} }
// Persist satisfies the corresponding method of the AvailabilityStore interface in a way that is useful for tests. // Persist satisfies the corresponding method of the AvailabilityStore interface in a way that is useful for tests.
func (m *MockAvailabilityStore) Persist(current primitives.Slot, sc ...blocks.ROBlob) error { func (m *MockAvailabilityStore) Persist(current primitives.Slot, sc ...blocks.ROSidecar) error {
blobSidecars, err := blocks.BlobSidecarsFromSidecars(sc)
if err != nil {
return errors.Wrap(err, "blob sidecars from sidecars")
}
if m.PersistBlobsCallback != nil { if m.PersistBlobsCallback != nil {
return m.PersistBlobsCallback(current, sc...) return m.PersistBlobsCallback(current, blobSidecars...)
} }
return nil return nil
} }

View File

@@ -90,7 +90,10 @@ func (bs *blobSync) validateNext(rb blocks.ROBlob) error {
if err := v.SidecarKzgProofVerified(); err != nil { if err := v.SidecarKzgProofVerified(); err != nil {
return err return err
} }
if err := bs.store.Persist(bs.current, rb); err != nil {
sc := blocks.NewSidecarFromBlobSidecar(rb)
if err := bs.store.Persist(bs.current, sc); err != nil {
return err return err
} }

View File

@@ -166,7 +166,8 @@ func (s *Service) processFetchedDataRegSync(ctx context.Context, data *blocksQue
"firstUnprocessed": bwb[0].Block.Block().Slot(), "firstUnprocessed": bwb[0].Block.Block().Slot(),
} }
for i, b := range bwb { for i, b := range bwb {
if err := avs.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil { sidecars := blocks.NewSidecarsFromBlobSidecars(b.Blobs)
if err := avs.Persist(s.clock.CurrentSlot(), sidecars...); err != nil {
log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Batch failure due to BlobSidecar issues") log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Batch failure due to BlobSidecar issues")
return uint64(i), err return uint64(i), err
} }
@@ -324,7 +325,10 @@ func (s *Service) processBatchedBlocks(ctx context.Context, bwb []blocks.BlockWi
if len(bb.Blobs) == 0 { if len(bb.Blobs) == 0 {
continue continue
} }
if err := avs.Persist(s.clock.CurrentSlot(), bb.Blobs...); err != nil {
sidecars := blocks.NewSidecarsFromBlobSidecars(bb.Blobs)
if err := avs.Persist(s.clock.CurrentSlot(), sidecars...); err != nil {
return 0, err return 0, err
} }
} }

View File

@@ -331,16 +331,17 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
} }
shufflePeers(pids) shufflePeers(pids)
for i := range pids { for i := range pids {
sidecars, err := sync.SendBlobSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req, rob.Block().Slot()) blobSidecars, err := sync.SendBlobSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req, rob.Block().Slot())
if err != nil { if err != nil {
continue continue
} }
if len(sidecars) != len(req) { if len(blobSidecars) != len(req) {
continue continue
} }
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements) bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv) avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot() current := s.clock.CurrentSlot()
sidecars := blocks.NewSidecarsFromBlobSidecars(blobSidecars)
if err := avs.Persist(current, sidecars...); err != nil { if err := avs.Persist(current, sidecars...); err != nil {
return err return err
} }
@@ -348,7 +349,7 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable") log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable")
continue continue
} }
log.WithField("nBlobs", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded blobs for checkpoint sync block") log.WithField("nBlobs", len(blobSidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded blobs for checkpoint sync block")
return nil return nil
} }
return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r) return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)

View File

@@ -38,6 +38,15 @@ var (
RequireSidecarProposerExpected, RequireSidecarProposerExpected,
} }
// ByRangeRequestDataColumnSidecarRequirements defines the set of requirements that DataColumnSidecars received
// via the by range request must satisfy in order to upgrade an RODataColumn to a VerifiedRODataColumn.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1
ByRangeRequestDataColumnSidecarRequirements = []Requirement{
RequireValidFields,
RequireSidecarInclusionProven,
RequireSidecarKzgProofVerified,
}
errColumnsInvalid = errors.New("data columns failed verification") errColumnsInvalid = errors.New("data columns failed verification")
errBadTopicLength = errors.New("topic length is invalid") errBadTopicLength = errors.New("topic length is invalid")
errBadTopic = errors.New("topic is not of the one expected") errBadTopic = errors.New("topic is not of the one expected")

View File

@@ -0,0 +1,2 @@
### Added
- PeerDAS: Implement DAS.

View File

@@ -13,6 +13,7 @@ go_library(
"roblob.go", "roblob.go",
"roblock.go", "roblock.go",
"rodatacolumn.go", "rodatacolumn.go",
"rosidecar.go",
"setters.go", "setters.go",
"types.go", "types.go",
], ],
@@ -53,6 +54,7 @@ go_test(
"roblob_test.go", "roblob_test.go",
"roblock_test.go", "roblock_test.go",
"rodatacolumn_test.go", "rodatacolumn_test.go",
"rosidecar_test.go",
], ],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
@@ -72,5 +74,6 @@ go_test(
"@com_github_prysmaticlabs_fastssz//:go_default_library", "@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_prysmaticlabs_gohashtree//:go_default_library", "@com_github_prysmaticlabs_gohashtree//:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
], ],
) )

View File

@@ -0,0 +1,96 @@
package blocks
import (
"github.com/pkg/errors"
)
// ROSidecar represents a read-only sidecar with its block root.
type ROSidecar struct {
blob *ROBlob
dataColumn *RODataColumn
}
var (
errBlobNeeded = errors.New("blob sidecar needed")
errDataColumnNeeded = errors.New("data column sidecar needed")
)
// NewSidecarFromBlobSidecar creates a new read-only (generic) sidecar from a read-only blob sidecar.
func NewSidecarFromBlobSidecar(blob ROBlob) ROSidecar {
return ROSidecar{blob: &blob}
}
// NewSidecarFromDataColumnSidecar creates a new read-only (generic) sidecar from a read-only data column sidecar.
func NewSidecarFromDataColumnSidecar(dataColumn RODataColumn) ROSidecar {
return ROSidecar{dataColumn: &dataColumn}
}
// NewSidecarsFromBlobSidecars creates a new slice of read-only (generic) sidecars from a slice of read-only blobs sidecars.
func NewSidecarsFromBlobSidecars(blobSidecars []ROBlob) []ROSidecar {
sidecars := make([]ROSidecar, 0, len(blobSidecars))
for _, blob := range blobSidecars {
blobSidecar := ROSidecar{blob: &blob} // #nosec G601
sidecars = append(sidecars, blobSidecar)
}
return sidecars
}
// NewSidecarsFromDataColumnSidecars creates a new slice of read-only (generic) sidecars from a slice of read-only data column sidecars.
func NewSidecarsFromDataColumnSidecars(dataColumnSidecars []RODataColumn) []ROSidecar {
sidecars := make([]ROSidecar, 0, len(dataColumnSidecars))
for _, dataColumn := range dataColumnSidecars {
dataColumnSidecar := ROSidecar{dataColumn: &dataColumn} // #nosec G601
sidecars = append(sidecars, dataColumnSidecar)
}
return sidecars
}
// Blob returns the blob sidecar.
func (sc *ROSidecar) Blob() (ROBlob, error) {
if sc.blob == nil {
return ROBlob{}, errBlobNeeded
}
return *sc.blob, nil
}
// DataColumn returns the data column sidecar.
func (sc *ROSidecar) DataColumn() (RODataColumn, error) {
if sc.dataColumn == nil {
return RODataColumn{}, errDataColumnNeeded
}
return *sc.dataColumn, nil
}
// BlobSidecarsFromSidecars creates a new slice of read-only blobs sidecars from a slice of read-only (generic) sidecars.
func BlobSidecarsFromSidecars(sidecars []ROSidecar) ([]ROBlob, error) {
blobSidecars := make([]ROBlob, 0, len(sidecars))
for _, sidecar := range sidecars {
blob, err := sidecar.Blob()
if err != nil {
return nil, errors.Wrap(err, "blob")
}
blobSidecars = append(blobSidecars, blob)
}
return blobSidecars, nil
}
// DataColumnSidecarsFromSidecars creates a new slice of read-only data column sidecars from a slice of read-only (generic) sidecars.
func DataColumnSidecarsFromSidecars(sidecars []ROSidecar) ([]RODataColumn, error) {
dataColumnSidecars := make([]RODataColumn, 0, len(sidecars))
for _, sidecar := range sidecars {
dataColumn, err := sidecar.DataColumn()
if err != nil {
return nil, errors.Wrap(err, "data column")
}
dataColumnSidecars = append(dataColumnSidecars, dataColumn)
}
return dataColumnSidecars, nil
}

View File

@@ -0,0 +1,109 @@
package blocks
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestNewSidecarFromBlobSidecar(t *testing.T) {
blob := ROBlob{}
sidecar := NewSidecarFromBlobSidecar(blob)
// Check that the blob is set
retrievedBlob, err := sidecar.Blob()
require.NoError(t, err)
require.Equal(t, blob, retrievedBlob)
// Check that data column is not set
_, err = sidecar.DataColumn()
require.ErrorIs(t, err, errDataColumnNeeded)
}
func TestNewSidecarFromDataColumnSidecar(t *testing.T) {
dataColumn := RODataColumn{}
sidecar := NewSidecarFromDataColumnSidecar(dataColumn)
// Check that the data column is set
retrievedDataColumn, err := sidecar.DataColumn()
require.NoError(t, err)
require.Equal(t, dataColumn, retrievedDataColumn)
// Check that blob is not set
_, err = sidecar.Blob()
require.ErrorIs(t, err, errBlobNeeded)
}
func TestNewSidecarsFromBlobSidecars(t *testing.T) {
blobSidecars := []ROBlob{{}, {}}
sidecars := NewSidecarsFromBlobSidecars(blobSidecars)
require.Equal(t, len(blobSidecars), len(sidecars))
for i, sidecar := range sidecars {
retrievedBlob, err := sidecar.Blob()
require.NoError(t, err)
require.Equal(t, blobSidecars[i], retrievedBlob)
}
}
func TestNewSidecarsFromDataColumnSidecars(t *testing.T) {
dataColumnSidecars := []RODataColumn{{}, {}}
sidecars := NewSidecarsFromDataColumnSidecars(dataColumnSidecars)
require.Equal(t, len(dataColumnSidecars), len(sidecars))
for i, sidecar := range sidecars {
retrievedDataColumn, err := sidecar.DataColumn()
require.NoError(t, err)
require.Equal(t, dataColumnSidecars[i], retrievedDataColumn)
}
}
func TestBlobSidecarsFromSidecars(t *testing.T) {
// Create sidecars with blobs
blobSidecars := []ROBlob{{}, {}}
sidecars := NewSidecarsFromBlobSidecars(blobSidecars)
// Convert back to blob sidecars
retrievedBlobSidecars, err := BlobSidecarsFromSidecars(sidecars)
require.NoError(t, err)
require.Equal(t, len(blobSidecars), len(retrievedBlobSidecars))
for i, blob := range retrievedBlobSidecars {
require.Equal(t, blobSidecars[i], blob)
}
// Test with a mix of sidecar types
mixedSidecars := []ROSidecar{
NewSidecarFromBlobSidecar(ROBlob{}),
NewSidecarFromDataColumnSidecar(RODataColumn{}),
}
_, err = BlobSidecarsFromSidecars(mixedSidecars)
require.Error(t, err)
}
func TestDataColumnSidecarsFromSidecars(t *testing.T) {
// Create sidecars with data columns
dataColumnSidecars := []RODataColumn{{}, {}}
sidecars := NewSidecarsFromDataColumnSidecars(dataColumnSidecars)
// Convert back to data column sidecars
retrievedDataColumnSidecars, err := DataColumnSidecarsFromSidecars(sidecars)
require.NoError(t, err)
require.Equal(t, len(dataColumnSidecars), len(retrievedDataColumnSidecars))
for i, dataColumn := range retrievedDataColumnSidecars {
require.Equal(t, dataColumnSidecars[i], dataColumn)
}
// Test with a mix of sidecar types
mixedSidecars := []ROSidecar{
NewSidecarFromDataColumnSidecar(RODataColumn{}),
NewSidecarFromBlobSidecar(ROBlob{}),
}
_, err = DataColumnSidecarsFromSidecars(mixedSidecars)
require.Error(t, err)
}