Compare commits

...

14 Commits

Author SHA1 Message Date
Kasey Kirkham
7d11b27c85 mock fixes 2024-04-24 23:58:14 -05:00
Kasey Kirkham
ac94da8706 pruning tests 2024-04-24 23:26:04 -05:00
Kasey Kirkham
6df85c72a6 naming 2024-04-24 19:28:22 -05:00
Kasey Kirkham
442a28c2f9 handle errors identifying blobs 2024-04-24 14:53:24 -05:00
Kasey Kirkham
43cb6919ea assume blocking cache warm and other cleanups 2024-04-24 13:39:17 -05:00
Kasey Kirkham
9ed237faec improve naming, implement pruning for new layout 2024-04-24 09:25:58 -05:00
Kasey Kirkham
7635a4654c tests and bugfixes 2024-04-24 08:45:41 -05:00
Kasey Kirkham
9062c9c05e updating migration tests 2024-04-23 11:40:31 -05:00
Kasey Kirkham
d132d74b6b wip 2024-04-23 07:56:47 -05:00
Kasey Kirkham
1c4dd7e21c aggressive refactoring in progress 2024-04-19 17:49:25 -05:00
Kasey Kirkham
546d8a7f00 wip 2024-04-18 17:58:37 -05:00
Kasey Kirkham
a4d54488c7 start populating slot in blob namer 2024-04-17 20:45:06 -05:00
Kasey Kirkham
dd4cb07455 new blob storage scheme avoiding large base dir 2024-04-17 14:46:12 -05:00
Kasey Kirkham
4179582a72 use [32]byte keys in the filesystem cache 2024-04-17 14:32:51 -05:00
26 changed files with 1671 additions and 827 deletions

View File

@@ -2115,7 +2115,7 @@ func TestMissingIndices(t *testing.T) {
for _, c := range cases {
bm, bs := filesystem.NewEphemeralBlobStorageWithMocker(t)
t.Run(c.name, func(t *testing.T) {
require.NoError(t, bm.CreateFakeIndices(c.root, c.present...))
require.NoError(t, bm.CreateFakeIndices(c.root, 0, c.present...))
missing, err := missingIndices(bs, c.root, c.expected)
if c.err != nil {
require.ErrorIs(t, err, c.err)

View File

@@ -94,14 +94,7 @@ func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current pri
entry := s.cache.ensure(key)
defer s.cache.delete(key)
root := b.Root()
sumz, err := s.store.WaitForSummarizer(ctx)
if err != nil {
log.WithField("root", fmt.Sprintf("%#x", b.Root())).
WithError(err).
Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable")
} else {
entry.setDiskSummary(sumz.Summary(root))
}
entry.setDiskSummary(s.store.Summary(root))
// Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
// We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather

View File

@@ -5,6 +5,8 @@ go_library(
srcs = [
"blob.go",
"cache.go",
"iteration.go",
"layout.go",
"log.go",
"metrics.go",
"mock.go",
@@ -13,15 +15,17 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/db:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/logging:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
@@ -35,17 +39,24 @@ go_test(
srcs = [
"blob_test.go",
"cache_test.go",
"iteration_test.go",
"layout_test.go",
"migration_test.go",
"pruner_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/db:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time/slots:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_spf13_afero//:go_default_library",
],

View File

@@ -1,13 +1,9 @@
package filesystem
import (
"context"
"fmt"
"math"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
@@ -16,7 +12,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/io/file"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/logging"
"github.com/sirupsen/logrus"
"github.com/spf13/afero"
@@ -29,12 +24,7 @@ var (
errNoBasePath = errors.New("BlobStorage base path not specified in init")
)
const (
sszExt = "ssz"
partExt = "part"
directoryPermissions = 0700
)
const directoryPermissions = 0700
// BlobStorageOption is a functional option for configuring a BlobStorage.
type BlobStorageOption func(*BlobStorage) error
@@ -63,6 +53,13 @@ func WithSaveFsync(fsync bool) BlobStorageOption {
}
}
func WithFs(fs afero.Fs) BlobStorageOption {
return func(b *BlobStorage) error {
b.fs = fs
return nil
}
}
// NewBlobStorage creates a new instance of the BlobStorage object. Note that the implementation of BlobStorage may
// attempt to hold a file lock to guarantee exclusive control of the blob storage directory, so this should only be
// initialized once per beacon node.
@@ -73,19 +70,24 @@ func NewBlobStorage(opts ...BlobStorageOption) (*BlobStorage, error) {
return nil, errors.Wrap(err, "failed to create blob storage")
}
}
if b.base == "" {
return nil, errNoBasePath
// Allow tests to set up a different fs using WithFs.
if b.fs == nil {
if b.base == "" {
return nil, errNoBasePath
}
b.base = path.Clean(b.base)
if err := file.MkdirAll(b.base); err != nil {
return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base)
}
b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base)
}
b.base = path.Clean(b.base)
if err := file.MkdirAll(b.base); err != nil {
return nil, errors.Wrapf(err, "failed to create blob storage at %s", b.base)
}
b.fs = afero.NewBasePathFs(afero.NewOsFs(), b.base)
pruner, err := newBlobPruner(b.fs, b.retentionEpochs)
b.cache = newBlobStorageCache()
pruner := newBlobPruner(b.retentionEpochs)
layout, err := newPeriodicEpochLayout(b.fs, b.cache, pruner)
if err != nil {
return nil, err
}
b.pruner = pruner
b.layout = layout
return b, nil
}
@@ -96,43 +98,29 @@ type BlobStorage struct {
fsync bool
fs afero.Fs
pruner *blobPruner
layout runtimeLayout
cache *blobStorageCache
}
// WarmCache runs the prune routine with an expiration of slot of 0, so nothing will be pruned, but the pruner's cache
// will be populated at node startup, avoiding a costly cold prune (~4s in syscalls) during syncing.
func (bs *BlobStorage) WarmCache() {
if bs.pruner == nil {
return
start := time.Now()
if err := warmCache(bs.layout, bs.cache); err != nil {
log.WithError(err).Error("Error encountered while warming up blob filesystem cache.")
}
go func() {
start := time.Now()
if err := bs.pruner.warmCache(); err != nil {
log.WithError(err).Error("Error encountered while warming up blob pruner cache")
}
log.WithField("elapsed", time.Since(start)).Info("Blob filesystem cache warm-up complete.")
}()
}
// ErrBlobStorageSummarizerUnavailable is a sentinel error returned when there is no pruner/cache available.
// This should be used by code that optionally uses the summarizer to optimize rpc requests. Being able to
// fallback when there is no summarizer allows client code to avoid test complexity where the summarizer doesn't matter.
var ErrBlobStorageSummarizerUnavailable = errors.New("BlobStorage not initialized with a pruner or cache")
// WaitForSummarizer blocks until the BlobStorageSummarizer is ready to use.
// BlobStorageSummarizer is not ready immediately on node startup because it needs to sample the blob filesystem to
// determine which blobs are available.
func (bs *BlobStorage) WaitForSummarizer(ctx context.Context) (BlobStorageSummarizer, error) {
if bs == nil || bs.pruner == nil {
return nil, ErrBlobStorageSummarizerUnavailable
log.WithField("elapsed", time.Since(start)).Info("Blob filesystem cache warm-up complete.")
from := &flatRootLayout{fs: bs.fs}
if err := migrateLayout(bs.fs, from, bs.layout, bs.cache); err != nil {
log.WithError(err).Error("Error encountered while migrating legacy blob storage scheme.")
}
return bs.pruner.waitForCache(ctx)
}
// Save saves blobs given a list of sidecars.
func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
startTime := time.Now()
fname := namerForSidecar(sidecar)
sszPath := fname.path()
ident := identForSidecar(sidecar)
sszPath := bs.layout.sszPath(ident)
exists, err := afero.Exists(bs.fs, sszPath)
if err != nil {
return err
@@ -141,10 +129,9 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
log.WithFields(logging.BlobFields(sidecar.ROBlob)).Debug("Ignoring a duplicate blob sidecar save attempt")
return nil
}
if bs.pruner != nil {
if err := bs.pruner.notify(sidecar.BlockRoot(), sidecar.Slot(), sidecar.Index); err != nil {
return errors.Wrapf(err, "problem maintaining pruning cache/metrics for sidecar with root=%#x", sidecar.BlockRoot())
}
if err := bs.layout.notify(ident); err != nil {
return errors.Wrapf(err, "problem maintaining pruning cache/metrics for sidecar with root=%#x", sidecar.BlockRoot())
}
// Serialize the ethpb.BlobSidecar to binary data using SSZ.
@@ -155,10 +142,10 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
return errSidecarEmptySSZData
}
if err := bs.fs.MkdirAll(fname.dir(), directoryPermissions); err != nil {
if err := bs.fs.MkdirAll(bs.layout.dir(ident), directoryPermissions); err != nil {
return err
}
partPath := fname.partPath(fmt.Sprintf("%p", sidecarData))
partPath := bs.layout.partPath(ident, fmt.Sprintf("%p", sidecarData))
partialMoved := false
// Ensure the partial file is deleted.
@@ -223,67 +210,37 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
// value is always a VerifiedROBlob.
func (bs *BlobStorage) Get(root [32]byte, idx uint64) (blocks.VerifiedROBlob, error) {
startTime := time.Now()
expected := blobNamer{root: root, index: idx}
encoded, err := afero.ReadFile(bs.fs, expected.path())
var v blocks.VerifiedROBlob
ident, err := bs.layout.ident(root, idx)
if err != nil {
return v, err
}
s := &ethpb.BlobSidecar{}
if err := s.UnmarshalSSZ(encoded); err != nil {
return v, err
}
ro, err := blocks.NewROBlobWithRoot(s, root)
if err != nil {
return blocks.VerifiedROBlob{}, err
return verification.VerifiedROBlobError(err)
}
defer func() {
blobFetchLatency.Observe(float64(time.Since(startTime).Milliseconds()))
}()
return verification.BlobSidecarNoop(ro)
return verification.VerifiedROBlobFromDisk(bs.fs, root, bs.layout.sszPath(ident))
}
// Remove removes all blobs for a given root.
func (bs *BlobStorage) Remove(root [32]byte) error {
rootDir := blobNamer{root: root}.dir()
return bs.fs.RemoveAll(rootDir)
dirIdent, err := bs.layout.dirIdent(root)
if err != nil {
return err
}
_, err = bs.layout.remove(dirIdent)
return err
}
// Indices generates a bitmap representing which BlobSidecar.Index values are present on disk for a given root.
// This value can be compared to the commitments observed in a block to determine which indices need to be found
// on the network to confirm data availability.
func (bs *BlobStorage) Indices(root [32]byte) ([fieldparams.MaxBlobsPerBlock]bool, error) {
var mask [fieldparams.MaxBlobsPerBlock]bool
rootDir := blobNamer{root: root}.dir()
entries, err := afero.ReadDir(bs.fs, rootDir)
if err != nil {
if os.IsNotExist(err) {
return mask, nil
}
return mask, err
}
for i := range entries {
if entries[i].IsDir() {
continue
}
name := entries[i].Name()
if !strings.HasSuffix(name, sszExt) {
continue
}
parts := strings.Split(name, ".")
if len(parts) != 2 {
continue
}
u, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return mask, errors.Wrapf(err, "unexpected directory entry breaks listing, %s", parts[0])
}
if u >= fieldparams.MaxBlobsPerBlock {
return mask, errIndexOutOfBounds
}
mask[u] = true
}
return mask, nil
return bs.Summary(root).mask, nil
}
// Summary returns the BlobStorageSummary from the layout.
// Internally, this is a cached representation of the directory listing for the given root.
func (bs *BlobStorage) Summary(root [32]byte) BlobStorageSummary {
return bs.layout.summary(root)
}
// Clear deletes all files on the filesystem.
@@ -308,28 +265,3 @@ func (bs *BlobStorage) WithinRetentionPeriod(requested, current primitives.Epoch
}
return requested+bs.retentionEpochs >= current
}
type blobNamer struct {
root [32]byte
index uint64
}
func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
return blobNamer{root: sc.BlockRoot(), index: sc.Index}
}
func (p blobNamer) dir() string {
return rootString(p.root)
}
func (p blobNamer) partPath(entropy string) string {
return path.Join(p.dir(), fmt.Sprintf("%s-%d.%s", entropy, p.index, partExt))
}
func (p blobNamer) path() string {
return path.Join(p.dir(), fmt.Sprintf("%d.%s", p.index, sszExt))
}
func rootString(root [32]byte) string {
return fmt.Sprintf("%#x", root)
}

View File

@@ -9,26 +9,26 @@ import (
"testing"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/spf13/afero"
)
func TestBlobStorage_SaveBlobData(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, fieldparams.MaxBlobsPerBlock)
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
testSidecars := verification.FakeVerifySliceForTest(t, sidecars)
t.Run("no error for duplicate", func(t *testing.T) {
fs, bs := NewEphemeralBlobStorageWithFs(t)
fs, bs := NewEphemeralBlobStorageAndFs(t)
existingSidecar := testSidecars[0]
blobPath := namerForSidecar(existingSidecar).path()
blobPath := bs.layout.sszPath(identForSidecar(existingSidecar))
// Serialize the existing BlobSidecar to binary data.
existingSidecarData, err := ssz.MarshalSSZ(existingSidecar)
require.NoError(t, err)
@@ -85,7 +85,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
require.NoError(t, bs.Remove(expected.BlockRoot()))
_, err = bs.Get(expected.BlockRoot(), expected.Index)
require.ErrorContains(t, "file does not exist", err)
require.Equal(t, true, db.IsNotFound(err))
})
t.Run("clear", func(t *testing.T) {
@@ -126,14 +126,13 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
})
}
// pollUntil polls a condition function until it returns true or a timeout is reached.
func TestBlobIndicesBounds(t *testing.T) {
fs, bs := NewEphemeralBlobStorageWithFs(t)
fs := afero.NewMemMapFs()
root := [32]byte{}
okIdx := uint64(fieldparams.MaxBlobsPerBlock - 1)
writeFakeSSZ(t, fs, root, okIdx)
writeFakeSSZ(t, fs, root, 0, okIdx)
bs := NewEphemeralBlobStorageUsingFs(t, fs)
indices, err := bs.Indices(root)
require.NoError(t, err)
var expected [fieldparams.MaxBlobsPerBlock]bool
@@ -143,24 +142,27 @@ func TestBlobIndicesBounds(t *testing.T) {
}
oobIdx := uint64(fieldparams.MaxBlobsPerBlock)
writeFakeSSZ(t, fs, root, oobIdx)
_, err = bs.Indices(root)
require.ErrorIs(t, err, errIndexOutOfBounds)
writeFakeSSZ(t, fs, root, 0, oobIdx)
// This now fails at cache warmup time.
require.ErrorIs(t, err, warmCache(bs.layout, bs.cache))
}
func writeFakeSSZ(t *testing.T, fs afero.Fs, root [32]byte, idx uint64) {
namer := blobNamer{root: root, index: idx}
require.NoError(t, fs.MkdirAll(namer.dir(), 0700))
fh, err := fs.Create(namer.path())
func writeFakeSSZ(t *testing.T, fs afero.Fs, root [32]byte, slot primitives.Slot, idx uint64) {
epoch := slots.ToEpoch(slot)
namer := newBlobIdent(root, epoch, idx)
layout := periodicEpochLayout{}
require.NoError(t, fs.MkdirAll(layout.dir(namer), 0700))
fh, err := fs.Create(layout.sszPath(namer))
require.NoError(t, err)
_, err = fh.Write([]byte("derp"))
require.NoError(t, err)
require.NoError(t, fh.Close())
}
/*
func TestBlobStoragePrune(t *testing.T) {
currentSlot := primitives.Slot(200000)
fs, bs := NewEphemeralBlobStorageWithFs(t)
fs, bs := NewEphemeralBlobStorageAndFs(t)
t.Run("PruneOne", func(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 300, fieldparams.MaxBlobsPerBlock)
@@ -170,10 +172,15 @@ func TestBlobStoragePrune(t *testing.T) {
for _, sidecar := range testSidecars {
require.NoError(t, bs.Save(sidecar))
}
ident := identForSidecar(testSidecars[0])
beforeFolders, err := afero.ReadDir(fs, ident.groupDir())
require.NoError(t, err)
require.Equal(t, 1, len(beforeFolders))
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
remainingFolders, err := afero.ReadDir(fs, ".")
remainingFolders, err := afero.ReadDir(fs, ident.groupDir())
require.NoError(t, err)
require.Equal(t, 0, len(remainingFolders))
})
@@ -181,6 +188,7 @@ func TestBlobStoragePrune(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 299, fieldparams.MaxBlobsPerBlock)
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
ident := identForSidecar(testSidecars[0])
for _, sidecar := range testSidecars[4:] {
require.NoError(t, bs.Save(sidecar))
@@ -188,56 +196,46 @@ func TestBlobStoragePrune(t *testing.T) {
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
remainingFolders, err := afero.ReadDir(fs, ".")
remainingFolders, err := afero.ReadDir(fs, ident.groupDir())
require.NoError(t, err)
require.Equal(t, 0, len(remainingFolders))
})
t.Run("PruneMany", func(t *testing.T) {
blockQty := 10
slot := primitives.Slot(1)
for j := 0; j <= blockQty; j++ {
root := bytesutil.ToBytes32(bytesutil.ToBytes(uint64(slot), 32))
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, root, slot, fieldparams.MaxBlobsPerBlock)
pruneBefore := currentSlot - bs.pruner.windowSize
increment := primitives.Slot(10000)
slots := []primitives.Slot{
pruneBefore - increment,
pruneBefore - (2 * increment),
pruneBefore,
pruneBefore + increment,
pruneBefore + (2 * increment),
}
namers := make([]blobIdent, len(slots))
for i, s := range slots {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, s, 1)
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
require.NoError(t, bs.Save(testSidecars[0]))
slot += 10000
namers[i] = identForSidecar(testSidecars[0])
}
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
remainingFolders, err := afero.ReadDir(fs, ".")
require.NoError(t, err)
require.Equal(t, 4, len(remainingFolders))
// first 2 subdirs should be removed
for _, nmr := range namers[0:2] {
entries, err := listDir(fs, nmr.dir())
require.Equal(t, 0, len(entries))
require.ErrorIs(t, err, os.ErrNotExist)
}
// the rest should still be there
for _, nmr := range namers[2:] {
entries, err := listDir(fs, nmr.dir())
require.NoError(t, err)
require.Equal(t, 1, len(entries))
}
})
}
func BenchmarkPruning(b *testing.B) {
var t *testing.T
_, bs := NewEphemeralBlobStorageWithFs(t)
blockQty := 10000
currentSlot := primitives.Slot(150000)
slot := primitives.Slot(0)
for j := 0; j <= blockQty; j++ {
root := bytesutil.ToBytes32(bytesutil.ToBytes(uint64(slot), 32))
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, root, slot, fieldparams.MaxBlobsPerBlock)
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
require.NoError(t, bs.Save(testSidecars[0]))
slot += 100
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := bs.pruner.prune(currentSlot)
require.NoError(b, err)
}
}
*/
func TestNewBlobStorage(t *testing.T) {
_, err := NewBlobStorage()

View File

@@ -3,18 +3,20 @@ package filesystem
import (
"sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)
const bytesPerSidecar = 131928
// blobIndexMask is a bitmask representing the set of blob indices that are currently set.
type blobIndexMask [fieldparams.MaxBlobsPerBlock]bool
// BlobStorageSummary represents cached information about the BlobSidecars on disk for each root the cache knows about.
type BlobStorageSummary struct {
slot primitives.Slot
mask blobIndexMask
epoch primitives.Epoch
mask blobIndexMask
}
// HasIndex returns true if the BlobSidecar at the given index is available in the filesystem.
@@ -48,34 +50,33 @@ type BlobStorageSummarizer interface {
type blobStorageCache struct {
mu sync.RWMutex
nBlobs float64
cache map[string]BlobStorageSummary
cache map[[32]byte]BlobStorageSummary
}
var _ BlobStorageSummarizer = &blobStorageCache{}
func newBlobStorageCache() *blobStorageCache {
return &blobStorageCache{
cache: make(map[string]BlobStorageSummary, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest*fieldparams.SlotsPerEpoch),
cache: make(map[[32]byte]BlobStorageSummary),
}
}
// Summary returns the BlobStorageSummary for `root`. The BlobStorageSummary can be used to check for the presence of
// BlobSidecars based on Index.
func (s *blobStorageCache) Summary(root [32]byte) BlobStorageSummary {
k := rootString(root)
s.mu.RLock()
defer s.mu.RUnlock()
return s.cache[k]
return s.cache[root]
}
func (s *blobStorageCache) ensure(key string, slot primitives.Slot, idx uint64) error {
func (s *blobStorageCache) ensure(key [32]byte, epoch primitives.Epoch, idx uint64) error {
if idx >= fieldparams.MaxBlobsPerBlock {
return errIndexOutOfBounds
}
s.mu.Lock()
defer s.mu.Unlock()
v := s.cache[key]
v.slot = slot
v.epoch = epoch
if !v.mask[idx] {
s.updateMetrics(1)
}
@@ -84,18 +85,48 @@ func (s *blobStorageCache) ensure(key string, slot primitives.Slot, idx uint64)
return nil
}
func (s *blobStorageCache) slot(key string) (primitives.Slot, bool) {
func (s *blobStorageCache) epoch(key [32]byte) (primitives.Epoch, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.cache[key]
if !ok {
return 0, false
}
return v.slot, ok
return v.epoch, ok
}
func (s *blobStorageCache) evict(key string) {
var deleted float64
func (s *blobStorageCache) get(key [32]byte) (BlobStorageSummary, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.cache[key]
return v, ok
}
func (s *blobStorageCache) identForIdx(key [32]byte, idx uint64) (blobIdent, error) {
v, ok := s.get(key)
if !ok || !v.HasIndex(idx) {
return blobIdent{}, db.ErrNotFound
}
return blobIdent{
root: key,
index: idx,
epoch: v.epoch,
}, nil
}
func (s *blobStorageCache) identForRoot(key [32]byte) (blobIdent, error) {
v, ok := s.get(key)
if !ok {
return blobIdent{}, db.ErrNotFound
}
return blobIdent{
root: key,
epoch: v.epoch,
}, nil
}
func (s *blobStorageCache) evict(key [32]byte) int {
deleted := 0
s.mu.Lock()
v, ok := s.cache[key]
if ok {
@@ -108,8 +139,9 @@ func (s *blobStorageCache) evict(key string) {
delete(s.cache, key)
s.mu.Unlock()
if deleted > 0 {
s.updateMetrics(-deleted)
s.updateMetrics(-float64(deleted))
}
return deleted
}
func (s *blobStorageCache) updateMetrics(delta float64) {

View File

@@ -48,8 +48,8 @@ func TestSlotByRoot_Summary(t *testing.T) {
sc := newBlobStorageCache()
for _, c := range cases {
if c.expected != nil {
key := rootString(bytesutil.ToBytes32([]byte(c.name)))
sc.cache[key] = BlobStorageSummary{slot: 0, mask: *c.expected}
key := bytesutil.ToBytes32([]byte(c.name))
sc.cache[key] = BlobStorageSummary{epoch: 0, mask: *c.expected}
}
}
for _, c := range cases {

View File

@@ -0,0 +1,328 @@
package filesystem
import (
"encoding/binary"
"fmt"
"io"
"path"
"path/filepath"
"strconv"
"strings"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"github.com/spf13/afero"
)
var errIdentFailure = errors.New("failed to determine blob metadata, ignoring all sub-path.")
type identificationError struct {
err error
path string
ident blobIdent
}
func (ide *identificationError) Error() string {
return fmt.Sprintf("%s path=%s, err=%s", errIdentFailure.Error(), ide.path, ide.err.Error())
}
func (ide *identificationError) Unwrap() error {
return ide.err
}
func (ide *identificationError) Is(err error) bool {
return err == errIdentFailure
}
func (ide *identificationError) LogFields() logrus.Fields {
fields := ide.ident.logFields()
fields["path"] = ide.path
return fields
}
func newIdentificationError(path string, ident blobIdent, err error) *identificationError {
return &identificationError{path: path, ident: ident, err: err}
}
func listDir(fs afero.Fs, dir string) ([]string, error) {
top, err := fs.Open(dir)
if err != nil {
return nil, errors.Wrap(err, "failed to open directory descriptor")
}
defer func() {
if err := top.Close(); err != nil {
log.WithError(err).Errorf("Could not close file %s", dir)
}
}()
// re the -1 param: "If n <= 0, Readdirnames returns all the names from the directory in a single slice"
dirs, err := top.Readdirnames(-1)
if err != nil {
return nil, errors.Wrap(err, "failed to read directory listing")
}
return dirs, nil
}
type layoutLevel struct {
populateIdent identPopulator
filter func(string) bool
}
type identPopulator func(blobIdent, string) (blobIdent, error)
type identIterator struct {
fs afero.Fs
path string
child *identIterator
ident blobIdent
levels []layoutLevel
entries []string
offset int
}
func (iter *identIterator) next() (blobIdent, error) {
if iter.child != nil {
next, err := iter.child.next()
if err == nil {
return next, nil
}
if err != io.EOF {
return blobIdent{}, err
}
}
return iter.advanceChild()
}
func (iter *identIterator) advanceChild() (blobIdent, error) {
defer func() {
iter.offset += 1
}()
for i := iter.offset; i < len(iter.entries); i++ {
iter.offset = i
nextPath := filepath.Join(iter.path, iter.entries[iter.offset])
nextLevel := iter.levels[0]
if !nextLevel.filter(nextPath) {
continue
}
ident, err := nextLevel.populateIdent(iter.ident, nextPath)
if err != nil {
return ident, newIdentificationError(nextPath, ident, err)
}
// if we're at the leaf level, we can return the updated ident.
if len(iter.levels) == 1 {
return ident, nil
}
entries, err := listDir(iter.fs, nextPath)
if err != nil {
return blobIdent{}, err
}
if len(entries) == 0 {
continue
}
iter.child = &identIterator{
fs: iter.fs,
path: nextPath,
ident: ident,
levels: iter.levels[1:],
entries: entries,
}
return iter.child.next()
}
return blobIdent{}, io.EOF
}
func populateNoop(namer blobIdent, dir string) (blobIdent, error) {
return namer, nil
}
func populateEpoch(namer blobIdent, dir string) (blobIdent, error) {
epoch, err := epochFromPath(dir)
if err != nil {
return namer, err
}
namer.epoch = epoch
return namer, nil
}
func populateRoot(namer blobIdent, dir string) (blobIdent, error) {
root, err := rootFromPath(dir)
if err != nil {
return namer, err
}
namer.root = root
return namer, nil
}
func populateIndex(namer blobIdent, fname string) (blobIdent, error) {
idx, err := idxFromPath(fname)
if err != nil {
return namer, err
}
namer.index = idx
return namer, nil
}
type readSlotOncePerRoot struct {
fs afero.Fs
lastRoot [32]byte
epoch primitives.Epoch
}
func (l *readSlotOncePerRoot) populateIdent(ident blobIdent, fname string) (blobIdent, error) {
ident, err := populateIndex(ident, fname)
if err != nil {
return ident, err
}
if ident.root != l.lastRoot {
slot, err := slotFromFile(fname, l.fs)
if err != nil {
return ident, err
}
l.lastRoot = ident.root
l.epoch = slots.ToEpoch(slot)
}
ident.epoch = l.epoch
return ident, nil
}
func epochFromPath(p string) (primitives.Epoch, error) {
subdir := filepath.Base(p)
epoch, err := strconv.ParseUint(subdir, 10, 64)
if err != nil {
return 0, errors.Wrapf(errInvalidDirectoryLayout,
"failed to decode epoch as uint, err=%s, dir=%s", err.Error(), p)
}
return primitives.Epoch(epoch), nil
}
func periodFromPath(p string) (uint64, error) {
subdir := filepath.Base(p)
period, err := strconv.ParseUint(subdir, 10, 64)
if err != nil {
return 0, errors.Wrapf(errInvalidDirectoryLayout,
"failed to decode period from path as uint, err=%s, dir=%s", err.Error(), p)
}
return period, nil
}
func rootFromPath(p string) ([32]byte, error) {
subdir := filepath.Base(p)
root, err := stringToRoot(subdir)
if err != nil {
return root, errors.Wrapf(err, "invalid directory, could not parse subdir as root %s", p)
}
return root, nil
}
func idxFromPath(p string) (uint64, error) {
p = path.Base(p)
if !isSszFile(p) {
return 0, errors.Wrap(errNotBlobSSZ, "does not have .ssz extension")
}
parts := strings.Split(p, ".")
if len(parts) != 2 {
return 0, errors.Wrap(errNotBlobSSZ, "unexpected filename structure (want <index>.ssz)")
}
idx, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return 0, err
}
if idx >= fieldparams.MaxBlobsPerBlock {
return 0, errors.Wrapf(errIndexOutOfBounds, "index=%d", idx)
}
return idx, nil
}
// Read slot from marshaled BlobSidecar data in the given file. See slotFromBlob for details.
func slotFromFile(name string, fs afero.Fs) (primitives.Slot, error) {
f, err := fs.Open(name)
if err != nil {
return 0, err
}
defer func() {
if err := f.Close(); err != nil {
log.WithError(err).Errorf("Could not close blob file")
}
}()
return slotFromBlob(f)
}
// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes),
// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields
// preceding the slot information within SignedBeaconBlockHeader.
func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
b := make([]byte, 8)
_, err := at.ReadAt(b, 131176)
if err != nil {
return 0, err
}
rawSlot := binary.LittleEndian.Uint64(b)
return primitives.Slot(rawSlot), nil
}
func filterNoop(_ string) bool {
return true
}
func isRootDir(p string) bool {
dir := filepath.Base(p)
return len(dir) == rootStringLen && strings.HasPrefix(dir, "0x")
}
func isSszFile(s string) bool {
return filepath.Ext(s) == "."+sszExt
}
func isBeforeEpoch(before primitives.Epoch) func(string) bool {
if before == 0 {
return filterNoop
}
return func(p string) bool {
epoch, err := epochFromPath(p)
if err != nil {
return false
}
return epoch < before
}
}
func isBeforePeriod(before primitives.Epoch) func(string) bool {
if before == 0 {
return filterNoop
}
beforePeriod := periodForEpoch(before)
if before%4096 != 0 {
// Add one because we need to include the period the epoch is in, unless it is the first epoch in the period,
// in which case we can just look at any previous period.
beforePeriod += 1
}
return func(p string) bool {
period, err := periodFromPath(p)
if err != nil {
return false
}
return primitives.Epoch(period) < beforePeriod
}
}
func rootToString(root [32]byte) string {
return fmt.Sprintf("%#x", root)
}
func stringToRoot(str string) ([32]byte, error) {
if len(str) != rootStringLen {
return [32]byte{}, errors.Wrapf(errInvalidRootString, "incorrect len for input=%s", str)
}
slice, err := hexutil.Decode(str)
if err != nil {
return [32]byte{}, errors.Wrapf(errInvalidRootString, "input=%s", str)
}
return bytesutil.ToBytes32(slice), nil
}

View File

@@ -0,0 +1,242 @@
package filesystem
import (
"bytes"
"fmt"
"math"
"os"
"path"
"sort"
"testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/spf13/afero"
)
func TestRootFromDir(t *testing.T) {
cases := []struct {
name string
dir string
err error
root [32]byte
}{
{
name: "happy path",
dir: "0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
root: [32]byte{255, 255, 135, 94, 29, 152, 92, 92, 203, 33, 72, 148, 152, 63, 36, 40,
237, 178, 113, 240, 248, 123, 104, 186, 112, 16, 228, 169, 157, 243, 181, 203},
},
{
name: "too short",
dir: "0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5c",
err: errInvalidRootString,
},
{
name: "too log",
dir: "0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cbb",
err: errInvalidRootString,
},
{
name: "missing prefix",
dir: "ffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
err: errInvalidRootString,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
root, err := stringToRoot(c.dir)
if c.err != nil {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
require.Equal(t, c.root, root)
})
}
}
func TestSlotFromFile(t *testing.T) {
cases := []struct {
slot primitives.Slot
}{
{slot: 0},
{slot: 2},
{slot: 1123581321},
{slot: math.MaxUint64},
}
for _, c := range cases {
t.Run(fmt.Sprintf("slot %d", c.slot), func(t *testing.T) {
fs, bs := NewEphemeralBlobStorageAndFs(t)
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, c.slot, 1)
sc := verification.FakeVerifyForTest(t, sidecars[0])
require.NoError(t, bs.Save(sc))
namer := identForSidecar(sc)
sszPath := bs.layout.sszPath(namer)
slot, err := slotFromFile(sszPath, fs)
require.NoError(t, err)
require.Equal(t, c.slot, slot)
})
}
}
type dirFiles struct {
name string
isDir bool
children []dirFiles
}
func (df dirFiles) reify(t *testing.T, fs afero.Fs, base string) {
fullPath := path.Join(base, df.name)
if df.isDir {
if df.name != "" {
require.NoError(t, fs.Mkdir(fullPath, directoryPermissions))
}
for _, c := range df.children {
c.reify(t, fs, fullPath)
}
} else {
fp, err := fs.Create(fullPath)
require.NoError(t, err)
_, err = fp.WriteString("derp")
require.NoError(t, err)
}
}
func (df dirFiles) childNames() []string {
cn := make([]string, len(df.children))
for i := range df.children {
cn[i] = df.children[i].name
}
return cn
}
func TestListDir(t *testing.T) {
fs := afero.NewMemMapFs()
rootStrs := []string{
"0x0023dc5d063c7c1b37016bb54963c6ff4bfe5dfdf6dac29e7ceeb2b8fa81ed7a",
"0xff30526cd634a5af3a09cc9bff67f33a621fc5b975750bb4432f74df077554b4",
"0x23f5f795aaeb78c01fadaf3d06da2e99bd4b3622ae4dfea61b05b7d9adb119c2",
}
// parent directory
tree := dirFiles{isDir: true}
// break out each subdir for easier assertions
notABlob := dirFiles{name: "notABlob", isDir: true}
childlessBlob := dirFiles{name: rootStrs[0], isDir: true}
blobWithSsz := dirFiles{name: rootStrs[1], isDir: true,
children: []dirFiles{{name: "1.ssz"}, {name: "2.ssz"}},
}
blobWithSszAndTmp := dirFiles{name: rootStrs[2], isDir: true,
children: []dirFiles{{name: "5.ssz"}, {name: "0.part"}}}
tree.children = append(tree.children,
notABlob, childlessBlob, blobWithSsz, blobWithSszAndTmp)
topChildren := make([]string, len(tree.children))
for i := range tree.children {
topChildren[i] = tree.children[i].name
}
var filter = func(entries []string, filt func(string) bool) []string {
filtered := make([]string, 0, len(entries))
for i := range entries {
if filt(entries[i]) {
filtered = append(filtered, entries[i])
}
}
return filtered
}
tree.reify(t, fs, "")
cases := []struct {
name string
dirPath string
expected []string
filter func(string) bool
err error
}{
{
name: "non-existent",
dirPath: "derp",
expected: []string{},
err: os.ErrNotExist,
},
{
name: "empty",
dirPath: childlessBlob.name,
expected: []string{},
},
{
name: "top",
dirPath: ".",
expected: topChildren,
},
{
name: "custom filter: only notABlob",
dirPath: ".",
expected: []string{notABlob.name},
filter: func(s string) bool {
return s == notABlob.name
},
},
{
name: "root filter",
dirPath: ".",
expected: []string{childlessBlob.name, blobWithSsz.name, blobWithSszAndTmp.name},
filter: isRootDir,
},
{
name: "ssz filter",
dirPath: blobWithSsz.name,
expected: blobWithSsz.childNames(),
filter: isSszFile,
},
{
name: "ssz mixed filter",
dirPath: blobWithSszAndTmp.name,
expected: []string{"5.ssz"},
filter: isSszFile,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
result, err := listDir(fs, c.dirPath)
if c.filter != nil {
result = filter(result, c.filter)
}
if c.err != nil {
require.ErrorIs(t, err, c.err)
require.Equal(t, 0, len(result))
} else {
require.NoError(t, err)
sort.Strings(c.expected)
sort.Strings(result)
require.DeepEqual(t, c.expected, result)
}
})
}
}
func TestSlotFromBlob(t *testing.T) {
cases := []struct {
slot primitives.Slot
}{
{slot: 0},
{slot: 2},
{slot: 1123581321},
{slot: math.MaxUint64},
}
for _, c := range cases {
t.Run(fmt.Sprintf("slot %d", c.slot), func(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, c.slot, 1)
sc := sidecars[0]
enc, err := sc.MarshalSSZ()
require.NoError(t, err)
slot, err := slotFromBlob(bytes.NewReader(enc))
require.NoError(t, err)
require.Equal(t, c.slot, slot)
})
}
}

View File

@@ -0,0 +1,330 @@
package filesystem
import (
"fmt"
"io"
"path"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"github.com/spf13/afero"
)
const (
rootPrefixLen = 4
// Full root in directory will be 66 chars, eg:
// >>> len('0x0002fb4db510b8618b04dc82d023793739c26346a8b02eb73482e24b0fec0555') == 66
rootStringLen = 66
sszExt = "ssz"
partExt = "part"
periodicEpochBaseDir = "by-epoch"
hexPrefixBaseDir = "by-hex-prefix"
)
var (
errMigrationFailure = errors.New("unable to migrate blob directory between old and new layout")
errCacheWarmFailed = errors.New("failed to warm blob filesystem cache")
errPruneFailed = errors.New("failed to prune root")
errInvalidRootString = errors.New("Could not parse hex string as a [32]byte")
errInvalidDirectoryLayout = errors.New("Could not parse blob directory path")
)
type migratableLayout interface {
dir(n blobIdent) string
sszPath(n blobIdent) string
partPath(n blobIdent, entropy string) string
iterateIdents(before primitives.Epoch) (*identIterator, error)
}
type runtimeLayout interface {
migratableLayout
ident(root [32]byte, idx uint64) (blobIdent, error)
dirIdent(root [32]byte) (blobIdent, error)
summary(root [32]byte) BlobStorageSummary
notify(ident blobIdent) error
pruneBefore(before primitives.Epoch) (*pruneSummary, error)
remove(ident blobIdent) (int, error)
}
func warmCache(l runtimeLayout, cache *blobStorageCache) error {
iter, err := l.iterateIdents(0)
if err != nil {
return errors.Wrap(errCacheWarmFailed, err.Error())
}
for ident, err := iter.next(); err != io.EOF; ident, err = iter.next() {
if errors.Is(err, errIdentFailure) {
idf := &identificationError{}
if errors.As(err, &idf) {
log.WithFields(idf.LogFields()).WithError(err).Error("Failed to cache blob data for path")
}
continue
}
if err != nil {
return errors.Wrapf(errCacheWarmFailed, "failed to populate blob data cache err=%s", err.Error())
}
if err := cache.ensure(ident.root, ident.epoch, ident.index); err != nil {
return errors.Wrapf(errCacheWarmFailed, "failed to write cache entry for %s, err=%s", l.sszPath(ident), err.Error())
}
}
return nil
}
func migrateLayout(fs afero.Fs, from, to migratableLayout, cache *blobStorageCache) error {
start := time.Now()
iter, err := from.iterateIdents(0)
if err != nil {
return errors.Wrapf(errMigrationFailure, "failed to iterate legacy structure while migrating blobs, err=%s", err.Error())
}
lastMoved := ""
parentDirs := make(map[string]bool) // this map should have < 65k keys by design
moved := 0
for ident, err := iter.next(); err != io.EOF; ident, err = iter.next() {
if err != nil {
if errors.Is(err, errIdentFailure) {
idf := &identificationError{}
if errors.As(err, &idf) {
log.WithFields(idf.LogFields()).WithError(err).Error("Failed to migrate blob path")
}
continue
}
return errors.Wrapf(errMigrationFailure, "failed to iterate legacy structure while migrating blobs, err=%s", err.Error())
}
src := from.dir(ident)
target := to.dir(ident)
if src != lastMoved {
targetParent := filepath.Dir(target)
if targetParent != "" && targetParent != "." && !parentDirs[targetParent] {
if err := fs.MkdirAll(targetParent, directoryPermissions); err != nil {
return errors.Wrapf(errMigrationFailure, "failed to make enclosing path before moving %s to %s", src, target)
}
parentDirs[targetParent] = true
}
if err := fs.Rename(src, target); err != nil {
return errors.Wrapf(errMigrationFailure, "could not rename %s to %s", src, target)
}
moved += 1
lastMoved = src
}
if err := cache.ensure(ident.root, ident.epoch, ident.index); err != nil {
return errors.Wrapf(errMigrationFailure, "could not cache path %s, err=%s", to.sszPath(ident), err.Error())
}
}
if moved > 0 {
log.WithField("dirsMoved", moved).WithField("elapsed", time.Since(start)).
Info("Blob filesystem migration complete.")
}
return nil
}
type blobIdent struct {
root [32]byte
epoch primitives.Epoch
index uint64
}
func newBlobIdent(root [32]byte, epoch primitives.Epoch, index uint64) blobIdent {
return blobIdent{root: root, epoch: epoch, index: index}
}
func identForSidecar(sc blocks.VerifiedROBlob) blobIdent {
return newBlobIdent(sc.BlockRoot(), slots.ToEpoch(sc.Slot()), sc.Index)
}
func (n blobIdent) sszFname() string {
return fmt.Sprintf("%d.%s", n.index, sszExt)
}
func (n blobIdent) partFname(entropy string) string {
return fmt.Sprintf("%s-%d.%s", entropy, n.index, partExt)
}
func (n blobIdent) logFields() logrus.Fields {
return logrus.Fields{
"root": fmt.Sprintf("%#x", n.root),
"epoch": n.epoch,
"index": n.index,
}
}
type pruneSummary struct {
blobsPruned int
failedRemovals []string
}
func (s pruneSummary) LogFields() logrus.Fields {
return logrus.Fields{}
}
func newPeriodicEpochLayout(fs afero.Fs, cache *blobStorageCache, pruner *blobPruner) (*periodicEpochLayout, error) {
l := &periodicEpochLayout{fs: fs, cache: cache, pruner: pruner}
if err := l.initialize(); err != nil {
return nil, err
}
return l, nil
}
var _ migratableLayout = &flatRootLayout{}
var _ runtimeLayout = &periodicEpochLayout{}
type periodicEpochLayout struct {
fs afero.Fs
cache *blobStorageCache
pruner *blobPruner
}
func (l *periodicEpochLayout) notify(ident blobIdent) error {
if err := l.cache.ensure(ident.root, ident.epoch, ident.index); err != nil {
return err
}
l.pruner.notify(ident.epoch, l)
return nil
}
func (l *periodicEpochLayout) initialize() error {
return l.fs.MkdirAll(periodicEpochBaseDir, directoryPermissions)
}
// If before == 0, it won't be used as a filter and all idents will be returned.
func (l *periodicEpochLayout) iterateIdents(before primitives.Epoch) (*identIterator, error) {
// iterate root, which should have directories named by "period"
entries, err := listDir(l.fs, periodicEpochBaseDir)
if err != nil {
return nil, errors.Wrapf(err, "failed to list %s", periodicEpochBaseDir)
}
return &identIterator{
fs: l.fs,
path: periodicEpochBaseDir,
levels: []layoutLevel{
{populateIdent: populateNoop, filter: isBeforePeriod(before)},
{populateIdent: populateEpoch, filter: isBeforeEpoch(before)},
{populateIdent: populateRoot, filter: isRootDir}, // extract root from path
{populateIdent: populateIndex, filter: isSszFile}, // extract index from filename
},
entries: entries,
}, nil
}
func (l *periodicEpochLayout) ident(root [32]byte, idx uint64) (blobIdent, error) {
return l.cache.identForIdx(root, idx)
}
func (l *periodicEpochLayout) dirIdent(root [32]byte) (blobIdent, error) {
return l.cache.identForRoot(root)
}
func (l *periodicEpochLayout) summary(root [32]byte) BlobStorageSummary {
return l.cache.Summary(root)
}
func (l *periodicEpochLayout) dir(n blobIdent) string {
return filepath.Join(l.epochDir(n.epoch), rootToString(n.root))
}
func (l *periodicEpochLayout) epochDir(epoch primitives.Epoch) string {
return filepath.Join(periodicEpochBaseDir, fmt.Sprintf("%d", periodForEpoch(epoch)), fmt.Sprintf("%d", epoch))
}
func periodForEpoch(epoch primitives.Epoch) primitives.Epoch {
return epoch / params.BeaconConfig().MinEpochsForBlobsSidecarsRequest
}
func (l *periodicEpochLayout) sszPath(n blobIdent) string {
return filepath.Join(l.dir(n), n.sszFname())
}
func (l *periodicEpochLayout) partPath(n blobIdent, entropy string) string {
return path.Join(l.dir(n), n.partFname(entropy))
}
func (l *periodicEpochLayout) pruneBefore(before primitives.Epoch) (*pruneSummary, error) {
sums := make(map[primitives.Epoch]*pruneSummary)
iter, err := l.iterateIdents(before)
rollup := &pruneSummary{}
for ident, err := iter.next(); err != io.EOF; ident, err = iter.next() {
if err != nil {
if errors.Is(err, errIdentFailure) {
idf := &identificationError{}
if errors.As(err, &idf) {
log.WithFields(idf.LogFields()).WithError(err).Error("Failed to prune blob path due to identification errors")
}
continue
}
log.WithError(err).Error("encountered unhandled error during pruning")
return nil, errors.Wrap(errPruneFailed, err.Error())
}
_, ok := sums[ident.epoch]
if !ok {
sums[ident.epoch] = &pruneSummary{}
}
s := sums[ident.epoch]
removed, err := l.remove(ident)
if err != nil {
s.failedRemovals = append(s.failedRemovals, l.dir(ident))
log.WithField("root", fmt.Sprintf("%#x", ident.root)).Error("Failed to delete blob directory for root")
}
s.blobsPruned += removed
}
// Roll up summaries and clean up per-epoch directories.
for epoch, sum := range sums {
rollup.blobsPruned += sum.blobsPruned
rollup.failedRemovals = append(rollup.failedRemovals, sum.failedRemovals...)
rmdir := l.epochDir(epoch)
if len(sum.failedRemovals) == 0 {
if err := l.fs.Remove(rmdir); err != nil {
log.WithField("dir", rmdir).WithError(err).Error("Failed to remove epoch directory while pruning")
}
} else {
log.WithField("dir", rmdir).WithField("numFailed", len(sum.failedRemovals)).WithError(err).Error("Unable to remove epoch directory due to pruning failures")
}
}
return rollup, nil
}
func (l *periodicEpochLayout) remove(ident blobIdent) (int, error) {
removed := l.cache.evict(ident.root)
if err := l.fs.RemoveAll(l.dir(ident)); err != nil {
return removed, err
}
return removed, nil
}
type flatRootLayout struct {
fs afero.Fs
}
func (l *flatRootLayout) iterateIdents(_ primitives.Epoch) (*identIterator, error) {
entries, err := listDir(l.fs, ".")
if err != nil {
return nil, errors.Wrapf(err, "could not list root directory")
}
slotAndIndex := &readSlotOncePerRoot{fs: l.fs}
return &identIterator{
fs: l.fs,
levels: []layoutLevel{
{populateIdent: populateRoot, filter: isRootDir},
{populateIdent: slotAndIndex.populateIdent, filter: isSszFile}},
entries: entries,
}, nil
}
func (l *flatRootLayout) dir(n blobIdent) string {
return rootToString(n.root)
}
func (l *flatRootLayout) sszPath(n blobIdent) string {
return path.Join(l.dir(n), n.sszFname())
}
func (l *flatRootLayout) partPath(n blobIdent, entropy string) string {
return path.Join(l.dir(n), n.partFname(entropy))
}

View File

@@ -0,0 +1,52 @@
package filesystem
import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)
type mockLayout struct {
pruneBeforeFunc func(primitives.Epoch) (*pruneSummary, error)
}
func (m *mockLayout) dir(n blobIdent) string {
return ""
}
func (m *mockLayout) sszPath(n blobIdent) string {
return ""
}
func (m *mockLayout) partPath(n blobIdent, entropy string) string {
return ""
}
func (m *mockLayout) iterateIdents(before primitives.Epoch) (*identIterator, error) {
return nil, nil
}
func (m *mockLayout) ident(root [32]byte, idx uint64) (blobIdent, error) {
return blobIdent{}, nil
}
func (m *mockLayout) dirIdent(root [32]byte) (blobIdent, error) {
return blobIdent{}, nil
}
func (m *mockLayout) summary(root [32]byte) BlobStorageSummary {
return BlobStorageSummary{}
}
func (m *mockLayout) notify(sidecar blocks.VerifiedROBlob) error {
return nil
}
func (m *mockLayout) pruneBefore(before primitives.Epoch) (*pruneSummary, error) {
return m.pruneBeforeFunc(before)
}
func (m *mockLayout) remove(ident blobIdent) (int, error) {
return 0, nil
}
var _ runtimeLayout = &mockLayout{}

View File

@@ -0,0 +1,194 @@
package filesystem
import (
"os"
"path/filepath"
"testing"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/spf13/afero"
)
func testSetupPaths(t *testing.T, fs afero.Fs, paths []migrateBeforeAfter) {
for _, ba := range paths {
slot, err := slots.EpochStart(ba.epoch)
require.NoError(t, err)
slot += ba.slotOffset
_, sc := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1)
scb, err := sc[0].MarshalSSZ()
require.NoError(t, err)
p := ba.before
dir := filepath.Dir(p)
require.NoError(t, fs.MkdirAll(dir, directoryPermissions))
require.NoError(t, afero.WriteFile(fs, p, scb, 0666))
_, err = fs.Stat(ba.before)
require.NoError(t, err)
}
}
func testAssertNewPaths(t *testing.T, fs afero.Fs, bs *BlobStorage, paths []migrateBeforeAfter) {
for _, ba := range paths {
if ba.before != ba.after {
_, err := fs.Stat(ba.before)
require.ErrorIs(t, err, os.ErrNotExist)
dir := filepath.Dir(ba.before)
_, err = listDir(fs, dir)
require.ErrorIs(t, err, os.ErrNotExist)
}
_, err := fs.Stat(ba.after)
require.NoError(t, err)
root, err := stringToRoot(ba.root)
require.NoError(t, err)
namer, err := bs.layout.ident(root, ba.index)
require.NoError(t, err)
path := bs.layout.sszPath(namer)
require.Equal(t, ba.after, path)
}
}
type migrateBeforeAfter struct {
before string
after string
epoch primitives.Epoch
slotOffset primitives.Slot
index uint64
root string
}
func TestPeriodicEpochMigrator(t *testing.T) {
cases := []struct {
name string
plan []migrateBeforeAfter
err error
}{
{
name: "happy path",
plan: []migrateBeforeAfter{
{
before: "0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
epoch: 1234,
slotOffset: 0,
root: "0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b",
index: 0,
after: periodicEpochBaseDir + "/0/1234/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
},
{
before: "0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
root: "0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86",
index: 0,
epoch: 5330,
slotOffset: 0,
after: periodicEpochBaseDir + "/1/5330/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
},
{
before: "0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
root: "0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86",
index: 1,
epoch: 5330,
slotOffset: 31,
after: periodicEpochBaseDir + "/1/5330/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
},
{
before: "0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
root: "0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c",
index: 0,
epoch: 16777216,
slotOffset: 16,
after: periodicEpochBaseDir + "/4096/16777216/0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
},
},
},
{
name: "mix old and new",
plan: []migrateBeforeAfter{
{
before: "0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
root: "0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b",
index: 0,
epoch: 1234,
slotOffset: 0,
after: periodicEpochBaseDir + "/0/1234/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
},
{
before: "0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
root: "0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86",
index: 0,
epoch: 5330,
slotOffset: 0,
after: periodicEpochBaseDir + "/1/5330/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
},
{
before: "0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
root: "0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86",
index: 1,
epoch: 5330,
slotOffset: 31,
after: periodicEpochBaseDir + "/1/5330/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
},
{
before: "0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
root: "0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c",
index: 0,
epoch: 16777216,
slotOffset: 16,
after: periodicEpochBaseDir + "/4096/16777216/0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
},
{
before: periodicEpochBaseDir + "/4096/16777217/0x42eabe3d2c125410cd226de6f2825fb7575ab896c3f52e43de1fa29e4c809aba/0.ssz",
root: "0x42eabe3d2c125410cd226de6f2825fb7575ab896c3f52e43de1fa29e4c809aba",
index: 0,
epoch: 16777217,
slotOffset: 16,
after: periodicEpochBaseDir + "/4096/16777217/0x42eabe3d2c125410cd226de6f2825fb7575ab896c3f52e43de1fa29e4c809aba/0.ssz",
},
{
before: "0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
root: "0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c",
index: 0,
epoch: 16777216,
slotOffset: 16,
after: periodicEpochBaseDir + "/4096/16777216/0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
},
{
before: periodicEpochBaseDir + "/4096/16777216/0x2326de064f828c564740da17fc247b30d7e7300da24b0aae39a0c91791acc19f/0.ssz",
root: "0x2326de064f828c564740da17fc247b30d7e7300da24b0aae39a0c91791acc19f",
index: 0,
epoch: 16777216,
slotOffset: 31,
after: periodicEpochBaseDir + "/4096/16777216/0x2326de064f828c564740da17fc247b30d7e7300da24b0aae39a0c91791acc19f/0.ssz",
},
{
before: periodicEpochBaseDir + "/2/11235/0x666cea5034e22bd3b849cb33914cad59afd88ee08e4d5bc0e997411c945fbc1d/1.ssz",
root: "0x666cea5034e22bd3b849cb33914cad59afd88ee08e4d5bc0e997411c945fbc1d",
index: 1,
epoch: 11235,
slotOffset: 0,
after: periodicEpochBaseDir + "/2/11235/0x666cea5034e22bd3b849cb33914cad59afd88ee08e4d5bc0e997411c945fbc1d/1.ssz",
},
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fs, bs := NewEphemeralBlobStorageAndFs(t)
from := &flatRootLayout{fs: fs}
cache := newBlobStorageCache()
pruner := newBlobPruner(bs.retentionEpochs)
to, err := newPeriodicEpochLayout(fs, cache, pruner)
require.NoError(t, err)
testSetupPaths(t, fs, c.plan)
err = migrateLayout(fs, from, to, cache)
if c.err != nil {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
require.NoError(t, warmCache(bs.layout, bs.cache))
testAssertNewPaths(t, fs, bs, c.plan)
})
}
}

View File

@@ -4,6 +4,8 @@ import (
"testing"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/spf13/afero"
)
@@ -11,23 +13,28 @@ import (
// The instance of BlobStorage returned is backed by an in-memory virtual filesystem,
// improving test performance and simplifying cleanup.
func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
}
return &BlobStorage{fs: fs, pruner: pruner}
return NewEphemeralBlobStorageUsingFs(t, afero.NewMemMapFs())
}
// NewEphemeralBlobStorageWithFs can be used by tests that want access to the virtual filesystem
// NewEphemeralBlobStorageAndFs can be used by tests that want access to the virtual filesystem
// in order to interact with it outside the parameters of the BlobStorage api.
func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage) {
func NewEphemeralBlobStorageAndFs(t testing.TB) (afero.Fs, *BlobStorage) {
fs := afero.NewMemMapFs()
pruner, err := newBlobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest, withWarmedCache())
if err != nil {
t.Fatal("test setup issue", err)
bs := NewEphemeralBlobStorageUsingFs(t, fs)
return fs, bs
}
func NewEphemeralBlobStorageUsingFs(t testing.TB, fs afero.Fs) *BlobStorage {
opts := []BlobStorageOption{
WithBlobRetentionEpochs(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest),
WithFs(fs),
}
return fs, &BlobStorage{fs: fs, pruner: pruner}
bs, err := NewBlobStorage(opts...)
if err != nil {
t.Fatalf("error initializing test BlobStorage, err=%s", err.Error())
}
bs.WarmCache()
return bs
}
type BlobMocker struct {
@@ -37,17 +44,9 @@ type BlobMocker struct {
// CreateFakeIndices creates empty blob sidecar files at the expected path for the given
// root and indices to influence the result of Indices().
func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices ...uint64) error {
func (bm *BlobMocker) CreateFakeIndices(root [32]byte, slot primitives.Slot, indices ...uint64) error {
for i := range indices {
n := blobNamer{root: root, index: indices[i]}
if err := bm.fs.MkdirAll(n.dir(), directoryPermissions); err != nil {
return err
}
f, err := bm.fs.Create(n.path())
if err != nil {
return err
}
if err := f.Close(); err != nil {
if err := bm.bs.layout.notify(newBlobIdent(root, slots.ToEpoch(slot), indices[i])); err != nil {
return err
}
}
@@ -56,9 +55,8 @@ func (bm *BlobMocker) CreateFakeIndices(root [32]byte, indices ...uint64) error
// NewEphemeralBlobStorageWithMocker returns a *BlobMocker value in addition to the BlobStorage value.
// BlockMocker encapsulates things blob path construction to avoid leaking implementation details.
func NewEphemeralBlobStorageWithMocker(_ testing.TB) (*BlobMocker, *BlobStorage) {
fs := afero.NewMemMapFs()
bs := &BlobStorage{fs: fs}
func NewEphemeralBlobStorageWithMocker(t testing.TB) (*BlobMocker, *BlobStorage) {
fs, bs := NewEphemeralBlobStorageAndFs(t)
return &BlobMocker{fs: fs, bs: bs}, bs
}
@@ -66,7 +64,7 @@ func NewMockBlobStorageSummarizer(t *testing.T, set map[[32]byte][]int) BlobStor
c := newBlobStorageCache()
for k, v := range set {
for i := range v {
if err := c.ensure(rootString(k), 0, uint64(v[i])); err != nil {
if err := c.ensure(k, 0, uint64(v[i])); err != nil {
t.Fatal(err)
}
}

View File

@@ -1,27 +1,16 @@
package filesystem
import (
"context"
"encoding/binary"
"io"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"github.com/spf13/afero"
)
const retentionBuffer primitives.Epoch = 2
const bytesPerSidecar = 131928
var (
errPruningFailures = errors.New("blobs could not be pruned for some roots")
@@ -29,282 +18,46 @@ var (
)
type blobPruner struct {
sync.Mutex
prunedBefore atomic.Uint64
windowSize primitives.Slot
cache *blobStorageCache
cacheReady chan struct{}
warmed bool
fs afero.Fs
mu sync.Mutex
prunedBefore atomic.Uint64
retentionPeriod primitives.Epoch
}
type prunerOpt func(*blobPruner) error
func withWarmedCache() prunerOpt {
return func(p *blobPruner) error {
return p.warmCache()
}
func newBlobPruner(retain primitives.Epoch) *blobPruner {
p := &blobPruner{retentionPeriod: retain + retentionBuffer}
return p
}
func newBlobPruner(fs afero.Fs, retain primitives.Epoch, opts ...prunerOpt) (*blobPruner, error) {
r, err := slots.EpochStart(retain + retentionBuffer)
if err != nil {
return nil, errors.Wrap(err, "could not set retentionSlots")
}
cw := make(chan struct{})
p := &blobPruner{fs: fs, windowSize: r, cache: newBlobStorageCache(), cacheReady: cw}
for _, o := range opts {
if err := o(p); err != nil {
return nil, err
}
}
return p, nil
}
// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
// of root->slot mappings and decide when to evict old blobs based on the age of present blobs.
func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) error {
if err := p.cache.ensure(rootString(root), latest, idx); err != nil {
return err
}
pruned := uint64(windowMin(latest, p.windowSize))
if p.prunedBefore.Swap(pruned) == pruned {
return nil
func (p *blobPruner) notify(latest primitives.Epoch, layout runtimeLayout) chan struct{} {
done := make(chan struct{})
floor := periodFloor(latest, p.retentionPeriod)
if primitives.Epoch(p.prunedBefore.Swap(uint64(floor))) >= floor {
// Only trigger pruning if the atomic swap changed the previous value of prunedBefore.
close(done)
return done
}
go func() {
p.Lock()
defer p.Unlock()
if err := p.prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
p.mu.Lock()
start := time.Now()
defer p.mu.Unlock()
sum, err := layout.pruneBefore(floor)
if err != nil {
log.WithError(err).WithFields(sum.LogFields()).Warn("Encountered errors during blob pruning.")
}
log.WithFields(logrus.Fields{
"upToEpoch": floor,
"duration": time.Since(start).String(),
"filesRemoved": sum.blobsPruned,
}).Debug("Pruned old blobs")
blobsPrunedCounter.Add(float64(sum.blobsPruned))
close(done)
}()
return nil
return done
}
func windowMin(latest, offset primitives.Slot) primitives.Slot {
// Safely compute the first slot in the epoch for the latest slot
latest = latest - latest%params.BeaconConfig().SlotsPerEpoch
if latest < offset {
func periodFloor(latest, period primitives.Epoch) primitives.Epoch {
if latest < period {
return 0
}
return latest - offset
}
func (p *blobPruner) warmCache() error {
p.Lock()
defer p.Unlock()
if err := p.prune(0); err != nil {
return err
}
if !p.warmed {
p.warmed = true
close(p.cacheReady)
}
return nil
}
func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error) {
select {
case <-p.cacheReady:
return p.cache, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Prune prunes blobs in the base directory based on the retention epoch.
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
start := time.Now()
totalPruned, totalErr := 0, 0
// Customize logging/metrics behavior for the initial cache warmup when slot=0.
// We'll never see a prune request for slot 0, unless this is the initial call to warm up the cache.
if pruneBefore == 0 {
defer func() {
log.WithField("duration", time.Since(start).String()).Debug("Warmed up pruner cache")
}()
} else {
defer func() {
log.WithFields(logrus.Fields{
"upToEpoch": slots.ToEpoch(pruneBefore),
"duration": time.Since(start).String(),
"filesRemoved": totalPruned,
}).Debug("Pruned old blobs")
blobsPrunedCounter.Add(float64(totalPruned))
}()
}
entries, err := listDir(p.fs, ".")
if err != nil {
return errors.Wrap(err, "unable to list root blobs directory")
}
dirs := filter(entries, filterRoot)
for _, dir := range dirs {
pruned, err := p.tryPruneDir(dir, pruneBefore)
if err != nil {
totalErr += 1
log.WithError(err).WithField("directory", dir).Error("Unable to prune directory")
}
totalPruned += pruned
}
if totalErr > 0 {
return errors.Wrapf(errPruningFailures, "pruning failed for %d root directories", totalErr)
}
return nil
}
func shouldRetain(slot, pruneBefore primitives.Slot) bool {
return slot >= pruneBefore
}
func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int, error) {
root := rootFromDir(dir)
slot, slotCached := p.cache.slot(root)
// Return early if the slot is cached and doesn't need pruning.
if slotCached && shouldRetain(slot, pruneBefore) {
return 0, nil
}
// entries will include things that aren't ssz files, like dangling .part files. We need these to
// completely clean up the directory.
entries, err := listDir(p.fs, dir)
if err != nil {
return 0, errors.Wrapf(err, "failed to list blobs in directory %s", dir)
}
// scFiles filters the dir listing down to the ssz encoded BlobSidecar files. This allows us to peek
// at the first one in the list to figure out the slot.
scFiles := filter(entries, filterSsz)
if len(scFiles) == 0 {
log.WithField("dir", dir).Warn("Pruner ignoring directory with no blob files")
return 0, nil
}
if !slotCached {
slot, err = slotFromFile(path.Join(dir, scFiles[0]), p.fs)
if err != nil {
return 0, errors.Wrapf(err, "slot could not be read from blob file %s", scFiles[0])
}
for i := range scFiles {
idx, err := idxFromPath(scFiles[i])
if err != nil {
return 0, errors.Wrapf(err, "index could not be determined for blob file %s", scFiles[i])
}
if err := p.cache.ensure(root, slot, idx); err != nil {
return 0, errors.Wrapf(err, "could not update prune cache for blob file %s", scFiles[i])
}
}
if shouldRetain(slot, pruneBefore) {
return 0, nil
}
}
removed := 0
for _, fname := range entries {
fullName := path.Join(dir, fname)
if err := p.fs.Remove(fullName); err != nil {
return removed, errors.Wrapf(err, "unable to remove %s", fullName)
}
// Don't count other files that happen to be in the dir, like dangling .part files.
if filterSsz(fname) {
removed += 1
}
// Log a warning whenever we clean up a .part file
if filterPart(fullName) {
log.WithField("file", fullName).Warn("Deleting abandoned blob .part file")
}
}
if err := p.fs.Remove(dir); err != nil {
return removed, errors.Wrapf(err, "unable to remove blob directory %s", dir)
}
p.cache.evict(rootFromDir(dir))
return len(scFiles), nil
}
func idxFromPath(fname string) (uint64, error) {
fname = path.Base(fname)
if filepath.Ext(fname) != dotSszExt {
return 0, errors.Wrap(errNotBlobSSZ, "does not have .ssz extension")
}
parts := strings.Split(fname, ".")
if len(parts) != 2 {
return 0, errors.Wrap(errNotBlobSSZ, "unexpected filename structure (want <index>.ssz)")
}
return strconv.ParseUint(parts[0], 10, 64)
}
func rootFromDir(dir string) string {
return filepath.Base(dir) // end of the path should be the blob directory, named by hex encoding of root
}
// Read slot from marshaled BlobSidecar data in the given file. See slotFromBlob for details.
func slotFromFile(file string, fs afero.Fs) (primitives.Slot, error) {
f, err := fs.Open(file)
if err != nil {
return 0, err
}
defer func() {
if err := f.Close(); err != nil {
log.WithError(err).Errorf("Could not close blob file")
}
}()
return slotFromBlob(f)
}
// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes),
// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields
// preceding the slot information within SignedBeaconBlockHeader.
func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
b := make([]byte, 8)
_, err := at.ReadAt(b, 131176)
if err != nil {
return 0, err
}
rawSlot := binary.LittleEndian.Uint64(b)
return primitives.Slot(rawSlot), nil
}
func listDir(fs afero.Fs, dir string) ([]string, error) {
top, err := fs.Open(dir)
if err != nil {
return nil, errors.Wrap(err, "failed to open directory descriptor")
}
defer func() {
if err := top.Close(); err != nil {
log.WithError(err).Errorf("Could not close file %s", dir)
}
}()
// re the -1 param: "If n <= 0, Readdirnames returns all the names from the directory in a single slice"
dirs, err := top.Readdirnames(-1)
if err != nil {
return nil, errors.Wrap(err, "failed to read directory listing")
}
return dirs, nil
}
func filter(entries []string, filt func(string) bool) []string {
filtered := make([]string, 0, len(entries))
for i := range entries {
if filt(entries[i]) {
filtered = append(filtered, entries[i])
}
}
return filtered
}
func filterRoot(s string) bool {
return strings.HasPrefix(s, "0x")
}
var dotSszExt = "." + sszExt
var dotPartExt = "." + partExt
func filterSsz(s string) bool {
return filepath.Ext(s) == dotSszExt
}
func filterPart(s string) bool {
return filepath.Ext(s) == dotPartExt
return latest - period
}

View File

@@ -1,318 +1,196 @@
package filesystem
import (
"bytes"
"fmt"
"math"
"encoding/binary"
"os"
"path"
"sort"
"testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/spf13/afero"
)
func TestTryPruneDir_CachedNotExpired(t *testing.T) {
fs := afero.NewMemMapFs()
pr, err := newBlobPruner(fs, 0)
require.NoError(t, err)
slot := pr.windowSize
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, fieldparams.MaxBlobsPerBlock)
sc, err := verification.BlobSidecarNoop(sidecars[0])
require.NoError(t, err)
root := fmt.Sprintf("%#x", sc.BlockRoot())
// This slot is right on the edge of what would need to be pruned, so by adding it to the cache and
// skipping any other test setup, we can be certain the hot cache path never touches the filesystem.
require.NoError(t, pr.cache.ensure(root, sc.Slot(), 0))
pruned, err := pr.tryPruneDir(root, pr.windowSize)
require.NoError(t, err)
require.Equal(t, 0, pruned)
type prunerScenario struct {
name string
prunedBefore primitives.Epoch
retentionPeriod primitives.Epoch
latest primitives.Epoch
expected pruneExpectation
}
func TestTryPruneDir_CachedExpired(t *testing.T) {
t.Run("empty directory", func(t *testing.T) {
fs := afero.NewMemMapFs()
pr, err := newBlobPruner(fs, 0)
require.NoError(t, err)
var slot primitives.Slot = 0
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1)
sc, err := verification.BlobSidecarNoop(sidecars[0])
require.NoError(t, err)
root := fmt.Sprintf("%#x", sc.BlockRoot())
require.NoError(t, fs.Mkdir(root, directoryPermissions)) // make empty directory
require.NoError(t, pr.cache.ensure(root, sc.Slot(), 0))
pruned, err := pr.tryPruneDir(root, slot+1)
require.NoError(t, err)
require.Equal(t, 0, pruned)
})
t.Run("blobs to delete", func(t *testing.T) {
fs, bs := NewEphemeralBlobStorageWithFs(t)
var slot primitives.Slot = 0
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 2)
scs, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
require.NoError(t, bs.Save(scs[0]))
require.NoError(t, bs.Save(scs[1]))
// check that the root->slot is cached
root := fmt.Sprintf("%#x", scs[0].BlockRoot())
cs, cok := bs.pruner.cache.slot(root)
require.Equal(t, true, cok)
require.Equal(t, slot, cs)
// ensure that we see the saved files in the filesystem
files, err := listDir(fs, root)
require.NoError(t, err)
require.Equal(t, 2, len(files))
pruned, err := bs.pruner.tryPruneDir(root, slot+1)
require.NoError(t, err)
require.Equal(t, 2, pruned)
files, err = listDir(fs, root)
require.ErrorIs(t, err, os.ErrNotExist)
require.Equal(t, 0, len(files))
})
type pruneExpectation struct {
called bool
arg primitives.Epoch
summary *pruneSummary
err error
}
func TestTryPruneDir_SlotFromFile(t *testing.T) {
t.Run("expired blobs deleted", func(t *testing.T) {
fs, bs := NewEphemeralBlobStorageWithFs(t)
var slot primitives.Slot = 0
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 2)
scs, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
require.NoError(t, bs.Save(scs[0]))
require.NoError(t, bs.Save(scs[1]))
// check that the root->slot is cached
root := fmt.Sprintf("%#x", scs[0].BlockRoot())
cs, ok := bs.pruner.cache.slot(root)
require.Equal(t, true, ok)
require.Equal(t, slot, cs)
// evict it from the cache so that we trigger the file read path
bs.pruner.cache.evict(root)
_, ok = bs.pruner.cache.slot(root)
require.Equal(t, false, ok)
// ensure that we see the saved files in the filesystem
files, err := listDir(fs, root)
require.NoError(t, err)
require.Equal(t, 2, len(files))
pruned, err := bs.pruner.tryPruneDir(root, slot+1)
require.NoError(t, err)
require.Equal(t, 2, pruned)
files, err = listDir(fs, root)
require.ErrorIs(t, err, os.ErrNotExist)
require.Equal(t, 0, len(files))
})
t.Run("not expired, intact", func(t *testing.T) {
fs, bs := NewEphemeralBlobStorageWithFs(t)
// Set slot equal to the window size, so it should be retained.
slot := bs.pruner.windowSize
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 2)
scs, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
require.NoError(t, bs.Save(scs[0]))
require.NoError(t, bs.Save(scs[1]))
// Evict slot mapping from the cache so that we trigger the file read path.
root := fmt.Sprintf("%#x", scs[0].BlockRoot())
bs.pruner.cache.evict(root)
_, ok := bs.pruner.cache.slot(root)
require.Equal(t, false, ok)
// Ensure that we see the saved files in the filesystem.
files, err := listDir(fs, root)
require.NoError(t, err)
require.Equal(t, 2, len(files))
// This should use the slotFromFile code (simulating restart).
// Setting pruneBefore == slot, so that the slot will be outside the window (at the boundary).
pruned, err := bs.pruner.tryPruneDir(root, slot)
require.NoError(t, err)
require.Equal(t, 0, pruned)
// Ensure files are still present.
files, err = listDir(fs, root)
require.NoError(t, err)
require.Equal(t, 2, len(files))
})
func (e *pruneExpectation) record(before primitives.Epoch) (*pruneSummary, error) {
e.called = true
e.arg = before
if e.summary == nil {
e.summary = &pruneSummary{}
}
return e.summary, e.err
}
func TestSlotFromBlob(t *testing.T) {
cases := []struct {
slot primitives.Slot
}{
{slot: 0},
{slot: 2},
{slot: 1123581321},
{slot: math.MaxUint64},
}
for _, c := range cases {
t.Run(fmt.Sprintf("slot %d", c.slot), func(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, c.slot, 1)
sc := sidecars[0]
enc, err := sc.MarshalSSZ()
require.NoError(t, err)
slot, err := slotFromBlob(bytes.NewReader(enc))
require.NoError(t, err)
require.Equal(t, c.slot, slot)
})
}
}
func TestSlotFromFile(t *testing.T) {
cases := []struct {
slot primitives.Slot
}{
{slot: 0},
{slot: 2},
{slot: 1123581321},
{slot: math.MaxUint64},
}
for _, c := range cases {
t.Run(fmt.Sprintf("slot %d", c.slot), func(t *testing.T) {
fs, bs := NewEphemeralBlobStorageWithFs(t)
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, c.slot, 1)
sc, err := verification.BlobSidecarNoop(sidecars[0])
require.NoError(t, err)
require.NoError(t, bs.Save(sc))
fname := namerForSidecar(sc)
sszPath := fname.path()
slot, err := slotFromFile(sszPath, fs)
require.NoError(t, err)
require.Equal(t, c.slot, slot)
})
}
}
type dirFiles struct {
name string
isDir bool
children []dirFiles
}
func (df dirFiles) reify(t *testing.T, fs afero.Fs, base string) {
fullPath := path.Join(base, df.name)
if df.isDir {
if df.name != "" {
require.NoError(t, fs.Mkdir(fullPath, directoryPermissions))
}
for _, c := range df.children {
c.reify(t, fs, fullPath)
}
} else {
fp, err := fs.Create(fullPath)
require.NoError(t, err)
_, err = fp.WriteString("derp")
require.NoError(t, err)
}
}
func (df dirFiles) childNames() []string {
cn := make([]string, len(df.children))
for i := range df.children {
cn[i] = df.children[i].name
}
return cn
}
func TestListDir(t *testing.T) {
fs := afero.NewMemMapFs()
// parent directory
fsLayout := dirFiles{isDir: true}
// break out each subdir for easier assertions
notABlob := dirFiles{name: "notABlob", isDir: true}
childlessBlob := dirFiles{name: "0x0987654321", isDir: true}
blobWithSsz := dirFiles{name: "0x1123581321", isDir: true,
children: []dirFiles{{name: "1.ssz"}, {name: "2.ssz"}},
}
blobWithSszAndTmp := dirFiles{name: "0x1234567890", isDir: true,
children: []dirFiles{{name: "5.ssz"}, {name: "0.part"}}}
fsLayout.children = append(fsLayout.children,
notABlob, childlessBlob, blobWithSsz, blobWithSszAndTmp)
topChildren := make([]string, len(fsLayout.children))
for i := range fsLayout.children {
topChildren[i] = fsLayout.children[i].name
}
fsLayout.reify(t, fs, "")
cases := []struct {
name string
dirPath string
expected []string
filter func(string) bool
err error
}{
func TestPrunerNotify(t *testing.T) {
defaultRetention := params.BeaconConfig().MinEpochsForBlobsSidecarsRequest
cases := []prunerScenario{
{
name: "non-existent",
dirPath: "derp",
expected: []string{},
err: os.ErrNotExist,
name: "last epoch of period",
retentionPeriod: defaultRetention,
prunedBefore: 11235,
latest: defaultRetention + 11235,
expected: pruneExpectation{called: false},
},
{
name: "empty",
dirPath: childlessBlob.name,
expected: []string{},
name: "within period",
retentionPeriod: defaultRetention,
prunedBefore: 11235,
latest: 11235 + defaultRetention - 1,
expected: pruneExpectation{called: false},
},
{
name: "top",
dirPath: ".",
expected: topChildren,
name: "triggers",
retentionPeriod: defaultRetention,
prunedBefore: 11235,
latest: 11235 + 1 + defaultRetention,
expected: pruneExpectation{called: true, arg: 11235 + 1},
},
{
name: "custom filter: only notABlob",
dirPath: ".",
expected: []string{notABlob.name},
filter: func(s string) bool {
return s == notABlob.name
},
name: "from zero - before first period",
retentionPeriod: defaultRetention,
prunedBefore: 0,
latest: defaultRetention - 1,
expected: pruneExpectation{called: false},
},
{
name: "root filter",
dirPath: ".",
expected: []string{childlessBlob.name, blobWithSsz.name, blobWithSszAndTmp.name},
filter: filterRoot,
name: "from zero - at boundary",
retentionPeriod: defaultRetention,
prunedBefore: 0,
latest: defaultRetention,
expected: pruneExpectation{called: false},
},
{
name: "ssz filter",
dirPath: blobWithSsz.name,
expected: blobWithSsz.childNames(),
filter: filterSsz,
},
{
name: "ssz mixed filter",
dirPath: blobWithSszAndTmp.name,
expected: []string{"5.ssz"},
filter: filterSsz,
name: "from zero - triggers",
retentionPeriod: defaultRetention,
prunedBefore: 0,
latest: defaultRetention + 1,
expected: pruneExpectation{called: true, arg: 1},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
result, err := listDir(fs, c.dirPath)
if c.filter != nil {
result = filter(result, c.filter)
}
if c.err != nil {
require.ErrorIs(t, err, c.err)
require.Equal(t, 0, len(result))
} else {
require.NoError(t, err)
sort.Strings(c.expected)
sort.Strings(result)
require.DeepEqual(t, c.expected, result)
}
actual := &pruneExpectation{}
l := &mockLayout{pruneBeforeFunc: actual.record}
pruner := &blobPruner{retentionPeriod: c.retentionPeriod}
pruner.prunedBefore.Store(uint64(c.prunedBefore))
done := pruner.notify(c.latest, l)
<-done
require.Equal(t, c.expected.called, actual.called)
require.Equal(t, c.expected.arg, actual.arg)
})
}
}
func testSetupBlobIdentPaths(t *testing.T, fs afero.Fs, bs *BlobStorage, idents []testIdent) []blobIdent {
created := make([]blobIdent, len(idents))
for i, id := range idents {
slot, err := slots.EpochStart(id.epoch)
require.NoError(t, err)
slot += id.offset
_, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1)
sc := verification.FakeVerifyForTest(t, scs[0])
require.NoError(t, bs.Save(sc))
ident := identForSidecar(sc)
_, err = fs.Stat(bs.layout.sszPath(ident))
require.NoError(t, err)
created[i] = ident
}
return created
}
func testAssertBlobsPruned(t *testing.T, fs afero.Fs, bs *BlobStorage, pruned, remain []blobIdent) {
for _, id := range pruned {
_, err := fs.Stat(bs.layout.sszPath(id))
require.Equal(t, true, os.IsNotExist(err))
}
for _, id := range remain {
_, err := fs.Stat(bs.layout.sszPath(id))
require.NoError(t, err)
}
}
type testIdent struct {
blobIdent
offset primitives.Slot
}
func testRoots(n int) [][32]byte {
roots := make([][32]byte, n)
for i := range roots {
binary.LittleEndian.PutUint32(roots[i][:], uint32(1+i))
}
return roots
}
func TestLayoutPruneBefore(t *testing.T) {
roots := testRoots(10)
cases := []struct {
name string
pruned []testIdent
remain []testIdent
pruneBefore primitives.Epoch
err error
sum pruneSummary
}{
{
name: "none pruned",
pruneBefore: 1,
pruned: []testIdent{},
remain: []testIdent{
{offset: 1, blobIdent: blobIdent{root: roots[0], epoch: 1, index: 0}},
{offset: 1, blobIdent: blobIdent{root: roots[1], epoch: 1, index: 0}},
},
},
{
name: "expected pruned before epoch",
pruneBefore: 3,
pruned: []testIdent{
{offset: 0, blobIdent: blobIdent{root: roots[0], epoch: 1, index: 0}},
{offset: 31, blobIdent: blobIdent{root: roots[1], epoch: 1, index: 5}},
{offset: 0, blobIdent: blobIdent{root: roots[2], epoch: 2, index: 0}},
{offset: 31, blobIdent: blobIdent{root: roots[3], epoch: 2, index: 3}},
},
remain: []testIdent{
{offset: 0, blobIdent: blobIdent{root: roots[4], epoch: 3, index: 2}}, // boundary
{offset: 31, blobIdent: blobIdent{root: roots[5], epoch: 3, index: 0}}, // boundary
{offset: 0, blobIdent: blobIdent{root: roots[6], epoch: 4, index: 1}},
{offset: 31, blobIdent: blobIdent{root: roots[7], epoch: 4, index: 5}},
},
sum: pruneSummary{blobsPruned: 4},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fs, bs := NewEphemeralBlobStorageAndFs(t)
pruned := testSetupBlobIdentPaths(t, fs, bs, c.pruned)
remain := testSetupBlobIdentPaths(t, fs, bs, c.remain)
sum, err := bs.layout.pruneBefore(c.pruneBefore)
if c.err != nil {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
testAssertBlobsPruned(t, fs, bs, pruned, remain)
require.Equal(t, c.sum.blobsPruned, sum.blobsPruned)
require.Equal(t, len(c.pruned), sum.blobsPruned)
require.Equal(t, len(c.sum.failedRemovals), len(sum.failedRemovals))
})
}
}

View File

@@ -38,8 +38,7 @@ func TestBlobs(t *testing.T) {
denebBlock, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 123, 4)
require.NoError(t, db.SaveBlock(context.Background(), denebBlock))
bs := filesystem.NewEphemeralBlobStorage(t)
testSidecars, err := verification.BlobSidecarSliceNoop(blobs)
require.NoError(t, err)
testSidecars := verification.FakeVerifySliceForTest(t, blobs)
for i := range testSidecars {
require.NoError(t, bs.Save(testSidecars[i]))
}

View File

@@ -164,9 +164,8 @@ func TestGetBlob(t *testing.T) {
db := testDB.SetupDB(t)
denebBlock, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 123, 4)
require.NoError(t, db.SaveBlock(context.Background(), denebBlock))
_, bs := filesystem.NewEphemeralBlobStorageWithFs(t)
testSidecars, err := verification.BlobSidecarSliceNoop(blobs)
require.NoError(t, err)
_, bs := filesystem.NewEphemeralBlobStorageAndFs(t)
testSidecars := verification.FakeVerifySliceForTest(t, blobs)
for i := range testSidecars {
require.NoError(t, bs.Save(testSidecars[i]))
}

View File

@@ -250,8 +250,7 @@ func (c *blobsTestCase) run(t *testing.T) {
}
}
for _, blobSidecars := range m {
v, err := verification.BlobSidecarSliceNoop(blobSidecars)
require.NoError(t, err)
v := verification.FakeVerifySliceForTest(t, blobSidecars)
for i := range v {
require.NoError(t, s.cfg.blobStorage.Save(v[i]))
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -70,14 +69,6 @@ func (s *Service) startBlocksQueue(ctx context.Context, highestSlot primitives.S
return nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
}
summarizer, err := s.cfg.BlobStorage.WaitForSummarizer(ctx)
if err != nil {
// The summarizer is an optional optimization, we can continue without, only stop if there is a different error.
if !errors.Is(err, filesystem.ErrBlobStorageSummarizerUnavailable) {
return nil, err
}
summarizer = nil // This should already be nil, but we'll set it just to be safe.
}
cfg := &blocksQueueConfig{
p2p: s.cfg.P2P,
db: s.cfg.DB,
@@ -86,7 +77,7 @@ func (s *Service) startBlocksQueue(ctx context.Context, highestSlot primitives.S
ctxMap: ctxMap,
highestExpectedSlot: highestSlot,
mode: mode,
bs: summarizer,
bs: s.cfg.BlobStorage,
}
queue := newBlocksQueue(ctx, cfg)
if err := queue.start(); err != nil {

View File

@@ -464,7 +464,7 @@ func TestMissingBlobRequest(t *testing.T) {
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
bm, fs := filesystem.NewEphemeralBlobStorageWithMocker(t)
require.NoError(t, bm.CreateFakeIndices(bk.Root(), 1))
require.NoError(t, bm.CreateFakeIndices(bk.Root(), bk.Block().Slot(), 1))
return bk, fs
},
nReq: 1,
@@ -474,7 +474,7 @@ func TestMissingBlobRequest(t *testing.T) {
setup: func(t *testing.T) (blocks.ROBlock, *filesystem.BlobStorage) {
bk, _ := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 2)
bm, fs := filesystem.NewEphemeralBlobStorageWithMocker(t)
require.NoError(t, bm.CreateFakeIndices(bk.Root(), 0, 1))
require.NoError(t, bm.CreateFakeIndices(bk.Root(), bk.Block().Slot(), 0, 1))
return bk, fs
},
nReq: 0,

View File

@@ -445,8 +445,7 @@ func TestConstructPendingBlobsRequest(t *testing.T) {
util.GenerateTestDenebBlobSidecar(t, root, header, 0, bytesutil.PadTo([]byte{}, 48), make([][]byte, 0)),
util.GenerateTestDenebBlobSidecar(t, root, header, 2, bytesutil.PadTo([]byte{}, 48), make([][]byte, 0)),
}
vscs, err := verification.BlobSidecarSliceNoop(blobSidecars)
require.NoError(t, err)
vscs := verification.FakeVerifySliceForTest(t, blobSidecars)
for i := range vscs {
require.NoError(t, bs.Save(vscs[i]))
}

View File

@@ -8,6 +8,7 @@ go_library(
"cache.go",
"error.go",
"fake.go",
"filesystem.go",
"initializer.go",
"interface.go",
"metrics.go",
@@ -40,6 +41,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_spf13_afero//:go_default_library",
],
)

View File

@@ -1,6 +1,9 @@
package verification
import "github.com/pkg/errors"
import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)
// ErrMissingVerification indicates that the given verification function was never performed on the value.
var ErrMissingVerification = errors.New("verification was not performed for requirement")
@@ -36,3 +39,12 @@ func (ve VerificationMultiError) Failures() map[Requirement]error {
func newVerificationMultiError(r *results, err error) VerificationMultiError {
return VerificationMultiError{r: r, err: err}
}
// VerifiedROBlobError can be used by methods that have a VerifiedROBlob return type but do not have permission to
// create a value of that type in order to generate an error return value.
func VerifiedROBlobError(err error) (blocks.VerifiedROBlob, error) {
if err == nil {
panic("VerifiedROBlobError used to create a VerifiedROBlob without a checkable error.")
}
return blocks.VerifiedROBlob{}, err
}

View File

@@ -6,22 +6,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)
// BlobSidecarNoop is a FAKE verification function that simply launders a ROBlob->VerifiedROBlob.
// TODO: find all code that uses this method and replace it with full verification.
func BlobSidecarNoop(b blocks.ROBlob) (blocks.VerifiedROBlob, error) {
return blocks.NewVerifiedROBlob(b), nil
}
// BlobSidecarSliceNoop is a FAKE verification function that simply launders a ROBlob->VerifiedROBlob.
// TODO: find all code that uses this method and replace it with full verification.
func BlobSidecarSliceNoop(b []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) {
vbs := make([]blocks.VerifiedROBlob, len(b))
for i := range b {
vbs[i] = blocks.NewVerifiedROBlob(b[i])
}
return vbs, nil
}
// FakeVerifyForTest can be used by tests that need a VerifiedROBlob but don't want to do all the
// expensive set up to perform full validation.
func FakeVerifyForTest(t *testing.T, b blocks.ROBlob) blocks.VerifiedROBlob {
@@ -35,7 +19,6 @@ func FakeVerifyForTest(t *testing.T, b blocks.ROBlob) blocks.VerifiedROBlob {
func FakeVerifySliceForTest(t *testing.T, b []blocks.ROBlob) []blocks.VerifiedROBlob {
// log so that t is truly required
t.Log("producing fake []VerifiedROBlob for a test")
// tautological assertion that ensures this function can only be used in tests.
vbs := make([]blocks.VerifiedROBlob, len(b))
for i := range b {
vbs[i] = blocks.NewVerifiedROBlob(b[i])

View File

@@ -0,0 +1,23 @@
package verification
import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/spf13/afero"
)
func VerifiedROBlobFromDisk(fs afero.Fs, root [32]byte, path string) (blocks.VerifiedROBlob, error) {
encoded, err := afero.ReadFile(fs, path)
if err != nil {
return VerifiedROBlobError(err)
}
s := &ethpb.BlobSidecar{}
if err := s.UnmarshalSSZ(encoded); err != nil {
return VerifiedROBlobError(err)
}
ro, err := blocks.NewROBlobWithRoot(s, root)
if err != nil {
return VerifiedROBlobError(err)
}
return blocks.NewVerifiedROBlob(ro), nil
}

View File

@@ -0,0 +1,96 @@
package main
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
)
var errUsage = errors.New("incorrect usage - missing blob path")
const newLen = 4 // eg '0xff'
func blobPath() (string, error) {
if len(os.Args) < 2 {
return "", errUsage
}
return os.Args[1], nil
}
func usage(err error) {
fmt.Printf("%s\n", err.Error())
fmt.Println("downgrade-blob-storage: Move blob directories back to old format, without the single byte container directories at the top-level of the directory tree. usage:\n" + os.Args[0] + " <path to blobs dir>")
}
func main() {
bp, err := blobPath()
if err != nil {
if errors.Is(err, errUsage) {
usage(err)
}
os.Exit(1)
}
if err := downgrade(bp); err != nil {
fmt.Printf("fatal error: %s\n", err.Error())
os.Exit(1)
}
}
func downgrade(base string) error {
top, err := os.Open(base) // #nosec G304
if err != nil {
return err
}
// iterate over top-level blob dir, ie 'blobs' inside prysm's datadir
topdirs, err := top.Readdirnames(0)
if err != nil {
return err
}
if err := top.Close(); err != nil {
return err
}
nMoved := 0
for _, td := range topdirs {
// Ignore anything in the old layout.
if !filterNew(td) {
continue
}
dir, err := os.Open(filepath.Join(base, td)) // #nosec G304
if err != nil {
return err
}
// List the subdirectoress of the short dir containers, eg if td == '0xff'
// we want to move all the subdirectories in that dir.
subs, err := dir.Readdirnames(0)
if err != nil {
return err
}
if err := dir.Close(); err != nil {
return err
}
for _, sd := range subs {
// this is the inner layer of directory nesting,
// eg if 'td' == '0xff', 'sd' might be something like:
// '0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb'
src := filepath.Join(base, td, sd)
target := filepath.Join(base, sd)
fmt.Printf("moving %s -> %s\n", src, target)
if err := os.Rename(src, target); err != nil {
return err
}
nMoved += 1
}
}
fmt.Printf("moved %d directories\n", nMoved)
return nil
}
func filterRoot(s string) bool {
return strings.HasPrefix(s, "0x")
}
func filterNew(s string) bool {
return filterRoot(s) && len(s) == newLen
}