From 49e1763ec2ac840daedc18497c19fc2617bc399c Mon Sep 17 00:00:00 2001
From: Manu NALEPA
Date: Fri, 2 Jan 2026 17:59:55 +0100
Subject: [PATCH] Data columns cache warmup: Parallelize computation of all files for a given epoch. (#16207)

**What type of PR is this?**

Other

**What does this PR do? Why is it needed?**

Before this PR, all `.sszs` files containing the data column sidecars were read and processed sequentially, which could take more than a minute.

After this PR, all `.sszs` files of a given epoch (up to 32 files with the current `SLOTS_PER_EPOCH` value) are processed in parallel. (A minimal standalone sketch of this fan-out pattern is appended after the patch.)

**Which issue(s) does this PR fix?**

- https://github.com/OffchainLabs/prysm/issues/16204

Tested on:

- [Netcup VPS 4000 G11](https://www.netcup.com/en/server/vps).

**Before this PR (3 trials)**:
```
[2026-01-02 08:55:12.71] INFO filesystem: Data column filesystem cache warm-up complete elapsed=1m22.894007534s
[2026-01-02 12:59:33.62] INFO filesystem: Data column filesystem cache warm-up complete elapsed=42.346732863s
[2026-01-02 13:03:13.65] INFO filesystem: Data column filesystem cache warm-up complete elapsed=56.143565960s
```

**After this PR (3 trials)**:
```
[2026-01-02 12:50:07.53] INFO filesystem: Data column filesystem cache warm-up complete elapsed=2.019424193s
[2026-01-02 12:52:01.34] INFO filesystem: Data column filesystem cache warm-up complete elapsed=1.960671225s
[2026-01-02 12:53:34.66] INFO filesystem: Data column filesystem cache warm-up complete elapsed=2.549555363s
```

**Acknowledgements**

- [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers to understand this PR.
- [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable).
---
 beacon-chain/db/filesystem/BUILD.bazel    |   1 +
 beacon-chain/db/filesystem/data_column.go | 216 +++++++++++++++-------
 changelog/manu-cache-warmup.md            |   3 +
 3 files changed, 157 insertions(+), 63 deletions(-)
 create mode 100644 changelog/manu-cache-warmup.md

diff --git a/beacon-chain/db/filesystem/BUILD.bazel b/beacon-chain/db/filesystem/BUILD.bazel
index 5c9d2df0b4..98b7717b24 100644
--- a/beacon-chain/db/filesystem/BUILD.bazel
+++ b/beacon-chain/db/filesystem/BUILD.bazel
@@ -38,6 +38,7 @@ go_library(
         "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
         "@com_github_sirupsen_logrus//:go_default_library",
         "@com_github_spf13_afero//:go_default_library",
+        "@org_golang_x_sync//errgroup:go_default_library",
     ],
 )
diff --git a/beacon-chain/db/filesystem/data_column.go b/beacon-chain/db/filesystem/data_column.go
index 20e2246f21..52a3c7858a 100644
--- a/beacon-chain/db/filesystem/data_column.go
+++ b/beacon-chain/db/filesystem/data_column.go
@@ -25,6 +25,7 @@ import (
     "github.com/OffchainLabs/prysm/v7/time/slots"
     "github.com/pkg/errors"
     "github.com/spf13/afero"
+    "golang.org/x/sync/errgroup"
 )

 const (
@@ -185,73 +186,162 @@ func (dcs *DataColumnStorage) WarmCache() {

     highestStoredEpoch := primitives.Epoch(0)

-    // Walk the data column filesystem to warm up the cache.
-    if err := afero.Walk(dcs.fs, ".", func(path string, info os.FileInfo, fileErr error) (err error) {
-        if fileErr != nil {
-            return fileErr
-        }
-
-        // If not a leaf, skip.
-        if info.IsDir() {
-            return nil
-        }
-
-        // Extract metadata from the file path.
-        fileMetadata, err := extractFileMetadata(path)
-        if err != nil {
-            log.WithError(err).Error("Error encountered while extracting file metadata")
-            return nil
-        }
-
-        // Open the data column filesystem file.
-        f, err := dcs.fs.Open(path)
-        if err != nil {
-            log.WithError(err).Error("Error encountered while opening data column filesystem file")
-            return nil
-        }
-
-        // Close the file.
-        defer func() {
-            // Overwrite the existing error only if it is nil, since the close error is less important.
-            closeErr := f.Close()
-            if closeErr != nil && err == nil {
-                err = closeErr
-            }
-        }()
-
-        // Read the metadata of the file.
-        metadata, err := dcs.metadata(f)
-        if err != nil {
-            log.WithError(err).Error("Error encountered while reading metadata from data column filesystem file")
-            return nil
-        }
-
-        // Check the indices.
-        indices := metadata.indices.all()
-        if len(indices) == 0 {
-            return nil
-        }
-
-        // Build the ident.
-        dataColumnsIdent := DataColumnsIdent{Root: fileMetadata.blockRoot, Epoch: fileMetadata.epoch, Indices: indices}
-
-        // Update the highest stored epoch.
-        highestStoredEpoch = max(highestStoredEpoch, fileMetadata.epoch)
-
-        // Set the ident in the cache.
-        if err := dcs.cache.set(dataColumnsIdent); err != nil {
-            log.WithError(err).Error("Error encountered while ensuring data column filesystem cache")
-        }
-
-        return nil
-    }); err != nil {
-        log.WithError(err).Error("Error encountered while walking data column filesystem.")
+    // List all period directories
+    periodFileInfos, err := afero.ReadDir(dcs.fs, ".")
+    if err != nil {
+        log.WithError(err).Error("Error reading top directory during warm cache")
+        return
     }

-    // Prune the cache and the filesystem.
+    // Iterate through periods
+    for _, periodFileInfo := range periodFileInfos {
+        if !periodFileInfo.IsDir() {
+            continue
+        }
+
+        periodPath := periodFileInfo.Name()
+
+        // List all epoch directories in this period
+        epochFileInfos, err := afero.ReadDir(dcs.fs, periodPath)
+        if err != nil {
+            log.WithError(err).WithField("period", periodPath).Error("Error reading period directory during warm cache")
+            continue
+        }
+
+        // Iterate through epochs
+        for _, epochFileInfo := range epochFileInfos {
+            if !epochFileInfo.IsDir() {
+                continue
+            }
+
+            epochPath := path.Join(periodPath, epochFileInfo.Name())
+
+            // List all .sszs files in this epoch
+            files, err := listEpochFiles(dcs.fs, epochPath)
+            if err != nil {
+                log.WithError(err).WithField("epoch", epochPath).Error("Error listing epoch files during warm cache")
+                continue
+            }
+
+            if len(files) == 0 {
+                continue
+            }
+
+            // Process all files in this epoch in parallel
+            epochHighest, err := dcs.processEpochFiles(files)
+            if err != nil {
+                log.WithError(err).WithField("epoch", epochPath).Error("Error processing epoch files during warm cache")
+            }
+
+            highestStoredEpoch = max(highestStoredEpoch, epochHighest)
+        }
+    }
+
+    // Prune the cache and the filesystem
     dcs.prune()

-    log.WithField("elapsed", time.Since(start)).Info("Data column filesystem cache warm-up complete")
+    totalElapsed := time.Since(start)
+
+    // Log summary
+    log.WithField("elapsed", totalElapsed).Info("Data column filesystem cache warm-up complete")
+}
+
+// listEpochFiles lists all .sszs files in an epoch directory.
+func listEpochFiles(fs afero.Fs, epochPath string) ([]string, error) {
+    fileInfos, err := afero.ReadDir(fs, epochPath)
+    if err != nil {
+        return nil, errors.Wrap(err, "read epoch directory")
+    }
+
+    files := make([]string, 0, len(fileInfos))
+    for _, fileInfo := range fileInfos {
+        if fileInfo.IsDir() {
+            continue
+        }
+
+        fileName := fileInfo.Name()
+        if strings.HasSuffix(fileName, "."+dataColumnsFileExtension) {
+            files = append(files, path.Join(epochPath, fileName))
+        }
+    }
+
+    return files, nil
+}
+
+// processEpochFiles processes all .sszs files in an epoch directory in parallel.
+func (dcs *DataColumnStorage) processEpochFiles(files []string) (primitives.Epoch, error) {
+    var (
+        eg errgroup.Group
+        mu sync.Mutex
+    )
+
+    highestEpoch := primitives.Epoch(0)
+    for _, filePath := range files {
+        eg.Go(func() error {
+            epoch, err := dcs.processFile(filePath)
+            if err != nil {
+                log.WithError(err).WithField("file", filePath).Error("Error processing file during warm cache")
+                return nil
+            }
+
+            mu.Lock()
+            defer mu.Unlock()
+            highestEpoch = max(highestEpoch, epoch)
+
+            return nil
+        })
+    }
+
+    if err := eg.Wait(); err != nil {
+        return highestEpoch, err
+    }
+
+    return highestEpoch, nil
+}
+
+// processFile processes a single .sszs file.
+func (dcs *DataColumnStorage) processFile(filePath string) (primitives.Epoch, error) {
+    // Extract metadata from the file path
+    fileMetadata, err := extractFileMetadata(filePath)
+    if err != nil {
+        return 0, errors.Wrap(err, "extract file metadata")
+    }
+
+    // Open the file (each goroutine gets its own FD)
+    f, err := dcs.fs.Open(filePath)
+    if err != nil {
+        return 0, errors.Wrap(err, "open file")
+    }
+    defer func() {
+        if closeErr := f.Close(); closeErr != nil {
+            log.WithError(closeErr).WithField("file", filePath).Error("Error closing file during warm cache")
+        }
+    }()
+
+    // Read metadata
+    metadata, err := dcs.metadata(f)
+    if err != nil {
+        return 0, errors.Wrap(err, "read metadata")
+    }
+
+    // Extract indices
+    indices := metadata.indices.all()
+    if len(indices) == 0 {
+        return fileMetadata.epoch, nil // No indices, skip
+    }
+
+    // Build ident and set in cache (thread-safe)
+    dataColumnsIdent := DataColumnsIdent{
+        Root:    fileMetadata.blockRoot,
+        Epoch:   fileMetadata.epoch,
+        Indices: indices,
+    }
+
+    if err := dcs.cache.set(dataColumnsIdent); err != nil {
+        return 0, errors.Wrap(err, "cache set")
+    }
+
+    return fileMetadata.epoch, nil
 }

 // Summary returns the DataColumnStorageSummary.
diff --git a/changelog/manu-cache-warmup.md b/changelog/manu-cache-warmup.md
new file mode 100644
index 0000000000..c6cbfdb40a
--- /dev/null
+++ b/changelog/manu-cache-warmup.md
@@ -0,0 +1,3 @@
+### Changed
+
+- Data column sidecars cache warmup: Process all sidecars for a given epoch in parallel.
\ No newline at end of file
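
**Appendix: minimal sketch of the per-epoch fan-out pattern**

The snippet below is a self-contained sketch of the `errgroup`-plus-mutex pattern that `processEpochFiles` in the patch uses: one goroutine per file, per-file failures swallowed (the real code logs them) so a single bad file does not abort the epoch, and a mutex guarding the shared highest-epoch accumulator. It is not the PR's code: `processOneFile`, the `uint64` epoch type, and the file names in `main` are placeholders invented for illustration, and it assumes Go 1.22+ per-iteration loop variables, as the patch itself does.

```go
package main

import (
    "fmt"
    "strings"
    "sync"

    "golang.org/x/sync/errgroup"
)

// processOneFile stands in for the real per-file work (open the .sszs file,
// read its metadata, update the cache). Here it derives a fake "epoch" from
// the file name length just so the example runs.
func processOneFile(path string) (uint64, error) {
    return uint64(len(strings.TrimSuffix(path, ".sszs"))), nil
}

// processEpochFiles fans out one goroutine per file. A per-file failure is
// swallowed (the patch logs it instead) so one bad file does not abort the
// whole epoch; the mutex guards the shared highest-epoch accumulator.
func processEpochFiles(files []string) (uint64, error) {
    var (
        eg      errgroup.Group
        mu      sync.Mutex
        highest uint64
    )

    for _, filePath := range files {
        // Capturing filePath directly relies on Go 1.22+ per-iteration
        // loop variables, as the patch itself does.
        eg.Go(func() error {
            epoch, err := processOneFile(filePath)
            if err != nil {
                return nil // log-and-continue in the real code
            }

            mu.Lock()
            defer mu.Unlock()
            highest = max(highest, epoch)
            return nil
        })
    }

    // Wait blocks until every goroutine has returned.
    if err := eg.Wait(); err != nil {
        return highest, err
    }

    return highest, nil
}

func main() {
    highest, err := processEpochFiles([]string{"1.sszs", "22.sszs", "333.sszs"})
    fmt.Println(highest, err) // 3 <nil>
}
```

Because an epoch holds at most `SLOTS_PER_EPOCH` (currently 32) `.sszs` files, the sketch, like the patch, spawns one goroutine per file without bounding concurrency via `errgroup.(*Group).SetLimit`.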