diff --git a/beacon-chain/db/filesystem/BUILD.bazel b/beacon-chain/db/filesystem/BUILD.bazel index 65af25f6a3..1deadce105 100644 --- a/beacon-chain/db/filesystem/BUILD.bazel +++ b/beacon-chain/db/filesystem/BUILD.bazel @@ -5,6 +5,9 @@ go_library( srcs = [ "blob.go", "cache.go", + "data_column.go", + "data_column_cache.go", + "doc.go", "iteration.go", "layout.go", "layout_by_epoch.go", @@ -17,6 +20,8 @@ go_library( importpath = "github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem", visibility = ["//visibility:public"], deps = [ + "//async:go_default_library", + "//async/event:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", @@ -41,6 +46,8 @@ go_test( srcs = [ "blob_test.go", "cache_test.go", + "data_column_cache_test.go", + "data_column_test.go", "iteration_test.go", "layout_test.go", "migration_test.go", @@ -50,6 +57,7 @@ go_test( deps = [ "//beacon-chain/db:go_default_library", "//beacon-chain/verification:go_default_library", + "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", diff --git a/beacon-chain/db/filesystem/data_column.go b/beacon-chain/db/filesystem/data_column.go new file mode 100644 index 0000000000..0c4767b74d --- /dev/null +++ b/beacon-chain/db/filesystem/data_column.go @@ -0,0 +1,1027 @@ +package filesystem + +import ( + "context" + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "os" + "path" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/OffchainLabs/prysm/v6/async" + "github.com/OffchainLabs/prysm/v6/async/event" + "github.com/OffchainLabs/prysm/v6/beacon-chain/verification" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + "github.com/OffchainLabs/prysm/v6/io/file" + "github.com/OffchainLabs/prysm/v6/time/slots" + "github.com/pkg/errors" + "github.com/spf13/afero" +) + +const ( + version = 0x01 + versionOffset = 0 // bytes + versionSize = 1 // bytes + sidecarByteLenOffset = versionOffset + versionSize // (Offset of the encoded size of the SSZ encoded data column sidecar) + sidecarByteLenSize = 4 // bytes (Size of the encoded size of the SSZ encoded data column sidecar) + mandatoryNumberOfColumns = 128 // 2**7 + indicesOffset = sidecarByteLenOffset + sidecarByteLenSize + nonZeroOffset = mandatoryNumberOfColumns + headerSize = versionSize + sidecarByteLenSize + mandatoryNumberOfColumns + dataColumnsFileExtension = "sszs" + prunePeriod = 1 * time.Minute +) + +var ( + errWrongNumberOfColumns = errors.New("wrong number of data columns") + errDataColumnIndexTooLarge = errors.New("data column index too large") + errWrongBytesWritten = errors.New("wrong number of bytes written") + errWrongVersion = errors.New("wrong version") + errWrongBytesHeaderRead = errors.New("wrong number of bytes header read") + errTooManyDataColumns = errors.New("too many data columns") + errWrongSszEncodedDataColumnSidecarSize = errors.New("wrong SSZ encoded data column sidecar size") + errDataColumnSidecarsFromDifferentSlots = errors.New("data column sidecars from different slots") +) + +type ( + // DataColumnStorage is the concrete implementation of the filesystem backend for saving and retrieving DataColumnSidecars. 
+	DataColumnStorage struct {
+		base            string
+		retentionEpochs primitives.Epoch
+		fs              afero.Fs
+		cache           *dataColumnStorageSummaryCache
+		dataColumnFeed  *event.Feed
+		pruneMu         sync.RWMutex
+
+		mu      sync.Mutex // protects muChans
+		muChans map[[fieldparams.RootLength]byte]*muChan
+	}
+
+	// DataColumnStorageOption is a functional option for configuring a DataColumnStorage.
+	DataColumnStorageOption func(*DataColumnStorage) error
+
+	// DataColumnsIdent is a collection of unique identifiers for data column sidecars.
+	DataColumnsIdent struct {
+		Root    [fieldparams.RootLength]byte
+		Epoch   primitives.Epoch
+		Indices []uint64
+	}
+
+	storageIndices struct {
+		indices [mandatoryNumberOfColumns]byte
+		count   int64
+	}
+
+	metadata struct {
+		indices                         *storageIndices
+		sszEncodedDataColumnSidecarSize uint32
+		fileSize                        int64
+	}
+
+	fileMetadata struct {
+		period    uint64
+		epoch     primitives.Epoch
+		blockRoot [fieldparams.RootLength]byte
+	}
+
+	muChan struct {
+		mu      *sync.RWMutex
+		toStore chan []blocks.VerifiedRODataColumn
+	}
+)
+
+// WithDataColumnBasePath is a required option that sets the base path of data column storage.
+func WithDataColumnBasePath(base string) DataColumnStorageOption {
+	return func(b *DataColumnStorage) error {
+		b.base = base
+		return nil
+	}
+}
+
+// WithDataColumnRetentionEpochs is an option that changes the number of epochs data columns will be persisted.
+func WithDataColumnRetentionEpochs(e primitives.Epoch) DataColumnStorageOption {
+	return func(b *DataColumnStorage) error {
+		b.retentionEpochs = e
+		return nil
+	}
+}
+
+// WithDataColumnFs allows the afero.Fs implementation to be customized.
+// Used by tests to substitute an in-memory filesystem.
+func WithDataColumnFs(fs afero.Fs) DataColumnStorageOption {
+	return func(b *DataColumnStorage) error {
+		b.fs = fs
+		return nil
+	}
+}
+
+// NewDataColumnStorage creates a new instance of the DataColumnStorage object. Note that the implementation of DataColumnStorage may
+// attempt to hold a file lock to guarantee exclusive control of the data column storage directory, so this should only be
+// initialized once per beacon node.
+func NewDataColumnStorage(ctx context.Context, opts ...DataColumnStorageOption) (*DataColumnStorage, error) {
+	storage := &DataColumnStorage{
+		dataColumnFeed: new(event.Feed),
+		muChans:        make(map[[fieldparams.RootLength]byte]*muChan),
+	}
+
+	for _, o := range opts {
+		if err := o(storage); err != nil {
+			return nil, errors.Wrap(err, "failed to create data column storage")
+		}
+	}
+
+	// Allow tests to set up a different fs using WithDataColumnFs.
+	if storage.fs == nil {
+		if storage.base == "" {
+			return nil, errNoBasePath
+		}
+
+		storage.base = path.Clean(storage.base)
+		if err := file.MkdirAll(storage.base); err != nil {
+			return nil, errors.Wrapf(err, "failed to create data column storage at %s", storage.base)
+		}
+
+		storage.fs = afero.NewBasePathFs(afero.NewOsFs(), storage.base)
+	}
+
+	storage.cache = newDataColumnStorageSummaryCache()
+
+	async.RunEvery(ctx, prunePeriod, func() {
+		storage.pruneMu.Lock()
+		defer storage.pruneMu.Unlock()
+
+		storage.prune()
+	})
+
+	return storage, nil
+}
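+// A minimal usage sketch for the constructor above. The context, base path,
+// and retention value are illustrative assumptions rather than values mandated
+// by this package:
+//
+//	storage, err := filesystem.NewDataColumnStorage(
+//		ctx,
+//		filesystem.WithDataColumnBasePath("/var/lib/prysm/data-columns"),
+//		filesystem.WithDataColumnRetentionEpochs(4096),
+//	)
+//	if err != nil {
+//		return err
+//	}
+//	storage.WarmCache() // Populate the summary cache from disk before serving requests.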
+// WarmCache warms the cache of the data column filesystem.
+// It holds the prune (write) lock for the whole time it is running.
+func (dcs *DataColumnStorage) WarmCache() {
+	start := time.Now()
+	log.Info("Data column filesystem cache warm-up started")
+
+	dcs.pruneMu.Lock()
+	defer dcs.pruneMu.Unlock()
+
+	// Walk the data column filesystem to warm up the cache.
+	if err := afero.Walk(dcs.fs, ".", func(path string, info os.FileInfo, fileErr error) (err error) {
+		if fileErr != nil {
+			return fileErr
+		}
+
+		// If not a leaf, skip.
+		if info.IsDir() {
+			return nil
+		}
+
+		// Extract metadata from the file path.
+		fileMetadata, err := extractFileMetadata(path)
+		if err != nil {
+			// Skip files that do not match the expected layout, so we never dereference nil metadata below.
+			log.WithError(err).Error("Error encountered while extracting file metadata")
+			return nil
+		}
+
+		// Open the data column filesystem file.
+		f, err := dcs.fs.Open(path)
+		if err != nil {
+			log.WithError(err).Error("Error encountered while opening data column filesystem file")
+			return nil
+		}
+
+		// Close the file.
+		defer func() {
+			// Overwrite the existing error only if it is nil, since the close error is less important.
+			closeErr := f.Close()
+			if closeErr != nil && err == nil {
+				err = closeErr
+			}
+		}()
+
+		// Read the metadata of the file.
+		metadata, err := dcs.metadata(f)
+		if err != nil {
+			log.WithError(err).Error("Error encountered while reading metadata from data column filesystem file")
+			return nil
+		}
+
+		// Check the indices.
+		indices := metadata.indices.all()
+		if len(indices) == 0 {
+			return nil
+		}
+
+		// Build the ident.
+		dataColumnsIdent := DataColumnsIdent{Root: fileMetadata.blockRoot, Epoch: fileMetadata.epoch, Indices: indices}
+
+		// Set the ident in the cache.
+		if err := dcs.cache.set(dataColumnsIdent); err != nil {
+			log.WithError(err).Error("Error encountered while ensuring data column filesystem cache")
+		}
+
+		return nil
+	}); err != nil {
+		log.WithError(err).Error("Error encountered while walking data column filesystem.")
+	}
+
+	// Prune the cache and the filesystem.
+	dcs.prune()
+
+	log.WithField("elapsed", time.Since(start)).Info("Data column filesystem cache warm-up complete")
+}
+
+// Summary returns the DataColumnStorageSummary.
+func (dcs *DataColumnStorage) Summary(root [fieldparams.RootLength]byte) DataColumnStorageSummary {
+	return dcs.cache.Summary(root)
+}
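+// The sidecar files written by Save share a fixed layout; the offsets below
+// are derived from the constants at the top of this file and are shown here
+// for orientation only:
+//
+//	offset 0   | version                  | 1 byte (currently 0x01)
+//	offset 1   | SSZ-encoded sidecar size | 4 bytes, big endian
+//	offset 5   | index table              | 128 bytes, one entry per column
+//	offset 133 | SSZ-encoded sidecars     | one fixed-size record per stored column
+//
+// An index-table entry below nonZeroOffset means the column is not stored; an
+// entry equal to nonZeroOffset+p means its sidecar is record number p.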
+// Save saves data column sidecars into the database.
+// Pruning is handled separately, on the timer started by NewDataColumnStorage.
+func (dcs *DataColumnStorage) Save(dataColumnSidecars []blocks.VerifiedRODataColumn) error {
+	startTime := time.Now()
+
+	if len(dataColumnSidecars) == 0 {
+		return nil
+	}
+
+	// Check the number of columns is the one expected.
+	// While implementing this, we expect the number of columns won't change.
+	// If it does, we will need to create a new version of the data column sidecar file.
+	if params.BeaconConfig().NumberOfColumns != mandatoryNumberOfColumns {
+		return errWrongNumberOfColumns
+	}
+
+	dataColumnSidecarsByRoot := make(map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn)
+
+	// Group data column sidecars by root.
+	for _, dataColumnSidecar := range dataColumnSidecars {
+		// Check if the data column index is too large.
+		if dataColumnSidecar.Index >= mandatoryNumberOfColumns {
+			return errDataColumnIndexTooLarge
+		}
+
+		root := dataColumnSidecar.BlockRoot()
+		dataColumnSidecarsByRoot[root] = append(dataColumnSidecarsByRoot[root], dataColumnSidecar)
+	}
+
+	for root, dataColumnSidecars := range dataColumnSidecarsByRoot {
+		// Safety check: all data column sidecars for this root must come from the same slot.
+		firstSlot := dataColumnSidecars[0].SignedBlockHeader.Header.Slot
+		for _, dataColumnSidecar := range dataColumnSidecars[1:] {
+			if dataColumnSidecar.SignedBlockHeader.Header.Slot != firstSlot {
+				return errDataColumnSidecarsFromDifferentSlots
+			}
+		}
+
+		// All sidecars for this root share the same slot, so compute the epoch once.
+		epoch := slots.ToEpoch(dataColumnSidecars[0].Slot())
+
+		// Save data columns in the filesystem.
+		if err := dcs.saveFilesystem(root, epoch, dataColumnSidecars); err != nil {
+			return errors.Wrap(err, "save filesystem")
+		}
+
+		// Get all indices.
+		indices := make([]uint64, 0, len(dataColumnSidecars))
+		for _, dataColumnSidecar := range dataColumnSidecars {
+			indices = append(indices, dataColumnSidecar.Index)
+		}
+
+		// Compute the data columns ident.
+		dataColumnsIdent := DataColumnsIdent{Root: root, Epoch: epoch, Indices: indices}
+
+		// Set data columns in the cache.
+		if err := dcs.cache.set(dataColumnsIdent); err != nil {
+			return errors.Wrap(err, "cache set")
+		}
+
+		// Notify the data column feed.
+		dcs.dataColumnFeed.Send(dataColumnsIdent)
+	}
+
+	dataColumnSaveLatency.Observe(float64(time.Since(startTime).Milliseconds()))
+
+	return nil
+}
+
+// Subscribe subscribes to the data column feed.
+// It returns the subscription and a channel (buffer size one) on which data column idents are received.
+// It is the responsibility of the caller to:
+// - call `subscription.Unsubscribe` when done, and to
+// - read from the channel as fast as possible until the channel is closed.
+// A call to `Save` will buffer a new value to the channel.
+// If a call to `Save` is made while a value is already buffered in the channel:
+// - the call to `Save` will block until the new value can be buffered in the channel, and
+// - all other subscribers won't be notified until the new value can be buffered in the channel.
+func (dcs *DataColumnStorage) Subscribe() (event.Subscription, <-chan DataColumnsIdent) {
+	// Subscribe to data columns newly stored in the database.
+	identsChan := make(chan DataColumnsIdent, 1)
+	subscription := dcs.dataColumnFeed.Subscribe(identsChan)
+
+	return subscription, identsChan
+}
+
+// saveFilesystem saves data column sidecars into the database.
+// This function expects all data column sidecars to belong to the same block.
+func (dcs *DataColumnStorage) saveFilesystem(root [fieldparams.RootLength]byte, epoch primitives.Epoch, dataColumnSidecars []blocks.VerifiedRODataColumn) error {
+	// Compute the file path.
+	filePath := filePath(root, epoch)
+
+	dcs.pruneMu.RLock()
+	defer dcs.pruneMu.RUnlock()
+
+	fileMu, toStore := dcs.fileMutexChan(root)
+	toStore <- dataColumnSidecars
+
+	fileMu.Lock()
+	defer fileMu.Unlock()
+
+	// Check if the file exists.
+	exists, err := afero.Exists(dcs.fs, filePath)
+	if err != nil {
+		return errors.Wrap(err, "afero exists")
+	}
+
+	if exists {
+		if err := dcs.saveDataColumnSidecarsExistingFile(filePath, toStore); err != nil {
+			return errors.Wrap(err, "save data column existing file")
+		}
+
+		return nil
+	}
+
+	if err := dcs.saveDataColumnSidecarsNewFile(filePath, toStore); err != nil {
+		return errors.Wrap(err, "save data columns new file")
+	}
+
+	return nil
+}
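+// The mutex/channel pairing used by saveFilesystem above is a small
+// write-coalescing scheme: every Save call enqueues its batch on the per-root
+// channel before taking the file lock, and the goroutine that wins the lock
+// drains the channel (see pullChan below), persisting its own batch together
+// with any batches queued by callers still waiting on the lock. Those callers
+// then find the channel empty and write nothing. Illustrative sequence only:
+//
+//	goroutine A: toStore <- batchA; mu.Lock(); drain -> {batchA, batchB}; write; mu.Unlock()
+//	goroutine B: toStore <- batchB; mu.Lock(); ..........................; drain -> {}; no write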
+// Get retrieves data column sidecars from the database.
+// If one of the requested data column sidecars is not found, it is just skipped.
+// If indices is nil, then all stored data column sidecars are returned.
+// Since DataColumnStorage only writes data columns that have undergone full verification, the return
+// value is always a VerifiedRODataColumn.
+func (dcs *DataColumnStorage) Get(root [fieldparams.RootLength]byte, indices []uint64) ([]blocks.VerifiedRODataColumn, error) {
+	dcs.pruneMu.RLock()
+	defer dcs.pruneMu.RUnlock()
+
+	fileMu, _ := dcs.fileMutexChan(root)
+	fileMu.RLock()
+	defer fileMu.RUnlock()
+
+	startTime := time.Now()
+
+	// Build all indices if none are provided.
+	if indices == nil {
+		indices = make([]uint64, mandatoryNumberOfColumns)
+		for i := range indices {
+			indices[i] = uint64(i)
+		}
+	}
+
+	summary, ok := dcs.cache.get(root)
+	if !ok {
+		// Nothing found in db. Exit early.
+		return nil, nil
+	}
+
+	// Compute the file path.
+	filePath := filePath(root, summary.epoch)
+
+	// Open the data column sidecars file.
+	// We do not check for the file's existence first, since the cache has already been consulted.
+	f, err := dcs.fs.Open(filePath)
+	if err != nil {
+		return nil, errors.Wrap(err, "data column sidecars file path open")
+	}
+
+	// Close the file once done. On the read path a close error is only worth logging.
+	defer func() {
+		if closeErr := f.Close(); closeErr != nil {
+			log.WithError(closeErr).Error("Error encountered while closing data column sidecars file")
+		}
+	}()
+
+	// Read file metadata.
+	metadata, err := dcs.metadata(f)
+	if err != nil {
+		return nil, errors.Wrap(err, "metadata")
+	}
+
+	// Retrieve data column sidecars from the file.
+	verifiedRODataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, len(indices))
+	for _, index := range indices {
+		ok, position, err := metadata.indices.get(index)
+		if err != nil {
+			return nil, errors.Wrap(err, "get index")
+		}
+
+		// Skip if the data column is not saved.
+		if !ok {
+			continue
+		}
+
+		// Compute the offset of the data column sidecar.
+		offset := headerSize + position*int64(metadata.sszEncodedDataColumnSidecarSize)
+
+		// Seek to the beginning of the data column sidecar.
+		_, err = f.Seek(offset, io.SeekStart)
+		if err != nil {
+			return nil, errors.Wrap(err, "seek")
+		}
+
+		verifiedRODataColumn, err := verification.VerifiedRODataColumnFromDisk(f, root, metadata.sszEncodedDataColumnSidecarSize)
+		if err != nil {
+			return nil, errors.Wrap(err, "verified RO data column from disk")
+		}
+
+		// Append the verified RO data column to the data column sidecars.
+		verifiedRODataColumnSidecars = append(verifiedRODataColumnSidecars, verifiedRODataColumn)
+	}
+
+	dataColumnFetchLatency.Observe(float64(time.Since(startTime).Milliseconds()))
+
+	return verifiedRODataColumnSidecars, nil
+}
+
+// Remove deletes all data column sidecars for a given root.
+func (dcs *DataColumnStorage) Remove(blockRoot [fieldparams.RootLength]byte) error {
+	dcs.pruneMu.RLock()
+	defer dcs.pruneMu.RUnlock()
+
+	fileMu, _ := dcs.fileMutexChan(blockRoot)
+	fileMu.Lock()
+	defer fileMu.Unlock()
+
+	summary, ok := dcs.cache.get(blockRoot)
+	if !ok {
+		// Nothing found in db. Exit early.
+		return nil
+	}
+
+	// Remove the data column sidecars from the cache.
+	dcs.cache.evict(blockRoot)
+
+	// Remove the data column sidecars file.
+	filePath := filePath(blockRoot, summary.epoch)
+	if err := dcs.fs.Remove(filePath); err != nil {
+		return errors.Wrap(err, "remove")
+	}
+
+	return nil
+}
+
+// Clear deletes all files on the filesystem.
+func (dcs *DataColumnStorage) Clear() error {
+	dcs.pruneMu.Lock()
+	defer dcs.pruneMu.Unlock()
+
+	dirs, err := listDir(dcs.fs, ".")
+	if err != nil {
+		return errors.Wrap(err, "list dir")
+	}
+
+	dcs.cache.clear()
+
+	for _, dir := range dirs {
+		if err := dcs.fs.RemoveAll(dir); err != nil {
+			return errors.Wrap(err, "remove all")
+		}
+	}
+
+	return nil
+}
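+// Worked example of the retention arithmetic in prune below, assuming the
+// mainnet MinEpochsForBlobsSidecarsRequest of 4096 and a retentionEpochs of
+// 4096 (both values are assumptions for illustration):
+//
+//	highestStoredEpoch   = 8200
+//	highestEpochToPrune  = 8200 - 4096 = 4104
+//	highestPeriodToPrune = 4104 / 4096 = 1
+//
+// Period directories strictly below 1 are removed wholesale, epoch directories
+// inside period 1 are removed only if their epoch is <= 4104, and periods
+// above 1 are left untouched.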
+// prune cleans the cache, the filesystem, and the per-root mutexes.
+func (dcs *DataColumnStorage) prune() {
+	highestStoredEpoch := dcs.cache.HighestEpoch()
+
+	// Check if we need to prune.
+	if highestStoredEpoch < dcs.retentionEpochs {
+		return
+	}
+
+	highestEpochToPrune := highestStoredEpoch - dcs.retentionEpochs
+	highestPeriodToPrune := period(highestEpochToPrune)
+
+	// Prune the cache.
+	prunedCount := dcs.cache.pruneUpTo(highestEpochToPrune)
+
+	if prunedCount == 0 {
+		return
+	}
+
+	dataColumnPrunedCounter.Add(float64(prunedCount))
+
+	// Prune the filesystem.
+	periodFileInfos, err := afero.ReadDir(dcs.fs, ".")
+	if err != nil {
+		log.WithError(err).Error("Error encountered while reading top directory")
+		return
+	}
+
+	for _, periodFileInfo := range periodFileInfos {
+		periodStr := periodFileInfo.Name()
+		period, err := strconv.ParseUint(periodStr, 10, 64)
+		if err != nil {
+			log.WithError(err).Errorf("Error encountered while parsing period %s", periodStr)
+			continue
+		}
+
+		if period < highestPeriodToPrune {
+			// Remove everything lower than the highest period to prune.
+			if err := dcs.fs.RemoveAll(periodStr); err != nil {
+				log.WithError(err).Error("Error encountered while removing period directory")
+			}
+
+			continue
+		}
+
+		if period > highestPeriodToPrune {
+			// Do not remove anything higher than the highest period to prune.
+			continue
+		}
+
+		// if period == highestPeriodToPrune
+		epochFileInfos, err := afero.ReadDir(dcs.fs, periodStr)
+		if err != nil {
+			log.WithError(err).Error("Error encountered while reading epoch directory")
+			continue
+		}
+
+		for _, epochFileInfo := range epochFileInfos {
+			epochStr := epochFileInfo.Name()
+			epochDir := path.Join(periodStr, epochStr)
+
+			epoch, err := strconv.ParseUint(epochStr, 10, 64)
+			if err != nil {
+				log.WithError(err).Errorf("Error encountered while parsing epoch %s", epochStr)
+				continue
+			}
+
+			if primitives.Epoch(epoch) > highestEpochToPrune {
+				continue
+			}
+
+			if err := dcs.fs.RemoveAll(epochDir); err != nil {
+				log.WithError(err).Error("Error encountered while removing epoch directory")
+				continue
+			}
+		}
+	}
+
+	// Drop the per-root mutexes and channels; they are recreated on demand.
+	dcs.mu.Lock()
+	defer dcs.mu.Unlock()
+	clear(dcs.muChans)
+}
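+// Appending to an existing file (below) never rewrites already stored
+// sidecars: new records are assigned the next free positions and written at
+// the current end of file, and only the 128-byte index table is rewritten in
+// place. Sketch of adding column 7 to a file that already holds two sidecars:
+//
+//	before: [header | record#0 | record#1]             indices[7] = 0
+//	after:  [header | record#0 | record#1 | record#2]  indices[7] = nonZeroOffset + 2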
+// saveDataColumnSidecarsExistingFile saves data column sidecars into an existing file.
+// This function expects all data column sidecars to belong to the same block.
+func (dcs *DataColumnStorage) saveDataColumnSidecarsExistingFile(filePath string, inputDataColumnSidecars chan []blocks.VerifiedRODataColumn) (err error) {
+	// Open the data column sidecars file.
+	f, err := dcs.fs.OpenFile(filePath, os.O_RDWR, os.FileMode(0600))
+	if err != nil {
+		return errors.Wrap(err, "data column sidecars file path open")
+	}
+
+	defer func() {
+		closeErr := f.Close()
+
+		// Overwrite the existing error only if it is nil, since the close error is less important.
+		if closeErr != nil && err == nil {
+			err = closeErr
+		}
+	}()
+
+	metadata, err := dcs.metadata(f)
+	if err != nil {
+		return errors.Wrap(err, "metadata")
+	}
+
+	// Create the SSZ encoded data column sidecars.
+	var sszEncodedDataColumnSidecars []byte
+
+	for {
+		dataColumnSidecars := pullChan(inputDataColumnSidecars)
+		if len(dataColumnSidecars) == 0 {
+			break
+		}
+
+		for _, dataColumnSidecar := range dataColumnSidecars {
+			// Extract the data column index.
+			dataColumnIndex := dataColumnSidecar.Index
+
+			ok, _, err := metadata.indices.get(dataColumnIndex)
+			if err != nil {
+				return errors.Wrap(err, "get index")
+			}
+
+			// Skip if the data column is already saved.
+			if ok {
+				continue
+			}
+
+			// Check if the number of saved data columns is too large.
+			// In practice this cannot happen, since this function is only reached
+			// through Save, which rejects out-of-range column indices.
+			if metadata.indices.len() >= mandatoryNumberOfColumns {
+				return errTooManyDataColumns
+			}
+
+			// SSZ encode the data column sidecar.
+			sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
+			if err != nil {
+				return errors.Wrap(err, "data column sidecar marshal SSZ")
+			}
+
+			// Compute the size of the SSZ encoded data column sidecar.
+			incomingSszEncodedDataColumnSidecarSize := uint32(len(sszEncodedDataColumnSidecar))
+
+			// Check if the incoming encoded data column sidecar size corresponds to the one read from the file.
+			if incomingSszEncodedDataColumnSidecarSize != metadata.sszEncodedDataColumnSidecarSize {
+				return errWrongSszEncodedDataColumnSidecarSize
+			}
+
+			// Alter indices to mark the data column as saved.
+			if err := metadata.indices.set(dataColumnIndex, uint8(metadata.indices.len())); err != nil {
+				return errors.Wrap(err, "set index")
+			}
+
+			// Append the SSZ encoded data column sidecar to the SSZ encoded data column sidecars.
+			sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...)
+		}
+	}
+
+	// Save indices to the file.
+	indices := metadata.indices.raw()
+	count, err := f.WriteAt(indices[:], int64(indicesOffset))
+	if err != nil {
+		return errors.Wrap(err, "write indices")
+	}
+	if count != mandatoryNumberOfColumns {
+		return errWrongBytesWritten
+	}
+
+	// Append the SSZ encoded data column sidecars to the end of the file.
+	count, err = f.WriteAt(sszEncodedDataColumnSidecars, metadata.fileSize)
+	if err != nil {
+		return errors.Wrap(err, "write SSZ encoded data column sidecars")
+	}
+	if count != len(sszEncodedDataColumnSidecars) {
+		return errWrongBytesWritten
+	}
+
+	if err := f.Sync(); err != nil {
+		return errors.Wrap(err, "sync")
+	}
+
+	return nil
+}
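+// Both writers rely on the invariant that every sidecar of a given block
+// serializes to the same number of bytes, since the SSZ length depends only on
+// the block's blob count. That is what makes the fixed-stride layout used by
+// Get possible (offset = headerSize + position*size), so the size comparison
+// above is a corruption guard rather than an expected failure path.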
+// saveDataColumnSidecarsNewFile saves data column sidecars into a new file.
+// This function expects all data column sidecars to belong to the same block.
+func (dcs *DataColumnStorage) saveDataColumnSidecarsNewFile(filePath string, inputDataColumnSidecars chan []blocks.VerifiedRODataColumn) (err error) {
+	// Initialize the indices.
+	var indices storageIndices
+
+	var (
+		sszEncodedDataColumnSidecarRefSize int
+		sszEncodedDataColumnSidecars       []byte
+	)
+
+	// Initialize the count of the saved SSZ encoded data column sidecars.
+	storedCount := uint8(0)
+
+	for {
+		dataColumnSidecars := pullChan(inputDataColumnSidecars)
+		if len(dataColumnSidecars) == 0 {
+			break
+		}
+
+		for _, dataColumnSidecar := range dataColumnSidecars {
+			// Extract the data column index.
+			dataColumnIndex := dataColumnSidecar.Index
+
+			// Skip if the data column is already stored.
+			ok, _, err := indices.get(dataColumnIndex)
+			if err != nil {
+				return errors.Wrap(err, "get index")
+			}
+			if ok {
+				continue
+			}
+
+			// Alter the indices to mark the data column sidecar as saved.
+			// storedCount always stays below mandatoryNumberOfColumns, so it fits in the index byte.
+			if err := indices.set(dataColumnIndex, storedCount); err != nil {
+				return errors.Wrap(err, "set index")
+			}
+
+			// Increment the count of the saved SSZ encoded data column sidecars.
+			storedCount++
+
+			// SSZ encode the data column sidecar.
+			sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ()
+			if err != nil {
+				return errors.Wrap(err, "data column sidecar marshal SSZ")
+			}
+
+			// Check if the size of the SSZ encoded data column sidecar is correct.
+			if sszEncodedDataColumnSidecarRefSize != 0 && len(sszEncodedDataColumnSidecar) != sszEncodedDataColumnSidecarRefSize {
+				return errWrongSszEncodedDataColumnSidecarSize
+			}
+
+			// Set the SSZ encoded data column sidecar reference size.
+			sszEncodedDataColumnSidecarRefSize = len(sszEncodedDataColumnSidecar)
+
+			// Append the SSZ encoded data column sidecar to the SSZ encoded data column sidecars.
+			sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...)
+		}
+	}
+
+	if storedCount == 0 {
+		// Nothing to save.
+		return nil
+	}
+
+	// Create the data column sidecars file.
+	dir := filepath.Dir(filePath)
+	if err := dcs.fs.MkdirAll(dir, directoryPermissions()); err != nil {
+		return errors.Wrap(err, "mkdir all")
+	}
+
+	f, err := dcs.fs.Create(filePath)
+	if err != nil {
+		return errors.Wrap(err, "data column sidecars file path create")
+	}
+
+	defer func() {
+		closeErr := f.Close()
+
+		// Overwrite the existing error only if it is nil, since the close error is less important.
+		if closeErr != nil && err == nil {
+			err = closeErr
+		}
+	}()
+
+	// Encode the SSZ encoded data column sidecar size.
+	var encodedSszEncodedDataColumnSidecarSize [sidecarByteLenSize]byte
+	binary.BigEndian.PutUint32(encodedSszEncodedDataColumnSidecarSize[:], uint32(sszEncodedDataColumnSidecarRefSize))
+
+	// Get the raw indices.
+	rawIndices := indices.raw()
+
+	// Concatenate the version, the data column sidecar size, the data column indices and the SSZ encoded data column sidecars.
+	countToWrite := headerSize + len(sszEncodedDataColumnSidecars)
+	bytes := make([]byte, 0, countToWrite)
+	bytes = append(bytes, byte(version))
+	bytes = append(bytes, encodedSszEncodedDataColumnSidecarSize[:]...)
+	bytes = append(bytes, rawIndices[:]...)
+	bytes = append(bytes, sszEncodedDataColumnSidecars...)
+
+	countWritten, err := f.Write(bytes)
+	if err != nil {
+		return errors.Wrap(err, "write")
+	}
+	if countWritten != countToWrite {
+		return errWrongBytesWritten
+	}
+
+	if err := f.Sync(); err != nil {
+		return errors.Wrap(err, "sync")
+	}
+
+	return nil
+}
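+// Illustrative header of a file holding a single sidecar of 2000 bytes stored
+// for column index 1 (the size is an arbitrary example value):
+//
+//	0x01                    version
+//	0x00 0x00 0x07 0xd0     sidecar size = 2000, big endian
+//	0x00 0x80 0x00 ... 0x00 index table: indices[1] = nonZeroOffset (0x80), all other entries zero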
+// metadata runs file sanity checks and retrieves metadata of the file.
+// Since ReadAt is used, the file offset is left untouched: callers must seek
+// to the data column sidecar they want to read.
+func (dcs *DataColumnStorage) metadata(f afero.File) (*metadata, error) {
+	var header [headerSize]byte
+	countRead, err := f.ReadAt(header[:], 0)
+	if err != nil {
+		return nil, errors.Wrap(err, "read at")
+	}
+	if countRead != headerSize {
+		return nil, errWrongBytesHeaderRead
+	}
+
+	// Read the encoded file version.
+	encodedFileVersion := header[versionOffset : versionOffset+versionSize]
+
+	// Convert the version to an int.
+	fileVersion := int(encodedFileVersion[0])
+
+	// Check if the version is the expected one.
+	if fileVersion != version {
+		return nil, errWrongVersion
+	}
+
+	// DataColumnSidecar is a variable sized ssz object, but all data columns for a block will be the same size.
+	encodedSszEncodedDataColumnSidecarSize := header[sidecarByteLenOffset : sidecarByteLenOffset+sidecarByteLenSize]
+
+	// Convert the SSZ encoded data column sidecar size to an int.
+	sszEncodedDataColumnSidecarSize := binary.BigEndian.Uint32(encodedSszEncodedDataColumnSidecarSize)
+
+	// Read the data column indices.
+	indices, err := newStorageIndices(header[indicesOffset : indicesOffset+mandatoryNumberOfColumns])
+	if err != nil {
+		return nil, errors.Wrap(err, "new storage indices")
+	}
+
+	// Compute the saved columns count.
+	savedDataColumnSidecarCount := indices.len()
+
+	// Compute the size of the file.
+	// It is safe to cast the SSZ encoded data column sidecar size to int64 since it is less than 2**63.
+	fileSize := int64(headerSize) + savedDataColumnSidecarCount*int64(sszEncodedDataColumnSidecarSize) // lint:ignore uintcast
+
+	metadata := &metadata{
+		indices:                         indices,
+		sszEncodedDataColumnSidecarSize: sszEncodedDataColumnSidecarSize,
+		fileSize:                        fileSize,
+	}
+
+	return metadata, nil
+}
+
+func (dcs *DataColumnStorage) fileMutexChan(root [fieldparams.RootLength]byte) (*sync.RWMutex, chan []blocks.VerifiedRODataColumn) {
+	dcs.mu.Lock()
+	defer dcs.mu.Unlock()
+
+	mc, ok := dcs.muChans[root]
+	if !ok {
+		mc = &muChan{
+			mu:      new(sync.RWMutex),
+			toStore: make(chan []blocks.VerifiedRODataColumn, 1),
+		}
+		dcs.muChans[root] = mc
+	}
+
+	return mc.mu, mc.toStore
+}
+
+func newStorageIndices(originalIndices []byte) (*storageIndices, error) {
+	if len(originalIndices) != mandatoryNumberOfColumns {
+		return nil, errWrongNumberOfColumns
+	}
+
+	count := int64(0)
+	for _, i := range originalIndices {
+		if i >= nonZeroOffset {
+			count++
+		}
+	}
+
+	var indices [mandatoryNumberOfColumns]byte
+	copy(indices[:], originalIndices)
+
+	si := storageIndices{
+		indices: indices,
+		count:   count,
+	}
+
+	return &si, nil
+}
+
+// get returns a boolean indicating if the data column sidecar is saved,
+// and the position of the data column sidecar in the file.
+func (si *storageIndices) get(dataColumnIndex uint64) (bool, int64, error) {
+	if dataColumnIndex >= mandatoryNumberOfColumns {
+		return false, 0, errDataColumnIndexTooLarge
+	}
+
+	if si.indices[dataColumnIndex] < nonZeroOffset {
+		return false, 0, nil
+	}
+
+	return true, int64(si.indices[dataColumnIndex] - nonZeroOffset), nil
+}
+
+func (si *storageIndices) len() int64 {
+	return si.count
+}
+
+// all returns the indices of all saved data column sidecars.
+func (si *storageIndices) all() []uint64 {
+	indices := make([]uint64, 0, len(si.indices))
+
+	for index, i := range si.indices {
+		if i >= nonZeroOffset {
+			indices = append(indices, uint64(index))
+		}
+	}
+
+	return indices
+}
+
+// raw returns a copy of the raw data column sidecar indices.
+// It can be safely modified by the caller.
+func (si *storageIndices) raw() [mandatoryNumberOfColumns]byte {
+	var result [mandatoryNumberOfColumns]byte
+	copy(result[:], si.indices[:])
+	return result
+}
+
+// set sets the data column sidecar as saved.
+func (si *storageIndices) set(dataColumnIndex uint64, position uint8) error {
+	if dataColumnIndex >= mandatoryNumberOfColumns || position >= mandatoryNumberOfColumns {
+		return errDataColumnIndexTooLarge
+	}
+
+	existing := si.indices[dataColumnIndex] >= nonZeroOffset
+	if !existing {
+		si.count++
+	}
+
+	si.indices[dataColumnIndex] = nonZeroOffset + position
+
+	return nil
+}
+
+// pullChan pulls data column sidecars from the input channel until it is empty.
+func pullChan(inputRoDataColumns chan []blocks.VerifiedRODataColumn) []blocks.VerifiedRODataColumn {
+	numberOfColumns := params.BeaconConfig().NumberOfColumns
+	dataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, numberOfColumns)
+
+	for {
+		select {
+		case dataColumnSidecar := <-inputRoDataColumns:
+			dataColumnSidecars = append(dataColumnSidecars, dataColumnSidecar...)
+		default:
+			return dataColumnSidecars
+		}
+	}
+}
+
+// filePath builds the database file path for a given root and epoch.
+func filePath(root [fieldparams.RootLength]byte, epoch primitives.Epoch) string {
+	return path.Join(
+		fmt.Sprintf("%d", period(epoch)),
+		fmt.Sprintf("%d", epoch),
+		fmt.Sprintf("%#x.%s", root, dataColumnsFileExtension),
+	)
+}
+
+// extractFileMetadata extracts the period, epoch and block root from a file path.
+// It returns an error if the path does not match the expected period/epoch/root layout.
+func extractFileMetadata(path string) (*fileMetadata, error) {
+	// TODO: Confirm this split is Windows friendly; afero may report OS-specific separators.
+	parts := strings.Split(path, "/")
+	if len(parts) != 3 {
+		return nil, errors.Errorf("unexpected file %s", path)
+	}
+
+	period, err := strconv.ParseUint(parts[0], 10, 64)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to parse period from %s", path)
+	}
+
+	epoch, err := strconv.ParseUint(parts[1], 10, 64)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to parse epoch from %s", path)
+	}
+
+	partsRoot := strings.Split(parts[2], ".")
+	if len(partsRoot) != 2 {
+		return nil, errors.Errorf("failed to parse root from %s", path)
+	}
+
+	blockRootString := partsRoot[0]
+	if len(blockRootString) != 2+2*fieldparams.RootLength || blockRootString[:2] != "0x" {
+		return nil, errors.Errorf("unexpected file name %s", path)
+	}
+
+	if partsRoot[1] != dataColumnsFileExtension {
+		return nil, errors.Errorf("unexpected extension %s", path)
+	}
+
+	blockRootSlice, err := hex.DecodeString(blockRootString[2:])
+	if err != nil {
+		return nil, errors.Wrapf(err, "decode string from %s", path)
+	}
+
+	var blockRoot [fieldparams.RootLength]byte
+	copy(blockRoot[:], blockRootSlice)
+
+	result := &fileMetadata{period: period, epoch: primitives.Epoch(epoch), blockRoot: blockRoot}
+	return result, nil
+}
+
+// period computes the period of a given epoch.
+func period(epoch primitives.Epoch) uint64 {
+	return uint64(epoch / params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
+}
diff --git a/beacon-chain/db/filesystem/data_column_cache.go b/beacon-chain/db/filesystem/data_column_cache.go
new file mode 100644
index 0000000000..6255ac5214
--- /dev/null
+++ b/beacon-chain/db/filesystem/data_column_cache.go
@@ -0,0 +1,220 @@
+package filesystem
+
+import (
+	"sync"
+
+	fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
+	"github.com/OffchainLabs/prysm/v6/config/params"
+	"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
+	"github.com/pkg/errors"
+)
+
+var errDataColumnIndexOutOfBounds = errors.New("data column index too high")
+
+// DataColumnStorageSummary represents cached information about the DataColumnSidecars on disk for each root the cache knows about.
+type DataColumnStorageSummary struct {
+	epoch primitives.Epoch
+	mask  [fieldparams.NumberOfColumns]bool
+}
+
+// NewDataColumnStorageSummary creates a new DataColumnStorageSummary for a given epoch and mask.
+func NewDataColumnStorageSummary(epoch primitives.Epoch, mask [fieldparams.NumberOfColumns]bool) DataColumnStorageSummary {
+	return DataColumnStorageSummary{
+		epoch: epoch,
+		mask:  mask,
+	}
+}
+
+// HasIndex returns true if the DataColumnSidecar at the given index is available in the filesystem.
+func (s DataColumnStorageSummary) HasIndex(index uint64) bool {
+	if index >= uint64(fieldparams.NumberOfColumns) {
+		return false
+	}
+	return s.mask[index]
+}
+
+// Count returns the number of available data columns.
+func (s DataColumnStorageSummary) Count() uint64 {
+	count := uint64(0)
+
+	for _, available := range s.mask {
+		if available {
+			count++
+		}
+	}
+
+	return count
+}
+
+// AllAvailable returns true if all data columns for the given indices are available.
+func (s DataColumnStorageSummary) AllAvailable(indices map[uint64]bool) bool { + if len(indices) > len(s.mask) { + return false + } + + for index := range indices { + if !s.mask[index] { + return false + } + } + + return true +} + +// DataColumnStorageSummarizer can be used to receive a summary of metadata about data columns on disk for a given root. +// The DataColumnStorageSummary can be used to check which indices (if any) are available for a given block by root. +type DataColumnStorageSummarizer interface { + Summary(root [fieldparams.RootLength]byte) DataColumnStorageSummary +} + +type dataColumnStorageSummaryCache struct { + mu sync.RWMutex + dataColumnCount float64 + lowestCachedEpoch primitives.Epoch + highestCachedEpoch primitives.Epoch + cache map[[fieldparams.RootLength]byte]DataColumnStorageSummary +} + +var _ DataColumnStorageSummarizer = &dataColumnStorageSummaryCache{} + +func newDataColumnStorageSummaryCache() *dataColumnStorageSummaryCache { + return &dataColumnStorageSummaryCache{ + cache: make(map[[fieldparams.RootLength]byte]DataColumnStorageSummary), + lowestCachedEpoch: params.BeaconConfig().FarFutureEpoch, + } +} + +// Summary returns the DataColumnStorageSummary for `root`. +// The DataColumnStorageSummary can be used to check for the presence of DataColumnSidecars based on Index. +func (sc *dataColumnStorageSummaryCache) Summary(root [fieldparams.RootLength]byte) DataColumnStorageSummary { + sc.mu.RLock() + defer sc.mu.RUnlock() + + return sc.cache[root] +} + +func (sc *dataColumnStorageSummaryCache) HighestEpoch() primitives.Epoch { + sc.mu.RLock() + defer sc.mu.RUnlock() + return sc.highestCachedEpoch +} + +// set updates the cache. +func (sc *dataColumnStorageSummaryCache) set(dataColumnsIdent DataColumnsIdent) error { + numberOfColumns := params.BeaconConfig().NumberOfColumns + + sc.mu.Lock() + defer sc.mu.Unlock() + + summary := sc.cache[dataColumnsIdent.Root] + summary.epoch = dataColumnsIdent.Epoch + + count := uint64(0) + for _, index := range dataColumnsIdent.Indices { + if index >= numberOfColumns { + return errDataColumnIndexOutOfBounds + } + + if summary.mask[index] { + continue + } + + count++ + + summary.mask[index] = true + sc.lowestCachedEpoch = min(sc.lowestCachedEpoch, dataColumnsIdent.Epoch) + sc.highestCachedEpoch = max(sc.highestCachedEpoch, dataColumnsIdent.Epoch) + } + + sc.cache[dataColumnsIdent.Root] = summary + + countFloat := float64(count) + sc.dataColumnCount += countFloat + dataColumnDiskCount.Set(sc.dataColumnCount) + dataColumnWrittenCounter.Add(countFloat) + + return nil +} + +// get returns the DataColumnStorageSummary for the given block root. +// If the root is not in the cache, the second return value will be false. +func (sc *dataColumnStorageSummaryCache) get(blockRoot [fieldparams.RootLength]byte) (DataColumnStorageSummary, bool) { + sc.mu.RLock() + defer sc.mu.RUnlock() + + v, ok := sc.cache[blockRoot] + return v, ok +} + +// evict removes the DataColumnStorageSummary for the given block root from the cache. 
+func (sc *dataColumnStorageSummaryCache) evict(blockRoot [fieldparams.RootLength]byte) int {
+	deleted := 0
+
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+
+	summary, ok := sc.cache[blockRoot]
+	if !ok {
+		return 0
+	}
+
+	for i := range summary.mask {
+		if summary.mask[i] {
+			deleted++
+		}
+	}
+
+	delete(sc.cache, blockRoot)
+	if deleted > 0 {
+		sc.dataColumnCount -= float64(deleted)
+		dataColumnDiskCount.Set(sc.dataColumnCount)
+	}
+
+	// The lowest and highest cached epoch may no longer be valid here,
+	// but it is not worth the effort to recalculate them.
+
+	return deleted
+}
+
+// pruneUpTo removes all entries from the cache up to and including the given target epoch.
+func (sc *dataColumnStorageSummaryCache) pruneUpTo(targetEpoch primitives.Epoch) uint64 {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+
+	prunedCount := uint64(0)
+	newLowestCachedEpoch := params.BeaconConfig().FarFutureEpoch
+	newHighestCachedEpoch := primitives.Epoch(0)
+
+	for blockRoot, summary := range sc.cache {
+		epoch := summary.epoch
+
+		if epoch > targetEpoch {
+			newLowestCachedEpoch = min(newLowestCachedEpoch, epoch)
+			newHighestCachedEpoch = max(newHighestCachedEpoch, epoch)
+			continue
+		}
+
+		for i := range summary.mask {
+			if summary.mask[i] {
+				prunedCount++
+			}
+		}
+
+		delete(sc.cache, blockRoot)
+	}
+
+	if prunedCount > 0 {
+		sc.lowestCachedEpoch = newLowestCachedEpoch
+		sc.highestCachedEpoch = newHighestCachedEpoch
+		sc.dataColumnCount -= float64(prunedCount)
+		dataColumnDiskCount.Set(sc.dataColumnCount)
+	}
+
+	return prunedCount
+}
+
+// clear removes all entries from the cache.
+func (sc *dataColumnStorageSummaryCache) clear() uint64 {
+	return sc.pruneUpTo(params.BeaconConfig().FarFutureEpoch)
+}
diff --git a/beacon-chain/db/filesystem/data_column_cache_test.go b/beacon-chain/db/filesystem/data_column_cache_test.go
new file mode 100644
index 0000000000..868a9e9928
--- /dev/null
+++ b/beacon-chain/db/filesystem/data_column_cache_test.go
@@ -0,0 +1,235 @@
+package filesystem
+
+import (
+	"testing"
+
+	fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
+	"github.com/OffchainLabs/prysm/v6/config/params"
+	"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
+	"github.com/OffchainLabs/prysm/v6/testing/require"
+)
+
+func TestHasIndex(t *testing.T) {
+	summary := NewDataColumnStorageSummary(0, [fieldparams.NumberOfColumns]bool{false, true})
+
+	hasIndex := summary.HasIndex(1_000_000)
+	require.Equal(t, false, hasIndex)
+
+	hasIndex = summary.HasIndex(0)
+	require.Equal(t, false, hasIndex)
+
+	hasIndex = summary.HasIndex(1)
+	require.Equal(t, true, hasIndex)
+}
+
+func TestCount(t *testing.T) {
+	summary := NewDataColumnStorageSummary(0, [fieldparams.NumberOfColumns]bool{false, true, false, true})
+
+	count := summary.Count()
+	require.Equal(t, uint64(2), count)
+}
+
+func TestAllAvailableDataColumns(t *testing.T) {
+	const count = uint64(1_000)
+
+	summary := NewDataColumnStorageSummary(0, [fieldparams.NumberOfColumns]bool{false, true, false, true})
+
+	indices := make(map[uint64]bool, count)
+	for i := range count {
+		indices[i] = true
+	}
+
+	allAvailable := summary.AllAvailable(indices)
+	require.Equal(t, false, allAvailable)
+
+	indices = map[uint64]bool{1: true, 2: true}
+	allAvailable = summary.AllAvailable(indices)
+	require.Equal(t, false, allAvailable)
+
+	indices = map[uint64]bool{1: true, 3: true}
+	allAvailable = summary.AllAvailable(indices)
+	require.Equal(t, true, allAvailable)
+}
+
+func TestSummary(t *testing.T) {
+	root := [fieldparams.RootLength]byte{}
+
+	
summaryCache := newDataColumnStorageSummaryCache() + expected := NewDataColumnStorageSummary(0, [fieldparams.NumberOfColumns]bool{}) + actual := summaryCache.Summary(root) + require.DeepEqual(t, expected, actual) + + summaryCache = newDataColumnStorageSummaryCache() + expected = NewDataColumnStorageSummary(0, [fieldparams.NumberOfColumns]bool{true, false, true, false}) + summaryCache.cache[root] = expected + actual = summaryCache.Summary(root) + require.DeepEqual(t, expected, actual) +} + +func TestHighestEpoch(t *testing.T) { + root1 := [fieldparams.RootLength]byte{1} + root2 := [fieldparams.RootLength]byte{2} + root3 := [fieldparams.RootLength]byte{3} + + summaryCache := newDataColumnStorageSummaryCache() + actual := summaryCache.HighestEpoch() + require.Equal(t, primitives.Epoch(0), actual) + + err := summaryCache.set(DataColumnsIdent{Root: root1, Epoch: 42, Indices: []uint64{1, 3}}) + require.NoError(t, err) + require.Equal(t, primitives.Epoch(42), summaryCache.HighestEpoch()) + + err = summaryCache.set(DataColumnsIdent{Root: root2, Epoch: 43, Indices: []uint64{1, 3}}) + require.NoError(t, err) + require.Equal(t, primitives.Epoch(43), summaryCache.HighestEpoch()) + + err = summaryCache.set(DataColumnsIdent{Root: root3, Epoch: 40, Indices: []uint64{1, 3}}) + require.NoError(t, err) + require.Equal(t, primitives.Epoch(43), summaryCache.HighestEpoch()) +} + +func TestSet(t *testing.T) { + t.Run("Index out of bounds", func(t *testing.T) { + summaryCache := newDataColumnStorageSummaryCache() + err := summaryCache.set(DataColumnsIdent{Indices: []uint64{1_000_000}}) + require.ErrorIs(t, err, errDataColumnIndexOutOfBounds) + require.Equal(t, params.BeaconConfig().FarFutureEpoch, summaryCache.lowestCachedEpoch) + require.Equal(t, 0, len(summaryCache.cache)) + }) + + t.Run("Nominal", func(t *testing.T) { + root1 := [fieldparams.RootLength]byte{1} + root2 := [fieldparams.RootLength]byte{2} + + summaryCache := newDataColumnStorageSummaryCache() + + err := summaryCache.set(DataColumnsIdent{Root: root1, Epoch: 42, Indices: []uint64{1, 3}}) + require.NoError(t, err) + require.Equal(t, primitives.Epoch(42), summaryCache.lowestCachedEpoch) + require.Equal(t, 1, len(summaryCache.cache)) + expected := DataColumnStorageSummary{epoch: 42, mask: [fieldparams.NumberOfColumns]bool{false, true, false, true}} + actual := summaryCache.cache[root1] + require.DeepEqual(t, expected, actual) + + err = summaryCache.set(DataColumnsIdent{Root: root1, Epoch: 42, Indices: []uint64{0, 1}}) + require.NoError(t, err) + require.Equal(t, primitives.Epoch(42), summaryCache.lowestCachedEpoch) + require.Equal(t, 1, len(summaryCache.cache)) + expected = DataColumnStorageSummary{epoch: 42, mask: [fieldparams.NumberOfColumns]bool{true, true, false, true}} + actual = summaryCache.cache[root1] + require.DeepEqual(t, expected, actual) + + err = summaryCache.set(DataColumnsIdent{Root: root2, Epoch: 43, Indices: []uint64{1}}) + require.NoError(t, err) + require.Equal(t, primitives.Epoch(42), summaryCache.lowestCachedEpoch) // Epoch 42 is still the lowest + require.Equal(t, 2, len(summaryCache.cache)) + expected = DataColumnStorageSummary{epoch: 43, mask: [fieldparams.NumberOfColumns]bool{false, true}} + actual = summaryCache.cache[root2] + require.DeepEqual(t, expected, actual) + }) +} + +func TestGet(t *testing.T) { + t.Run("Not in cache", func(t *testing.T) { + summaryCache := newDataColumnStorageSummaryCache() + root := [fieldparams.RootLength]byte{} + _, ok := summaryCache.get(root) + require.Equal(t, false, ok) + }) + + t.Run("In 
cache", func(t *testing.T) { + root := [fieldparams.RootLength]byte{} + summaryCache := newDataColumnStorageSummaryCache() + summaryCache.cache[root] = NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{true, false, true, false}) + actual, ok := summaryCache.get(root) + require.Equal(t, true, ok) + expected := NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{true, false, true, false}) + require.DeepEqual(t, expected, actual) + }) +} + +func TestEvict(t *testing.T) { + t.Run("No eviction", func(t *testing.T) { + root := [fieldparams.RootLength]byte{} + summaryCache := newDataColumnStorageSummaryCache() + + evicted := summaryCache.evict(root) + require.Equal(t, 0, evicted) + }) + + t.Run("Eviction", func(t *testing.T) { + root1 := [fieldparams.RootLength]byte{1} + root2 := [fieldparams.RootLength]byte{2} + summaryCache := newDataColumnStorageSummaryCache() + summaryCache.cache[root1] = NewDataColumnStorageSummary(42, [fieldparams.NumberOfColumns]bool{true, false, true, false}) + summaryCache.cache[root2] = NewDataColumnStorageSummary(43, [fieldparams.NumberOfColumns]bool{false, true, false, true}) + + evicted := summaryCache.evict(root1) + require.Equal(t, 2, evicted) + require.Equal(t, 1, len(summaryCache.cache)) + + _, ok := summaryCache.cache[root1] + require.Equal(t, false, ok) + + _, ok = summaryCache.cache[root2] + require.Equal(t, true, ok) + }) +} + +func TestPruneUpTo(t *testing.T) { + t.Run("No pruning", func(t *testing.T) { + summaryCache := newDataColumnStorageSummaryCache() + err := summaryCache.set(DataColumnsIdent{Root: [fieldparams.RootLength]byte{1}, Epoch: 42, Indices: []uint64{1}}) + require.NoError(t, err) + + err = summaryCache.set(DataColumnsIdent{Root: [fieldparams.RootLength]byte{2}, Epoch: 43, Indices: []uint64{2, 4}}) + require.NoError(t, err) + + count := summaryCache.pruneUpTo(41) + require.Equal(t, uint64(0), count) + require.Equal(t, 2, len(summaryCache.cache)) + require.Equal(t, primitives.Epoch(42), summaryCache.lowestCachedEpoch) + }) + + t.Run("Pruning", func(t *testing.T) { + summaryCache := newDataColumnStorageSummaryCache() + err := summaryCache.set(DataColumnsIdent{Root: [fieldparams.RootLength]byte{1}, Epoch: 42, Indices: []uint64{1}}) + require.NoError(t, err) + + err = summaryCache.set(DataColumnsIdent{Root: [fieldparams.RootLength]byte{2}, Epoch: 44, Indices: []uint64{2, 4}}) + require.NoError(t, err) + + err = summaryCache.set(DataColumnsIdent{Root: [fieldparams.RootLength]byte{3}, Epoch: 45, Indices: []uint64{2, 4}}) + require.NoError(t, err) + + count := summaryCache.pruneUpTo(42) + require.Equal(t, uint64(1), count) + require.Equal(t, 2, len(summaryCache.cache)) + require.Equal(t, primitives.Epoch(44), summaryCache.lowestCachedEpoch) + + count = summaryCache.pruneUpTo(45) + require.Equal(t, uint64(4), count) + require.Equal(t, 0, len(summaryCache.cache)) + require.Equal(t, params.BeaconConfig().FarFutureEpoch, summaryCache.lowestCachedEpoch) + require.Equal(t, primitives.Epoch(0), summaryCache.highestCachedEpoch) + + }) + + t.Run("Clear", func(t *testing.T) { + summaryCache := newDataColumnStorageSummaryCache() + err := summaryCache.set(DataColumnsIdent{Root: [fieldparams.RootLength]byte{1}, Epoch: 42, Indices: []uint64{1}}) + require.NoError(t, err) + + err = summaryCache.set(DataColumnsIdent{Root: [fieldparams.RootLength]byte{2}, Epoch: 44, Indices: []uint64{2, 4}}) + require.NoError(t, err) + + err = summaryCache.set(DataColumnsIdent{Root: [fieldparams.RootLength]byte{3}, Epoch: 45, Indices: []uint64{2, 4}}) + 
require.NoError(t, err) + + count := summaryCache.clear() + require.Equal(t, uint64(5), count) + require.Equal(t, 0, len(summaryCache.cache)) + require.Equal(t, params.BeaconConfig().FarFutureEpoch, summaryCache.lowestCachedEpoch) + require.Equal(t, primitives.Epoch(0), summaryCache.highestCachedEpoch) + }) +} diff --git a/beacon-chain/db/filesystem/data_column_test.go b/beacon-chain/db/filesystem/data_column_test.go new file mode 100644 index 0000000000..6391c4a9e6 --- /dev/null +++ b/beacon-chain/db/filesystem/data_column_test.go @@ -0,0 +1,742 @@ +package filesystem + +import ( + "context" + "encoding/binary" + "os" + "testing" + + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + "github.com/OffchainLabs/prysm/v6/testing/require" + "github.com/OffchainLabs/prysm/v6/testing/util" + "github.com/spf13/afero" +) + +func TestNewDataColumnStorage(t *testing.T) { + ctx := context.Background() + + t.Run("No base path", func(t *testing.T) { + _, err := NewDataColumnStorage(ctx) + require.ErrorIs(t, err, errNoBasePath) + }) + + t.Run("Nominal", func(t *testing.T) { + dir := t.TempDir() + + storage, err := NewDataColumnStorage(ctx, WithDataColumnBasePath(dir)) + require.NoError(t, err) + require.Equal(t, dir, storage.base) + }) +} + +func TestWarmCache(t *testing.T) { + storage, err := NewDataColumnStorage( + context.Background(), + WithDataColumnBasePath(t.TempDir()), + WithDataColumnRetentionEpochs(10_000), + ) + require.NoError(t, err) + + _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{ + {0}: { + {Slot: 33, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 1 + {Slot: 33, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 1 + }, + {1}: { + {Slot: 128_002, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000 + {Slot: 128_002, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000 + }, + {2}: { + {Slot: 128_003, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000 + {Slot: 128_003, ColumnIndex: 3, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000 + }, + {3}: { + {Slot: 128_034, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4001 + {Slot: 128_034, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4001 + }, + {4}: { + {Slot: 131_138, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098 + }, + {5}: { + {Slot: 131_138, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098 + }, + {6}: { + {Slot: 131_168, ColumnIndex: 0, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099 + }, + }, + ) + + err = storage.Save(verifiedRoDataColumnSidecars) + require.NoError(t, err) + + storage.retentionEpochs = 4_096 + + storage.WarmCache() + require.Equal(t, primitives.Epoch(4_000), storage.cache.lowestCachedEpoch) + require.Equal(t, 6, len(storage.cache.cache)) + + summary, ok := storage.cache.get([fieldparams.RootLength]byte{1}) + require.Equal(t, true, ok) + require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_000, mask: [fieldparams.NumberOfColumns]bool{false, false, true, false, true}}, summary) + + summary, ok = storage.cache.get([fieldparams.RootLength]byte{2}) + require.Equal(t, true, ok) + require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_000, mask: [fieldparams.NumberOfColumns]bool{false, true, false, true}}, summary) + + summary, ok = 
storage.cache.get([fieldparams.RootLength]byte{3}) + require.Equal(t, true, ok) + require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_001, mask: [fieldparams.NumberOfColumns]bool{false, false, true, false, true}}, summary) + + summary, ok = storage.cache.get([fieldparams.RootLength]byte{4}) + require.Equal(t, true, ok) + require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_098, mask: [fieldparams.NumberOfColumns]bool{false, false, true}}, summary) + + summary, ok = storage.cache.get([fieldparams.RootLength]byte{5}) + require.Equal(t, true, ok) + require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_098, mask: [fieldparams.NumberOfColumns]bool{false, true}}, summary) + + summary, ok = storage.cache.get([fieldparams.RootLength]byte{6}) + require.Equal(t, true, ok) + require.DeepEqual(t, DataColumnStorageSummary{epoch: 4_099, mask: [fieldparams.NumberOfColumns]bool{true}}, summary) +} + +func TestSaveDataColumnsSidecars(t *testing.T) { + t.Run("wrong numbers of columns", func(t *testing.T) { + cfg := params.BeaconConfig().Copy() + cfg.NumberOfColumns = 0 + params.OverrideBeaconConfig(cfg) + params.SetupTestConfigCleanup(t) + + _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{ + {}: {{ColumnIndex: 12}, {ColumnIndex: 1_000_000}, {ColumnIndex: 48}}, + }, + ) + + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(verifiedRoDataColumnSidecars) + require.ErrorIs(t, err, errWrongNumberOfColumns) + }) + + t.Run("one of the column index is too large", func(t *testing.T) { + _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{{}: {{ColumnIndex: 12}, {ColumnIndex: 1_000_000}, {ColumnIndex: 48}}}, + ) + + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(verifiedRoDataColumnSidecars) + require.ErrorIs(t, err, errDataColumnIndexTooLarge) + }) + + t.Run("different slots", func(t *testing.T) { + _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{ + {}: { + {Slot: 1, ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, + {Slot: 2, ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, + }, + }, + ) + + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(verifiedRoDataColumnSidecars) + require.ErrorIs(t, err, errDataColumnSidecarsFromDifferentSlots) + }) + + t.Run("new file - no data columns to save", func(t *testing.T) { + _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{{}: {}}, + ) + + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(verifiedRoDataColumnSidecars) + require.NoError(t, err) + }) + + t.Run("new file - different data column size", func(t *testing.T) { + _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{ + {}: { + {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, + {ColumnIndex: 11, DataColumn: []byte{1, 2, 3, 4}}, + }, + }, + ) + + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(verifiedRoDataColumnSidecars) + require.ErrorIs(t, err, errWrongSszEncodedDataColumnSidecarSize) + }) + + t.Run("existing file - wrong incoming SSZ encoded size", func(t *testing.T) { + _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + 
util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}}},
+		)
+
+		// Save data columns into a file.
+		_, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t)
+		err := dataColumnStorage.Save(verifiedRoDataColumnSidecars)
+		require.NoError(t, err)
+
+		// Build a data column sidecar for the same block but with a different
+		// column index and a different SSZ encoded size.
+		_, verifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars(
+			t,
+			util.DataColumnsParamsByRoot{{1}: {{ColumnIndex: 13, DataColumn: []byte{1, 2, 3, 4}}}},
+		)
+
+		// Try to rewrite the file.
+		err = dataColumnStorage.Save(verifiedRoDataColumnSidecars)
+		require.ErrorIs(t, err, errWrongSszEncodedDataColumnSidecarSize)
+	})
+
+	t.Run("nominal", func(t *testing.T) {
+		_, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
+			t,
+			util.DataColumnsParamsByRoot{
+				{1}: {
+					{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
+					{ColumnIndex: 11, DataColumn: []byte{3, 4, 5}},
+					{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, // OK if duplicate
+					{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}},
+				},
+				{2}: {
+					{ColumnIndex: 12, DataColumn: []byte{3, 4, 5}},
+					{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}},
+				},
+			},
+		)
+
+		_, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t)
+		err := dataColumnStorage.Save(inputVerifiedRoDataColumnSidecars)
+		require.NoError(t, err)
+
+		_, inputVerifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars(
+			t,
+			util.DataColumnsParamsByRoot{
+				{1}: {
+					{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, // OK if duplicate
+					{ColumnIndex: 15, DataColumn: []byte{2, 3, 4}},
+					{ColumnIndex: 1, DataColumn: []byte{2, 3, 4}},
+				},
+				{3}: {
+					{ColumnIndex: 6, DataColumn: []byte{3, 4, 5}},
+					{ColumnIndex: 2, DataColumn: []byte{6, 7, 8}},
+				},
+			},
+		)
+
+		err = dataColumnStorage.Save(inputVerifiedRoDataColumnSidecars)
+		require.NoError(t, err)
+
+		type fixture struct {
+			fileName         string
+			blockRoot        [fieldparams.RootLength]byte
+			expectedIndices  [mandatoryNumberOfColumns]byte
+			dataColumnParams []util.DataColumnParams
+		}
+
+		fixtures := []fixture{
+			{
+				fileName:  "0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs",
+				blockRoot: [fieldparams.RootLength]byte{1},
+				expectedIndices: [mandatoryNumberOfColumns]byte{
+					0, nonZeroOffset + 4, 0, 0, 0, 0, 0, 0,
+					0, 0, 0, nonZeroOffset + 1, nonZeroOffset, nonZeroOffset + 2, 0, nonZeroOffset + 3,
+					// The rest is filled with zeroes.
+				},
+				dataColumnParams: []util.DataColumnParams{
+					{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}},
+					{ColumnIndex: 11, DataColumn: []byte{3, 4, 5}},
+					{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}},
+					{ColumnIndex: 15, DataColumn: []byte{2, 3, 4}},
+					{ColumnIndex: 1, DataColumn: []byte{2, 3, 4}},
+				},
+			},
+			{
+				fileName:  "0/0/0x0200000000000000000000000000000000000000000000000000000000000000.sszs",
+				blockRoot: [fieldparams.RootLength]byte{2},
+				expectedIndices: [mandatoryNumberOfColumns]byte{
+					0, 0, 0, 0, 0, 0, 0, 0,
+					0, 0, 0, 0, nonZeroOffset, nonZeroOffset + 1, 0, 0,
+					// The rest is filled with zeroes.
+ }, + dataColumnParams: []util.DataColumnParams{ + {ColumnIndex: 12, DataColumn: []byte{3, 4, 5}}, + {ColumnIndex: 13, DataColumn: []byte{6, 7, 8}}, + }, + }, + { + fileName: "0/0/0x0300000000000000000000000000000000000000000000000000000000000000.sszs", + blockRoot: [fieldparams.RootLength]byte{3}, + expectedIndices: [mandatoryNumberOfColumns]byte{ + 0, 0, nonZeroOffset + 1, 0, 0, 0, nonZeroOffset, 0, + // The rest is filled with zeroes. + }, + dataColumnParams: []util.DataColumnParams{ + {ColumnIndex: 6, DataColumn: []byte{3, 4, 5}}, + {ColumnIndex: 2, DataColumn: []byte{6, 7, 8}}, + }, + }, + } + + for _, fixture := range fixtures { + // Build expected data column sidecars. + _, expectedDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{fixture.blockRoot: fixture.dataColumnParams}, + ) + + // Build expected bytes. + firstSszEncodedDataColumnSidecar, err := expectedDataColumnSidecars[0].MarshalSSZ() + require.NoError(t, err) + + dataColumnSidecarsCount := len(expectedDataColumnSidecars) + sszEncodedDataColumnSidecarSize := len(firstSszEncodedDataColumnSidecar) + + sszEncodedDataColumnSidecars := make([]byte, 0, dataColumnSidecarsCount*sszEncodedDataColumnSidecarSize) + sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, firstSszEncodedDataColumnSidecar...) + for _, dataColumnSidecar := range expectedDataColumnSidecars[1:] { + sszEncodedDataColumnSidecar, err := dataColumnSidecar.MarshalSSZ() + require.NoError(t, err) + sszEncodedDataColumnSidecars = append(sszEncodedDataColumnSidecars, sszEncodedDataColumnSidecar...) + } + + var encodedSszEncodedDataColumnSidecarSize [sidecarByteLenSize]byte + binary.BigEndian.PutUint32(encodedSszEncodedDataColumnSidecarSize[:], uint32(sszEncodedDataColumnSidecarSize)) + + expectedBytes := make([]byte, 0, headerSize+dataColumnSidecarsCount*sszEncodedDataColumnSidecarSize) + expectedBytes = append(expectedBytes, []byte{0x01}...) + expectedBytes = append(expectedBytes, encodedSszEncodedDataColumnSidecarSize[:]...) + expectedBytes = append(expectedBytes, fixture.expectedIndices[:]...) + expectedBytes = append(expectedBytes, sszEncodedDataColumnSidecars...) + + // Check the actual content of the file. + actualBytes, err := afero.ReadFile(dataColumnStorage.fs, fixture.fileName) + require.NoError(t, err) + require.DeepSSZEqual(t, expectedBytes, actualBytes) + + // Check the summary. 
+ indices := map[uint64]bool{} + for _, dataColumnParam := range fixture.dataColumnParams { + indices[dataColumnParam.ColumnIndex] = true + } + + summary := dataColumnStorage.Summary(fixture.blockRoot) + for index := range uint64(mandatoryNumberOfColumns) { + require.Equal(t, indices[index], summary.HasIndex(index)) + } + + err = dataColumnStorage.Remove(fixture.blockRoot) + require.NoError(t, err) + + summary = dataColumnStorage.Summary(fixture.blockRoot) + for index := range uint64(mandatoryNumberOfColumns) { + require.Equal(t, false, summary.HasIndex(index)) + } + + _, err = afero.ReadFile(dataColumnStorage.fs, fixture.fileName) + require.ErrorIs(t, err, os.ErrNotExist) + } + }) +} + +func TestGetDataColumnSidecars(t *testing.T) { + t.Run("not found", func(t *testing.T) { + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + + verifiedRODataColumnSidecars, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, []uint64{12, 13, 14}) + require.NoError(t, err) + require.Equal(t, 0, len(verifiedRODataColumnSidecars)) + }) + + t.Run("nominal", func(t *testing.T) { + _, expectedVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{ + {1}: { + {ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}, + {ColumnIndex: 14, DataColumn: []byte{2, 3, 4}}, + }, + }, + ) + + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(expectedVerifiedRoDataColumnSidecars) + require.NoError(t, err) + + verifiedRODataColumnSidecars, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, nil) + require.NoError(t, err) + require.DeepSSZEqual(t, expectedVerifiedRoDataColumnSidecars, verifiedRODataColumnSidecars) + + verifiedRODataColumnSidecars, err = dataColumnStorage.Get([fieldparams.RootLength]byte{1}, []uint64{12, 13, 14}) + require.NoError(t, err) + require.DeepSSZEqual(t, expectedVerifiedRoDataColumnSidecars, verifiedRODataColumnSidecars) + }) +} + +func TestRemove(t *testing.T) { + t.Run("not found", func(t *testing.T) { + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Remove([fieldparams.RootLength]byte{1}) + require.NoError(t, err) + }) + + t.Run("nominal", func(t *testing.T) { + _, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{ + {1}: { + {Slot: 32, ColumnIndex: 10, DataColumn: []byte{1, 2, 3}}, + {Slot: 32, ColumnIndex: 11, DataColumn: []byte{2, 3, 4}}, + }, + {2}: { + {Slot: 33, ColumnIndex: 10, DataColumn: []byte{1, 2, 3}}, + {Slot: 33, ColumnIndex: 11, DataColumn: []byte{2, 3, 4}}, + }, + }, + ) + + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(inputVerifiedRoDataColumnSidecars) + require.NoError(t, err) + + err = dataColumnStorage.Remove([fieldparams.RootLength]byte{1}) + require.NoError(t, err) + + summary := dataColumnStorage.Summary([fieldparams.RootLength]byte{1}) + require.Equal(t, primitives.Epoch(0), summary.epoch) + require.Equal(t, uint64(0), summary.Count()) + + summary = dataColumnStorage.Summary([fieldparams.RootLength]byte{2}) + require.Equal(t, primitives.Epoch(1), summary.epoch) + require.Equal(t, uint64(2), summary.Count()) + + actual, err := dataColumnStorage.Get([fieldparams.RootLength]byte{1}, nil) + require.NoError(t, err) + require.Equal(t, 0, len(actual)) + + actual, err = dataColumnStorage.Get([fieldparams.RootLength]byte{2}, nil) + require.NoError(t, err) + require.Equal(t, 2, len(actual)) + }) +} + 
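Aside on the expectedIndices fixtures used in the nominal save test above: each byte of the 128-byte table is zero when the column is absent, and nonZeroOffset (0x80) plus the sidecar's position in the file when it is present. A minimal runnable sketch of that decoding; the helper name decodeIndexByte is illustrative and not part of this patch:

	package main

	import "fmt"

	// decodeIndexByte interprets one byte of the 128-byte index table:
	// zero means the column is absent; otherwise the high bit is a sentinel
	// and the low 7 bits are the position of the sidecar in the file.
	func decodeIndexByte(b byte) (present bool, position int) {
		if b == 0 {
			return false, 0
		}
		return true, int(b & 0x7f)
	}

	func main() {
		fmt.Println(decodeIndexByte(0x83)) // true 3: column stored at position 3
		fmt.Println(decodeIndexByte(0x80)) // true 0: column stored at position 0
		fmt.Println(decodeIndexByte(0x00)) // false 0: column absent
	}

This matches the fixtures: column 12, saved first, gets indices[12] = nonZeroOffset, column 11, saved second, gets indices[11] = nonZeroOffset + 1, and so on.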
+func TestClear(t *testing.T) { + _, inputVerifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{ + {1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}}, + {2}: {{ColumnIndex: 13, DataColumn: []byte{6, 7, 8}}}, + }, + ) + + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(inputVerifiedRoDataColumnSidecars) + require.NoError(t, err) + + filePaths := []string{ + "0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs", + "0/0/0x0200000000000000000000000000000000000000000000000000000000000000.sszs", + } + + for _, filePath := range filePaths { + _, err = afero.ReadFile(dataColumnStorage.fs, filePath) + require.NoError(t, err) + } + + err = dataColumnStorage.Clear() + require.NoError(t, err) + + summary := dataColumnStorage.Summary([fieldparams.RootLength]byte{1}) + for index := range uint64(mandatoryNumberOfColumns) { + require.Equal(t, false, summary.HasIndex(index)) + } + + for _, filePath := range filePaths { + _, err = afero.ReadFile(dataColumnStorage.fs, filePath) + require.ErrorIs(t, err, os.ErrNotExist) + } +} + +func TestMetadata(t *testing.T) { + t.Run("wrong version", func(t *testing.T) { + _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars( + t, + util.DataColumnsParamsByRoot{ + {1}: {{ColumnIndex: 12, DataColumn: []byte{1, 2, 3}}}, + }, + ) + + // Save data columns into a file. + _, dataColumnStorage := NewEphemeralDataColumnStorageAndFs(t) + err := dataColumnStorage.Save(verifiedRoDataColumnSidecars) + require.NoError(t, err) + + // Alter the version. + const filePath = "0/0/0x0100000000000000000000000000000000000000000000000000000000000000.sszs" + file, err := dataColumnStorage.fs.OpenFile(filePath, os.O_WRONLY, os.FileMode(0600)) + require.NoError(t, err) + + count, err := file.Write([]byte{42}) + require.NoError(t, err) + require.Equal(t, 1, count) + + // Try to read the metadata. 
+ _, err = dataColumnStorage.metadata(file)
+ require.ErrorIs(t, err, errWrongVersion)
+
+ err = file.Close()
+ require.NoError(t, err)
+ })
+}
+
+func TestNewStorageIndices(t *testing.T) {
+ t.Run("wrong number of columns", func(t *testing.T) {
+ _, err := newStorageIndices(nil)
+ require.ErrorIs(t, err, errWrongNumberOfColumns)
+ })
+
+ t.Run("nominal", func(t *testing.T) {
+ var indices [mandatoryNumberOfColumns]byte
+ indices[0] = 1
+
+ storageIndices, err := newStorageIndices(indices[:])
+ require.NoError(t, err)
+ require.Equal(t, indices, storageIndices.indices)
+ })
+}
+
+func TestStorageIndicesGet(t *testing.T) {
+ t.Run("index too large", func(t *testing.T) {
+ var indices storageIndices
+ _, _, err := indices.get(1_000_000)
+ require.ErrorIs(t, err, errDataColumnIndexTooLarge)
+ })
+
+ t.Run("index not set", func(t *testing.T) {
+ const expected = false
+ var indices storageIndices
+ actual, _, err := indices.get(0)
+ require.NoError(t, err)
+ require.Equal(t, expected, actual)
+ })
+
+ t.Run("index set", func(t *testing.T) {
+ const (
+ expectedOk = true
+ expectedPosition = int64(3)
+ )
+
+ indices := storageIndices{indices: [mandatoryNumberOfColumns]byte{0, 131}}
+ actualOk, actualPosition, err := indices.get(1)
+ require.NoError(t, err)
+ require.Equal(t, expectedOk, actualOk)
+ require.Equal(t, expectedPosition, actualPosition)
+ })
+}
+
+func TestStorageIndicesLen(t *testing.T) {
+ const expected = int64(2)
+ indices := storageIndices{count: 2}
+ actual := indices.len()
+ require.Equal(t, expected, actual)
+}
+
+func TestStorageIndicesAll(t *testing.T) {
+ expectedIndices := []uint64{1, 3}
+ indices := storageIndices{indices: [mandatoryNumberOfColumns]byte{0, 131, 0, 128}}
+ actualIndices := indices.all()
+ require.DeepEqual(t, expectedIndices, actualIndices)
+}
+
+func TestStorageIndicesSet(t *testing.T) {
+ t.Run("data column index too large", func(t *testing.T) {
+ var indices storageIndices
+ err := indices.set(1_000_000, 0)
+ require.ErrorIs(t, err, errDataColumnIndexTooLarge)
+ })
+
+ t.Run("position too large", func(t *testing.T) {
+ var indices storageIndices
+ err := indices.set(0, 255)
+ require.ErrorIs(t, err, errDataColumnIndexTooLarge)
+ })
+
+ t.Run("nominal", func(t *testing.T) {
+ expected := [mandatoryNumberOfColumns]byte{0, 0, 128, 0, 131}
+ var indices storageIndices
+ require.Equal(t, int64(0), indices.len())
+
+ err := indices.set(2, 1)
+ require.NoError(t, err)
+ require.Equal(t, int64(1), indices.len())
+
+ err = indices.set(4, 3)
+ require.NoError(t, err)
+ require.Equal(t, int64(2), indices.len())
+
+ err = indices.set(2, 0)
+ require.NoError(t, err)
+ require.Equal(t, int64(2), indices.len())
+
+ actual := indices.indices
+ require.Equal(t, expected, actual)
+ })
+}
+
+func TestPrune(t *testing.T) {
+ t.Run("nothing to prune", func(t *testing.T) {
+ dir := t.TempDir()
+ dataColumnStorage, err := NewDataColumnStorage(context.Background(), WithDataColumnBasePath(dir))
+ require.NoError(t, err)
+
+ dataColumnStorage.prune()
+ })
+ t.Run("nominal", func(t *testing.T) {
+ var compareSlices = func(left, right []string) bool {
+ if len(left) != len(right) {
+ return false
+ }
+
+ leftMap := make(map[string]bool, len(left))
+ for _, leftItem := range left {
+ leftMap[leftItem] = true
+ }
+
+ for _, rightItem := range right {
+ if _, ok := leftMap[rightItem]; !ok {
+ return false
+ }
+ }
+
+ return true
+ }
+ _, verifiedRoDataColumnSidecars := util.CreateTestVerifiedRoDataColumnSidecars(
+ t,
+ util.DataColumnsParamsByRoot{
+ {0}: {
+ {Slot: 33, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 1
+ {Slot: 33, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 1
+ },
+ {1}: {
+ {Slot: 128_002, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000
+ {Slot: 128_002, ColumnIndex: 4, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000
+ },
+ {2}: {
+ {Slot: 128_003, ColumnIndex: 1, DataColumn: []byte{1, 2, 3}}, // Period 0 - Epoch 4000
+ {Slot: 128_003, ColumnIndex: 3, DataColumn: []byte{2, 3, 4}}, // Period 0 - Epoch 4000
+ },
+ {3}: {
+ {Slot: 131_138, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098
+ {Slot: 131_138, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4098
+ },
+ {4}: {
+ {Slot: 131_169, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099
+ {Slot: 131_169, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 1 - Epoch 4099
+ },
+ {5}: {
+ {Slot: 262_144, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}, // Period 2 - Epoch 8192
+ {Slot: 262_144, ColumnIndex: 3, DataColumn: []byte{1, 2, 3}}, // Period 2 - Epoch 8192
+ },
+ },
+ )
+
+ dir := t.TempDir()
+ dataColumnStorage, err := NewDataColumnStorage(context.Background(), WithDataColumnBasePath(dir), WithDataColumnRetentionEpochs(10_000))
+ require.NoError(t, err)
+
+ err = dataColumnStorage.Save(verifiedRoDataColumnSidecars)
+ require.NoError(t, err)
+
+ dirs, err := listDir(dataColumnStorage.fs, ".")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"0", "1", "2"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "0")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"1", "4000"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "1")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"4099", "4098"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "2")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"8192"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "0/1")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"0x0000000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "0/4000")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{
+ "0x0200000000000000000000000000000000000000000000000000000000000000.sszs",
+ "0x0100000000000000000000000000000000000000000000000000000000000000.sszs",
+ }, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "1/4098")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"0x0300000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "1/4099")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"0x0400000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "2/8192")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"0x0500000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
+
+ _, verifiedRoDataColumnSidecars = util.CreateTestVerifiedRoDataColumnSidecars(
+ t,
+ util.DataColumnsParamsByRoot{
+ {6}: {{Slot: 451_141, ColumnIndex: 2, DataColumn: []byte{1, 2, 3}}}, // Period 3 - Epoch 14_098
+ },
+ )
+
+ err = dataColumnStorage.Save(verifiedRoDataColumnSidecars)
+ require.NoError(t, err)
+
+
dataColumnStorage.prune()
+
+ dirs, err = listDir(dataColumnStorage.fs, ".")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"1", "2", "3"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "1")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"4099"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "2")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"8192"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "3")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"14098"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "1/4099")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"0x0400000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "2/8192")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"0x0500000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
+
+ dirs, err = listDir(dataColumnStorage.fs, "3/14098")
+ require.NoError(t, err)
+ require.Equal(t, true, compareSlices([]string{"0x0600000000000000000000000000000000000000000000000000000000000000.sszs"}, dirs))
+ })
+}
diff --git a/beacon-chain/db/filesystem/doc.go b/beacon-chain/db/filesystem/doc.go
new file mode 100644
index 0000000000..2231be4ef8
--- /dev/null
+++ b/beacon-chain/db/filesystem/doc.go
@@ -0,0 +1,104 @@
+package filesystem
+
+// nolint:dupword
+/*
+Data column sidecars storage documentation
+==========================================
+
+File organisation
+-----------------
+- The first byte represents the version of the file structure (up to 0xff = 255).
+ We set it to 0x01.
+ Note: This is not strictly needed, but it will help a lot if, in the future,
+ we want to modify the file structure.
+- The next 4 bytes represent the size of an SSZ encoded data column sidecar.
+ (See the `Computation of the maximum size of a DataColumnSidecar` section for a description
+ of how this value is computed.)
+- The next 128 bytes map each column index to the position of its sidecar in the file.
+ The first (most significant) bit of each byte is set to 0 if the column is absent,
+ and set to 1 if the column is present.
+ The remaining 7 bits (from 0 to 127) represent the position of the sidecar in the file.
+ This sentinel bit is needed to distinguish a column stored at position 0 from an absent column.
+ Example: If the column with index 5 is at the 3rd position in the file, then indices[5] = 0x80 + 0x03 = 0x83.
+- The rest of the file is a sequence of SSZ encoded data column sidecars.
+
+
+|------------------------------------------|------------------------------------------------------------------------------------|
+| Byte offset | Description |
+|------------------------------------------|------------------------------------------------------------------------------------|
+| 0 | version (1 byte) | sszEncodedDataColumnSidecarSize (4 bytes) | indices (128 bytes) |
+|133 + 0*sszEncodedDataColumnSidecarSize | sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes) |
+|133 + 1*sszEncodedDataColumnSidecarSize | sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes) |
+|133 + 2*sszEncodedDataColumnSidecarSize | sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes) |
+| ... | ...
|
+|133 + 127*sszEncodedDataColumnSidecarSize | sszEncodedDataColumnSidecar (sszEncodedDataColumnSidecarSize bytes) |
+|------------------------------------------|------------------------------------------------------------------------------------|
+
+Each file is named after the root of the block the data column sidecars are committed to.
+Example: `0x259c6d2f6a0bb75e2405cea7cb248e5663dc26b9404fd3bcd777afc20de91c1e.sszs`
+
+Database organisation
+---------------------
+SSZ encoded data column sidecars are stored following the `by-epoch` layout.
+- The first layer is a directory corresponding to the `period`, which corresponds to the epoch divided by 4096.
+- The second layer is a directory corresponding to the epoch.
+- Then all files are stored in the epoch directory.
+
+Example:
+data-columns
+├── 0
+│   ├── 3638
+│   │   ├── 0x259c6d2f6a0bb75e2405cea7cb248e5663dc26b9404fd3bcd777afc20de91c1e.sszs
+│   │   ├── 0x2a855b1f6e9a2f04f8383e336325bf7d5ba02d1eab3ef90ef183736f8c768533.sszs
+│   │   ├── ...
+│   │   ├── 0xeb78e2b2350a71c640f1e96fea9e42f38e65705ab7e6e100c8bc9c589f2c5f2b.sszs
+│   │   └── 0xeb7ee68da988fd20d773d45aad01dd62527734367a146e2b048715bd68a4e370.sszs
+│   └── 3639
+│      ├── 0x0fd231fe95e57936fa44f6c712c490b9e337a481b661dfd46768901e90444330.sszs
+│      ├── 0x1bf5edff6b6ba2b65b1db325ff3312bbb57da461ef2ae651bd741af851aada3a.sszs
+│      ├── ...
+│      ├── 0xa156a527e631f858fee79fab7ef1fde3f6117a2e1201d47c09fbab0c6780c937.sszs
+│      └── 0xcd80bc535ddc467dea1d19e0c39c1160875ccd1989061bcd8ce206e3c1261c87.sszs
+└── 1
+ ├── 4096
+ │   ├── 0x0d244009093e2bedb72eb265280290199e8c7bf1d90d7583c41af40d9f662269.sszs
+ │   ├── 0x11f420928d8de41c50e735caab0369996824a5299c5f054e097965855925697d.sszs
+ │   ├── ...
+ │   ├── 0xbe91fc782877ed400d95c02c61aebfdd592635d11f8e64c94b46abd84f45c967.sszs
+ │   └── 0xf246189f078f02d30173ff74605cf31c9e65b5e463275ebdbeb40476638135ff.sszs
+ └── 4097
+    ├── 0x454d000674793c479e90504c0fe9827b50bb176ae022dab4e37d6a21471ab570.sszs
+    ├── 0xac5eb7437d7190c48cfa863e3c45f96a7f8af371d47ac12ccda07129a06af763.sszs
+    ├── ...
+    ├── 0xb7df30561d9d92ab5fafdd96bca8b44526497c8debf0fc425c7a0770b2abeb83.sszs
+    └── 0xc1dd0b1ae847b6ec62303a36d08c6a4a2e9e3ec4be3ff70551972a0ee3de9c14.sszs
+
+Computation of the maximum size of a DataColumnSidecar
+------------------------------------------------------
+https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/das-core.md#datacolumnsidecar
+
+
+class DataColumnSidecar(Container):
+ index: ColumnIndex # Index of column in extended matrix
+ column: List[Cell, MAX_BLOB_COMMITMENTS_PER_BLOCK]
+ kzg_commitments: List[KZGCommitment, MAX_BLOB_COMMITMENTS_PER_BLOCK]
+ kzg_proofs: List[KZGProof, MAX_BLOB_COMMITMENTS_PER_BLOCK]
+ signed_block_header: SignedBeaconBlockHeader
+ kzg_commitments_inclusion_proof: Vector[Bytes32, KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH]
+
+
+- index: 8 bytes (ColumnIndex is a uint64)
+- `column`: 4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) * 64 (FIELD_ELEMENTS_PER_CELL) * 32 bytes (BYTES_PER_FIELD_ELEMENT) = 8,388,608 bytes
+- kzg_commitments: 4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) * 48 bytes (KZGCommitment) = 196,608 bytes
+- kzg_proofs: 4,096 (MAX_BLOB_COMMITMENTS_PER_BLOCK) * 48 bytes (KZGProof) = 196,608 bytes
+- signed_block_header: 8 bytes (Slot) + 8 bytes (ValidatorIndex) + 3 * 32 bytes (Root) + 96 bytes (BLSSignature) = 208 bytes
+- kzg_commitments_inclusion_proof: 4 (KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH) * 32 bytes = 128 bytes
+
+TOTAL: 8,782,168 bytes
+log(8,782,168) / log(2) ~= 23.07
+
+==> 32 bits (4 bytes) are enough to store the maximum size of an SSZ encoded data column sidecar.
+
+A 4-byte size field can represent sizes up to 2**32 - 1 = 4,294,967,295 bytes,
+which leaves more than 4 GB of room for the extra data needed by SSZ encoding (which is more than enough).
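+
+Locating a column inside a file
+-------------------------------
+Given a column index c, the byte indices[c] of the 128-byte table locates the
+corresponding sidecar. A sketch of the lookup (variable names are illustrative;
+headerSize and sszEncodedDataColumnSidecarSize are the constant and field described above):
+
+ b := indices[c] // One byte of the 128-byte index table.
+ if b == 0 {
+ // Column c is not stored in this file.
+ }
+ position := int64(b & 0x7f) // Clear the sentinel bit.
+ offset := int64(headerSize) + position*int64(sszEncodedDataColumnSidecarSize)
+ // The SSZ encoded sidecar occupies bytes [offset, offset+sszEncodedDataColumnSidecarSize).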
+*/
diff --git a/beacon-chain/db/filesystem/metrics.go b/beacon-chain/db/filesystem/metrics.go
index cad4ce65ae..495e13836f 100644
--- a/beacon-chain/db/filesystem/metrics.go
+++ b/beacon-chain/db/filesystem/metrics.go
@@ -6,6 +6,7 @@ import (
)
var (
+ // Blobs
blobBuckets = []float64{3, 5, 7, 9, 11, 13}
blobSaveLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "blob_storage_save_latency",
@@ -33,4 +34,29 @@ var (
Name: "blob_disk_bytes",
Help: "Approximate number of bytes occupied by blobs in storage",
})
+
+ // Data columns
+ dataColumnBuckets = []float64{3, 5, 7, 9, 11, 13}
+ dataColumnSaveLatency = promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "data_column_storage_save_latency",
+ Help: "Latency of DataColumnSidecar storage save operations in milliseconds",
+ Buckets: dataColumnBuckets,
+ })
+ dataColumnFetchLatency = promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "data_column_storage_get_latency",
+ Help: "Latency of DataColumnSidecar storage get operations in milliseconds",
+ Buckets: dataColumnBuckets,
+ })
+ dataColumnPrunedCounter = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "data_column_pruned",
+ Help: "Number of DataColumnSidecars pruned",
+ })
+ dataColumnWrittenCounter = promauto.NewCounter(prometheus.CounterOpts{
+ Name: "data_column_written",
+ Help: "Number of DataColumnSidecars written",
+ })
+ dataColumnDiskCount = promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "data_column_disk_count",
+ Help: "Approximate number of data columns in storage",
+ })
)
diff --git a/beacon-chain/db/filesystem/mock.go b/beacon-chain/db/filesystem/mock.go
index 11992782c8..20f1418f40 100644
--- a/beacon-chain/db/filesystem/mock.go
+++ b/beacon-chain/db/filesystem/mock.go
@@ -1,6 +1,7 @@
package filesystem
import (
+ "context"
"testing"
"github.com/OffchainLabs/prysm/v6/config/params"
@@ -9,6 +10,9 @@ import (
"github.com/spf13/afero"
)
+// Blobs
+// -----
+
// NewEphemeralBlobStorage should only be used for tests.
// The instance of BlobStorage returned is backed by an in-memory virtual filesystem,
// improving test performance and simplifying cleanup.
@@ -75,3 +79,41 @@ func NewMockBlobStorageSummarizer(t *testing.T, set map[[32]byte][]int) BlobStor
}
return c
}
+
+// Data columns
+// ------------
+
+// NewEphemeralDataColumnStorage should only be used for tests.
+// The instance of DataColumnStorage returned is backed by an in-memory virtual filesystem,
+// improving test performance and simplifying cleanup.
+func NewEphemeralDataColumnStorage(t testing.TB, opts ...DataColumnStorageOption) *DataColumnStorage {
+ return NewWarmedEphemeralDataColumnStorageUsingFs(t, afero.NewMemMapFs(), opts...)
+}
+
+// NewEphemeralDataColumnStorageAndFs can be used by tests that want access to the virtual filesystem
+// in order to interact with it outside the parameters of the DataColumnStorage API.
+func NewEphemeralDataColumnStorageAndFs(t testing.TB, opts ...DataColumnStorageOption) (afero.Fs, *DataColumnStorage) {
+ fs := afero.NewMemMapFs()
+ dcs := NewWarmedEphemeralDataColumnStorageUsingFs(t, fs, opts...)
+ return fs, dcs
+}
+
+// NewWarmedEphemeralDataColumnStorageUsingFs returns an ephemeral DataColumnStorage backed by the
+// provided afero.Fs, with its summary cache already warmed. It should only be used for tests.
+func NewWarmedEphemeralDataColumnStorageUsingFs(t testing.TB, fs afero.Fs, opts ...DataColumnStorageOption) *DataColumnStorage {
+ bs := NewEphemeralDataColumnStorageUsingFs(t, fs, opts...)
+ bs.WarmCache()
+ return bs
+}
+
+// NewEphemeralDataColumnStorageUsingFs returns an ephemeral DataColumnStorage backed by the
+// provided afero.Fs, without warming its cache. It should only be used for tests.
+func NewEphemeralDataColumnStorageUsingFs(t testing.TB, fs afero.Fs, opts ...DataColumnStorageOption) *DataColumnStorage {
+ opts = append(opts,
+ WithDataColumnRetentionEpochs(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest),
+ WithDataColumnFs(fs),
+ )
+
+ dcs, err := NewDataColumnStorage(context.Background(), opts...)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ return dcs
+}
diff --git a/beacon-chain/verification/fake.go b/beacon-chain/verification/fake.go
index 78e85b9450..90e5008e47 100644
--- a/beacon-chain/verification/fake.go
+++ b/beacon-chain/verification/fake.go
@@ -3,21 +3,7 @@ package verification
import (
"testing"
- fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
- "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
- "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
-)
-
-type (
- DataColumnParams struct {
- Slot primitives.Slot
- ColumnIndex uint64
- KzgCommitments [][]byte
- DataColumn []byte // A whole data cell will be filled with the content of one item of this slice.
- }
-
- DataColumnsParamsByRoot map[[fieldparams.RootLength]byte][]DataColumnParams
-)
)
// FakeVerifyForTest can be used by tests that need a VerifiedROBlob but don't want to do all the
diff --git a/changelog/manu-peerdas-filesystem.md b/changelog/manu-peerdas-filesystem.md
new file mode 100644
index 0000000000..6703d75aef
--- /dev/null
+++ b/changelog/manu-peerdas-filesystem.md
@@ -0,0 +1,2 @@
+### Added
+- Implement filesystem storage for data column sidecars.
diff --git a/config/fieldparams/mainnet.go b/config/fieldparams/mainnet.go
index b892479008..8ebce5af23 100644
--- a/config/fieldparams/mainnet.go
+++ b/config/fieldparams/mainnet.go
@@ -44,4 +44,7 @@ const (
MaxAttesterSlashingsElectra = 1 // Maximum number of attester slashings in a block.
MaxRandomByte = uint64(1<<8 - 1) // MaxRandomByte defines max for a random byte using for proposer and sync committee sampling.
MaxRandomValueElectra = uint64(1<<16 - 1) // MaxRandomValueElectra defines max for a random value using for proposer and sync committee sampling.
+
+ // Introduced in Fulu network upgrade.
+ NumberOfColumns = 128 // NumberOfColumns is the number of data columns that can exist in a network.
)
diff --git a/config/fieldparams/minimal.go b/config/fieldparams/minimal.go
index 190e6d4a7c..f1923db73d 100644
--- a/config/fieldparams/minimal.go
+++ b/config/fieldparams/minimal.go
@@ -44,4 +44,7 @@ const (
MaxAttesterSlashingsElectra = 1 // Maximum number of attester slashings in a block.
MaxRandomByte = uint64(1<<8 - 1) // Maximum value for a random value using for proposer and sync committee sampling.
MaxRandomValueElectra = uint64(1<<16 - 1) // Maximum value for a random value using for proposer and sync committee sampling.
+
+ // Introduced in Fulu network upgrade.
+ NumberOfColumns = 128 // NumberOfColumns is the number of data columns that can exist in a network.
)
diff --git a/testing/util/BUILD.bazel b/testing/util/BUILD.bazel
index 1725c23424..2060ebf443 100644
--- a/testing/util/BUILD.bazel
+++ b/testing/util/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
"block.go",
"capella_block.go",
"capella_state.go",
+ "data_column.go",
"deneb.go",
"deneb_state.go",
"deposits.go",
@@ -68,6 +69,7 @@ go_library(
"//testing/require:go_default_library",
"//time/slots:go_default_library",
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
+ "@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
diff --git a/testing/util/data_column.go b/testing/util/data_column.go
new file mode 100644
index 0000000000..b92a04fdb9
--- /dev/null
+++ b/testing/util/data_column.go
@@ -0,0 +1,83 @@
+package util
+
+import (
+ "testing"
+
+ fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
+ "github.com/OffchainLabs/prysm/v6/config/params"
+ "github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
+ "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
+ ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
+ ckzg4844 "github.com/ethereum/c-kzg-4844/v2/bindings/go"
+)
+
+type (
+ DataColumnParams struct {
+ Slot primitives.Slot
+ ColumnIndex uint64
+ KzgCommitments [][]byte
+ DataColumn []byte // A whole data cell will be filled with the content of one item of this slice.
+ }
+
+ DataColumnsParamsByRoot map[[fieldparams.RootLength]byte][]DataColumnParams
+)
+
+// CreateTestVerifiedRoDataColumnSidecars creates read-only (and verified read-only) data column
+// sidecars from the given per-block-root parameters. It should only be used for tests.
+func CreateTestVerifiedRoDataColumnSidecars(t *testing.T, dataColumnParamsByBlockRoot DataColumnsParamsByRoot) ([]blocks.RODataColumn, []blocks.VerifiedRODataColumn) {
+ params.SetupTestConfigCleanup(t)
+ cfg := params.BeaconConfig().Copy()
+ cfg.FuluForkEpoch = 0
+ params.OverrideBeaconConfig(cfg)
+
+ count := 0
+ for _, indices := range dataColumnParamsByBlockRoot {
+ count += len(indices)
+ }
+
+ verifiedRoDataColumnSidecars := make([]blocks.VerifiedRODataColumn, 0, count)
+ roDataColumnSidecars := make([]blocks.RODataColumn, 0, count)
+ for blockRoot, dataColumnParams := range dataColumnParamsByBlockRoot {
+ for _, param := range dataColumnParams {
+ dataColumn := make([][]byte, 0, len(param.DataColumn))
+ for _, value := range param.DataColumn {
+ cell := make([]byte, ckzg4844.BytesPerCell)
+ for i := range ckzg4844.BytesPerCell {
+ cell[i] = value
+ }
+ dataColumn = append(dataColumn, cell)
+ }
+
+ kzgCommitmentsInclusionProof := make([][]byte, 4)
+ for i := range kzgCommitmentsInclusionProof {
+ kzgCommitmentsInclusionProof[i] = make([]byte, 32)
+ }
+
+ dataColumnSidecar := &ethpb.DataColumnSidecar{
+ Index: param.ColumnIndex,
+ KzgCommitments: param.KzgCommitments,
+ Column: dataColumn,
+ KzgCommitmentsInclusionProof: kzgCommitmentsInclusionProof,
+ SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
+ Header: &ethpb.BeaconBlockHeader{
+ Slot: param.Slot,
+ ParentRoot: make([]byte, fieldparams.RootLength),
+ StateRoot: make([]byte, fieldparams.RootLength),
+ BodyRoot: make([]byte, fieldparams.RootLength),
+ },
+ Signature: make([]byte, fieldparams.BLSSignatureLength),
+ },
+ }
+
+ roDataColumnSidecar, err := blocks.NewRODataColumnWithRoot(dataColumnSidecar, blockRoot)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ roDataColumnSidecars = append(roDataColumnSidecars, roDataColumnSidecar)
+
+ verifiedRoDataColumnSidecar := blocks.NewVerifiedRODataColumn(roDataColumnSidecar)
+ verifiedRoDataColumnSidecars =
append(verifiedRoDataColumnSidecars, verifiedRoDataColumnSidecar)
+ }
+ }
+
+ return roDataColumnSidecars, verifiedRoDataColumnSidecars
+}
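For orientation, a sketch of how the helper above pairs with the ephemeral storage constructors added in mock.go; the snippet assumes a test outside the filesystem package that imports both packages, and is illustrative rather than part of the patch:

	_, sidecars := util.CreateTestVerifiedRoDataColumnSidecars(t, util.DataColumnsParamsByRoot{
		{1}: {{Slot: 32, ColumnIndex: 7, DataColumn: []byte{1, 2, 3}}},
	})
	_, storage := filesystem.NewEphemeralDataColumnStorageAndFs(t)
	require.NoError(t, storage.Save(sidecars))                           // persists the sidecar under its period/epoch/root path
	got, err := storage.Get([fieldparams.RootLength]byte{1}, []uint64{7}) // reads back only the requested column index
	require.NoError(t, err)
	require.Equal(t, 1, len(got))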