mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-08 23:18:15 -05:00
Data columns cache warmup: Parallelize computation of all files for a given epoch. (#16207)
**What type of PR is this?** Other **What does this PR do? Why is it needed?** Before this PR, all `.sszs` files containing the data column sidecars were read and processed sequentially, taking some time. After this PR, every `.sszs` file of a given epoch (so, up to 32 files with the current `SLOTS_PER_EPOCH` value) is processed in parallel. **Which issue(s) does this PR fix?** - https://github.com/OffchainLabs/prysm/issues/16204 Tested on - [Netcup VPS 4000 G11](https://www.netcup.com/en/server/vps). **Before this PR (3 trials)**: ``` [2026-01-02 08:55:12.71] INFO filesystem: Data column filesystem cache warm-up complete elapsed=1m22.894007534s [2026-01-02 12:59:33.62] INFO filesystem: Data column filesystem cache warm-up complete elapsed=42.346732863s [2026-01-02 13:03:13.65] INFO filesystem: Data column filesystem cache warm-up complete elapsed=56.143565960s ``` **After this PR (3 trials)**: ``` [2026-01-02 12:50:07.53] INFO filesystem: Data column filesystem cache warm-up complete elapsed=2.019424193s [2026-01-02 12:52:01.34] INFO filesystem: Data column filesystem cache warm-up complete elapsed=1.960671225s [2026-01-02 12:53:34.66] INFO filesystem: Data column filesystem cache warm-up complete elapsed=2.549555363s ``` **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable).
This commit is contained in:
@@ -38,6 +38,7 @@ go_library(
|
|||||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||||
"@com_github_sirupsen_logrus//:go_default_library",
|
"@com_github_sirupsen_logrus//:go_default_library",
|
||||||
"@com_github_spf13_afero//:go_default_library",
|
"@com_github_spf13_afero//:go_default_library",
|
||||||
|
"@org_golang_x_sync//errgroup:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/OffchainLabs/prysm/v7/time/slots"
|
"github.com/OffchainLabs/prysm/v7/time/slots"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/spf13/afero"
|
"github.com/spf13/afero"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -185,73 +186,162 @@ func (dcs *DataColumnStorage) WarmCache() {
|
|||||||
|
|
||||||
highestStoredEpoch := primitives.Epoch(0)
|
highestStoredEpoch := primitives.Epoch(0)
|
||||||
|
|
||||||
// Walk the data column filesystem to warm up the cache.
|
// List all period directories
|
||||||
if err := afero.Walk(dcs.fs, ".", func(path string, info os.FileInfo, fileErr error) (err error) {
|
periodFileInfos, err := afero.ReadDir(dcs.fs, ".")
|
||||||
if fileErr != nil {
|
|
||||||
return fileErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// If not a leaf, skip.
|
|
||||||
if info.IsDir() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract metadata from the file path.
|
|
||||||
fileMetadata, err := extractFileMetadata(path)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error encountered while extracting file metadata")
|
log.WithError(err).Error("Error reading top directory during warm cache")
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open the data column filesystem file.
|
// Iterate through periods
|
||||||
f, err := dcs.fs.Open(path)
|
for _, periodFileInfo := range periodFileInfos {
|
||||||
|
if !periodFileInfo.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
periodPath := periodFileInfo.Name()
|
||||||
|
|
||||||
|
// List all epoch directories in this period
|
||||||
|
epochFileInfos, err := afero.ReadDir(dcs.fs, periodPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error encountered while opening data column filesystem file")
|
log.WithError(err).WithField("period", periodPath).Error("Error reading period directory during warm cache")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate through epochs
|
||||||
|
for _, epochFileInfo := range epochFileInfos {
|
||||||
|
if !epochFileInfo.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
epochPath := path.Join(periodPath, epochFileInfo.Name())
|
||||||
|
|
||||||
|
// List all .sszs files in this epoch
|
||||||
|
files, err := listEpochFiles(dcs.fs, epochPath)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).WithField("epoch", epochPath).Error("Error listing epoch files during warm cache")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(files) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process all files in this epoch in parallel
|
||||||
|
epochHighest, err := dcs.processEpochFiles(files)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).WithField("epoch", epochPath).Error("Error processing epoch files during warm cache")
|
||||||
|
}
|
||||||
|
|
||||||
|
highestStoredEpoch = max(highestStoredEpoch, epochHighest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prune the cache and the filesystem
|
||||||
|
dcs.prune()
|
||||||
|
|
||||||
|
totalElapsed := time.Since(start)
|
||||||
|
|
||||||
|
// Log summary
|
||||||
|
log.WithField("elapsed", totalElapsed).Info("Data column filesystem cache warm-up complete")
|
||||||
|
}
|
||||||
|
|
||||||
|
// listEpochFiles lists all .sszs files in an epoch directory.
|
||||||
|
func listEpochFiles(fs afero.Fs, epochPath string) ([]string, error) {
|
||||||
|
fileInfos, err := afero.ReadDir(fs, epochPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "read epoch directory")
|
||||||
|
}
|
||||||
|
|
||||||
|
files := make([]string, 0, len(fileInfos))
|
||||||
|
for _, fileInfo := range fileInfos {
|
||||||
|
if fileInfo.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fileName := fileInfo.Name()
|
||||||
|
if strings.HasSuffix(fileName, "."+dataColumnsFileExtension) {
|
||||||
|
files = append(files, path.Join(epochPath, fileName))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return files, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processEpochFiles processes all .sszs files in an epoch directory in parallel.
|
||||||
|
func (dcs *DataColumnStorage) processEpochFiles(files []string) (primitives.Epoch, error) {
|
||||||
|
var (
|
||||||
|
eg errgroup.Group
|
||||||
|
mu sync.Mutex
|
||||||
|
)
|
||||||
|
|
||||||
|
highestEpoch := primitives.Epoch(0)
|
||||||
|
for _, filePath := range files {
|
||||||
|
eg.Go(func() error {
|
||||||
|
epoch, err := dcs.processFile(filePath)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).WithField("file", filePath).Error("Error processing file during warm cache")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the file.
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
highestEpoch = max(highestEpoch, epoch)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return highestEpoch, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return highestEpoch, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processFile processes a single .sszs file.
|
||||||
|
func (dcs *DataColumnStorage) processFile(filePath string) (primitives.Epoch, error) {
|
||||||
|
// Extract metadata from the file path
|
||||||
|
fileMetadata, err := extractFileMetadata(filePath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, errors.Wrap(err, "extract file metadata")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open the file (each goroutine gets its own FD)
|
||||||
|
f, err := dcs.fs.Open(filePath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, errors.Wrap(err, "open file")
|
||||||
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
// Overwrite the existing error only if it is nil, since the close error is less important.
|
if closeErr := f.Close(); closeErr != nil {
|
||||||
closeErr := f.Close()
|
log.WithError(closeErr).WithField("file", filePath).Error("Error closing file during warm cache")
|
||||||
if closeErr != nil && err == nil {
|
|
||||||
err = closeErr
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Read the metadata of the file.
|
// Read metadata
|
||||||
metadata, err := dcs.metadata(f)
|
metadata, err := dcs.metadata(f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Error encountered while reading metadata from data column filesystem file")
|
return 0, errors.Wrap(err, "read metadata")
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the indices.
|
// Extract indices
|
||||||
indices := metadata.indices.all()
|
indices := metadata.indices.all()
|
||||||
if len(indices) == 0 {
|
if len(indices) == 0 {
|
||||||
return nil
|
return fileMetadata.epoch, nil // No indices, skip
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the ident.
|
// Build ident and set in cache (thread-safe)
|
||||||
dataColumnsIdent := DataColumnsIdent{Root: fileMetadata.blockRoot, Epoch: fileMetadata.epoch, Indices: indices}
|
dataColumnsIdent := DataColumnsIdent{
|
||||||
|
Root: fileMetadata.blockRoot,
|
||||||
|
Epoch: fileMetadata.epoch,
|
||||||
|
Indices: indices,
|
||||||
|
}
|
||||||
|
|
||||||
// Update the highest stored epoch.
|
|
||||||
highestStoredEpoch = max(highestStoredEpoch, fileMetadata.epoch)
|
|
||||||
|
|
||||||
// Set the ident in the cache.
|
|
||||||
if err := dcs.cache.set(dataColumnsIdent); err != nil {
|
if err := dcs.cache.set(dataColumnsIdent); err != nil {
|
||||||
log.WithError(err).Error("Error encountered while ensuring data column filesystem cache")
|
return 0, errors.Wrap(err, "cache set")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return fileMetadata.epoch, nil
|
||||||
}); err != nil {
|
|
||||||
log.WithError(err).Error("Error encountered while walking data column filesystem.")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prune the cache and the filesystem.
|
|
||||||
dcs.prune()
|
|
||||||
|
|
||||||
log.WithField("elapsed", time.Since(start)).Info("Data column filesystem cache warm-up complete")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Summary returns the DataColumnStorageSummary.
|
// Summary returns the DataColumnStorageSummary.
|
||||||
|
|||||||
3
changelog/manu-cache-warmup.md
Normal file
3
changelog/manu-cache-warmup.md
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
### Changed
|
||||||
|
|
||||||
|
- Data column sidecars cache warmup: Process in parallel all sidecars for a given epoch.
|
||||||
Reference in New Issue
Block a user