Compare commits

...

9 Commits

Author SHA1 Message Date
Kasey Kirkham
05d977a8fc minimize syscalls in pruning routine 2024-01-05 12:30:52 -06:00
terence tsao
099a01a63b More err to the end 2024-01-04 07:10:04 -08:00
terence tsao
3f7c54c563 Merge branch 'use-afero-walk' of github.com:prysmaticlabs/prysm into use-afero-walk 2024-01-04 07:05:41 -08:00
terence tsao
f6e641f3fc Use wrap 2024-01-04 07:05:34 -08:00
terence
423089aa0a Merge branch 'develop' into use-afero-walk 2024-01-04 06:51:26 -08:00
terence tsao
59d4f61ebb Return err 2024-01-04 06:42:33 -08:00
terence tsao
e8cada76b6 Merge branch 'develop' of github.com:prysmaticlabs/prysm into use-afero-walk 2024-01-04 06:34:48 -08:00
terence tsao
4a4dbba067 Merge branch 'develop' of github.com:prysmaticlabs/prysm into use-afero-walk 2024-01-03 14:42:27 -08:00
terence tsao
42309121da Use Afero walk 2024-01-03 10:04:19 -08:00
5 changed files with 277 additions and 168 deletions

View File

@@ -6,6 +6,7 @@ go_library(
"blob.go",
"ephemeral.go",
"metrics.go",
"pruner.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem",
visibility = ["//visibility:public"],
@@ -19,7 +20,6 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/logging:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
@@ -40,7 +40,6 @@ go_test(
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
"@com_github_spf13_afero//:go_default_library",

View File

@@ -1,28 +1,21 @@
package filesystem
import (
"encoding/binary"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/io/file"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/logging"
"github.com/prysmaticlabs/prysm/v4/time/slots"
log "github.com/sirupsen/logrus"
"github.com/spf13/afero"
)
@@ -35,7 +28,6 @@ const (
sszExt = "ssz"
partExt = "part"
bufferEpochs = 2
directoryPermissions = 0700
)
@@ -45,11 +37,11 @@ type BlobStorageOption func(*BlobStorage) error
// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
return func(b *BlobStorage) error {
s, err := slots.EpochStart(e + bufferEpochs)
pruner, err := newblobPruner(b.fs, e)
if err != nil {
return errors.Wrap(err, "could not set retentionSlots")
return err
}
b.retentionSlots = s
b.pruner = pruner
return nil
}
}
@@ -76,9 +68,8 @@ func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error
// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
type BlobStorage struct {
fs afero.Fs
retentionSlots primitives.Slot
prunedBefore atomic.Uint64
fs afero.Fs
pruner *blobPruner
}
// Save saves blobs given a list of sidecars.
@@ -94,7 +85,9 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
log.WithFields(logging.BlobFields(sidecar.ROBlob)).Debug("ignoring a duplicate blob sidecar Save attempt")
return nil
}
bs.tryPrune(sidecar.Slot())
if bs.pruner != nil {
bs.pruner.try(sidecar.BlockRoot(), sidecar.Slot())
}
// Serialize the ethpb.BlobSidecar to binary data using SSZ.
sidecarData, err := sidecar.MarshalSSZ()
@@ -218,7 +211,7 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
}
func (p blobNamer) dir() string {
return fmt.Sprintf("%#x", p.root)
return rootString(p.root)
}
func (p blobNamer) fname(ext string) string {
@@ -233,110 +226,6 @@ func (p blobNamer) path() string {
return p.fname(sszExt)
}
// Prune prunes blobs in the base directory based on the retention epoch.
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (bs *BlobStorage) Prune(pruneBefore primitives.Slot) error {
t := time.Now()
log.Debug("Pruning old blobs")
folders, err := afero.ReadDir(bs.fs, ".")
if err != nil {
return err
}
var totalPruned int
for _, folder := range folders {
if folder.IsDir() {
num, err := bs.processFolder(folder, pruneBefore)
if err != nil {
return err
}
blobsPrunedCounter.Add(float64(num))
blobsTotalGauge.Add(-float64(num))
totalPruned += num
}
}
pruneTime := time.Since(t)
log.WithFields(log.Fields{
"lastPrunedEpoch": slots.ToEpoch(pruneBefore),
"pruneTime": pruneTime,
"numberBlobsPruned": totalPruned,
}).Debug("Pruned old blobs")
return nil
}
// processFolder will delete the folder of blobs if the blob slot is outside the
// retention period. We determine the slot by looking at the first blob in the folder.
func (bs *BlobStorage) processFolder(folder os.FileInfo, pruneBefore primitives.Slot) (int, error) {
f, err := bs.fs.Open(filepath.Join(folder.Name(), "0."+sszExt))
if err != nil {
return 0, err
}
defer func() {
if err := f.Close(); err != nil {
log.WithError(err).Errorf("Could not close blob file")
}
}()
slot, err := slotFromBlob(f)
if err != nil {
return 0, err
}
var num int
if slot < pruneBefore {
num, err = bs.countFiles(folder.Name())
if err != nil {
return 0, err
}
if err = bs.fs.RemoveAll(folder.Name()); err != nil {
return 0, errors.Wrapf(err, "failed to delete blob %s", f.Name())
}
}
return num, nil
}
// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes),
// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields
// preceding the slot information within SignedBeaconBlockHeader.
func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
b := make([]byte, 8)
_, err := at.ReadAt(b, 131176)
if err != nil {
return 0, err
}
rawSlot := binary.LittleEndian.Uint64(b)
return primitives.Slot(rawSlot), nil
}
// Delete removes the directory matching the provided block root and all the blobs it contains.
func (bs *BlobStorage) Delete(root [32]byte) error {
if err := bs.fs.RemoveAll(hexutil.Encode(root[:])); err != nil {
return fmt.Errorf("failed to delete blobs for root %#x: %w", root, err)
}
return nil
}
// tryPrune checks whether we should prune and then calls prune
func (bs *BlobStorage) tryPrune(latest primitives.Slot) {
pruned := uint64(pruneBefore(latest, bs.retentionSlots))
if bs.prunedBefore.Swap(pruned) == pruned {
return
}
go func() {
if err := bs.Prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("failed to prune blobs from slot %d", latest)
}
}()
}
func pruneBefore(latest primitives.Slot, offset primitives.Slot) primitives.Slot {
// Safely compute the first slot in the epoch for the latest slot
latest = latest - latest%params.BeaconConfig().SlotsPerEpoch
if latest < offset {
return 0
}
return latest - offset
func rootString(root [32]byte) string {
return fmt.Sprintf("%#x", root)
}

View File

@@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
@@ -79,7 +78,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
require.NoError(t, err)
// Slot in first half of epoch therefore should not prune
bs.tryPrune(testSidecars[0].Slot())
bs.pruner.try(testSidecars[0].BlockRoot(), testSidecars[0].Slot())
err = bs.Save(testSidecars[0])
require.NoError(t, err)
actual, err := bs.Get(testSidecars[0].BlockRoot(), testSidecars[0].Index)
@@ -92,7 +91,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
testSidecars1, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
// Slot in first half of epoch therefore should not prune
bs.tryPrune(testSidecars1[0].Slot())
bs.pruner.try(testSidecars1[0].BlockRoot(), testSidecars1[0].Slot())
err = bs.Save(testSidecars1[0])
require.NoError(t, err)
// Check previous saved sidecar was not pruned
@@ -109,7 +108,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
_, sidecars = util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 131187, fieldparams.MaxBlobsPerBlock)
testSidecars2, err := verification.BlobSidecarSliceNoop(sidecars)
// Slot in second half of epoch therefore should prune
bs.tryPrune(testSidecars2[0].Slot())
bs.pruner.try(testSidecars2[0].BlockRoot(), testSidecars2[0].Slot())
require.NoError(t, err)
err = bs.Save(testSidecars2[0])
require.NoError(t, err)
@@ -188,7 +187,7 @@ func TestBlobStoragePrune(t *testing.T) {
require.NoError(t, bs.Save(sidecar))
}
require.NoError(t, bs.Prune(currentSlot-bs.retentionSlots))
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.retain))
remainingFolders, err := afero.ReadDir(fs, ".")
require.NoError(t, err)
@@ -208,7 +207,7 @@ func TestBlobStoragePrune(t *testing.T) {
slot += 10000
}
require.NoError(t, bs.Prune(currentSlot-bs.retentionSlots))
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.retain))
remainingFolders, err := afero.ReadDir(fs, ".")
require.NoError(t, err)
@@ -237,41 +236,11 @@ func BenchmarkPruning(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := bs.Prune(currentSlot)
err := bs.pruner.prune(currentSlot)
require.NoError(b, err)
}
}
func TestBlobStorageDelete(t *testing.T) {
fs, bs, err := NewEphemeralBlobStorageWithFs(t)
require.NoError(t, err)
rawRoot := "0xcf9bb70c98f58092c9d6459227c9765f984d240be9690e85179bc5a6f60366ad"
blockRoot, err := hexutil.Decode(rawRoot)
require.NoError(t, err)
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, fieldparams.MaxBlobsPerBlock)
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
for _, sidecar := range testSidecars {
require.NoError(t, bs.Save(sidecar))
}
exists, err := afero.DirExists(fs, hexutil.Encode(blockRoot))
require.NoError(t, err)
require.Equal(t, true, exists)
// Delete the directory corresponding to the block root
require.NoError(t, bs.Delete(bytesutil.ToBytes32(blockRoot)))
// Ensure that the directory no longer exists after deletion
exists, err = afero.DirExists(fs, hexutil.Encode(blockRoot))
require.NoError(t, err)
require.Equal(t, false, exists)
// Deleting a non-existent root does not return an error.
require.NoError(t, bs.Delete(bytesutil.ToBytes32([]byte{0x1})))
}
func TestNewBlobStorage(t *testing.T) {
_, err := NewBlobStorage(path.Join(t.TempDir(), "good"))
require.NoError(t, err)

View File

@@ -4,26 +4,30 @@ import (
"testing"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/spf13/afero"
)
// NewEphemeralBlobStorage should only be used for tests.
// The instance of BlobStorage returned is backed by an in-memory virtual filesystem,
// improving test performance and simplifying cleanup.
func NewEphemeralBlobStorage(_ testing.TB) *BlobStorage {
return &BlobStorage{fs: afero.NewMemMapFs()}
func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
fs := afero.NewMemMapFs()
pruner, err := newblobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
if err != nil {
t.Fatal("test setup issue", err)
}
return &BlobStorage{fs: fs, pruner: pruner}
}
// NewEphemeralBlobStorageWithFs can be used by tests that want access to the virtual filesystem
// in order to interact with it outside the parameters of the BlobStorage api.
func NewEphemeralBlobStorageWithFs(_ testing.TB) (afero.Fs, *BlobStorage, error) {
func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage, error) {
fs := afero.NewMemMapFs()
s, err := slots.EpochStart(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
pruner, err := newblobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
if err != nil {
return fs, &BlobStorage{}, err
t.Fatal("test setup issue", err)
}
return fs, &BlobStorage{fs: fs, retentionSlots: s}, nil
return fs, &BlobStorage{fs: fs, pruner: pruner}, nil
}
type BlobMocker struct {

View File

@@ -0,0 +1,248 @@
package filesystem
import (
"encoding/binary"
"io"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/time/slots"
log "github.com/sirupsen/logrus"
"github.com/spf13/afero"
)
const retentionBuffer primitives.Epoch = 2
var (
errBlobSlotUnknown = errors.New("could not determine blob slot from files in storage")
errPruningFailures = errors.New("blobs could not be pruned for some roots")
)
type blobPruner struct {
sync.Mutex
slotMap *slotForRoot
retain primitives.Slot
fs afero.Fs
prunedBefore atomic.Uint64
}
func newblobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {
r, err := slots.EpochStart(retain + retentionBuffer)
if err != nil {
return nil, errors.Wrap(err, "could not set retentionSlots")
}
return &blobPruner{fs: fs, retain: r, slotMap: newSlotForRoot()}, nil
}
// tryPrune checks whether we should prune and then calls prune in a goroutine.
func (p *blobPruner) try(root [32]byte, latest primitives.Slot) {
p.slotMap.ensure(rootString(root), latest)
pruned := uint64(pruneBefore(latest, p.retain))
if p.prunedBefore.Swap(pruned) == pruned {
return
}
go func() {
if err := p.prune(primitives.Slot(pruned)); err != nil {
log.WithError(err).Errorf("failed to prune blobs from slot %d", latest)
}
}()
}
func pruneBefore(latest primitives.Slot, offset primitives.Slot) primitives.Slot {
// Safely compute the first slot in the epoch for the latest slot
latest = latest - latest%params.BeaconConfig().SlotsPerEpoch
if latest < offset {
return 0
}
return latest - offset
}
// Prune prunes blobs in the base directory based on the retention epoch.
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
log.Debug("Pruning old blobs")
start := time.Now()
totalPruned, totalErr := 0, 0
defer func() {
log.WithFields(log.Fields{
"lastPrunedEpoch": slots.ToEpoch(pruneBefore),
"pruneTime": time.Since(start).String(),
"numberBlobsPruned": totalPruned,
}).Debug("Pruned old blobs")
blobsPrunedCounter.Add(float64(totalPruned))
}()
dirs, err := p.listDir(".", filterRoot)
if err != nil {
return errors.Wrap(err, "unable to list root blobs directory")
}
for _, dir := range dirs {
pruned, err := p.tryPruneDir(dir, pruneBefore)
if err != nil {
totalErr += 1
log.WithError(err).WithField("directory", dir).Error("unable to prune directory")
}
totalPruned += pruned
}
if totalErr > 0 {
return errors.Wrapf(errPruningFailures, "pruning failed for %d root directories", totalErr)
}
return nil
}
// directoryMeta tries a few different ways to determine the slot for the given directory.
// The seconds argument will be nil if the function did not need to list the directory, or
// non-nil with a list of files if it did.
func (p *blobPruner) directoryMeta(dir string) (primitives.Slot, []string, error) {
root := filepath.Base(dir) // end of the path should be the blob directory, named by hex encoding of root
// First try the cheap map lookup.
slot, ok := p.slotMap.slot(root)
if ok {
return slot, nil, nil
}
// Next try constructing the path to the zero index blob, which will always be present unless
// the blob directory has been damaged by something like a restart during RemoveAll.
slot, err := slotFromFile(filepath.Join(dir, "0."+sszExt), p.fs)
if err == nil {
p.slotMap.ensure(root, slot)
return slot, nil, nil
}
// Fall back if getting the slot from index zero failed -- look for any ssz file.
files, err := p.listDir(dir, filterSsz)
if err != nil {
return 0, nil, errors.Wrapf(err, "failed to list blobs in directory %s", dir)
}
if len(files) == 0 {
return 0, files, errors.Wrapf(errBlobSlotUnknown, "contained no blob files")
}
slot, err = slotFromFile(files[0], p.fs)
if err != nil {
return 0, nil, errors.Wrapf(err, "slot could not be read from blob file %s", files[0])
}
p.slotMap.ensure(root, slot)
return slot, files, nil
}
// tryPruneDir will delete the directory of blobs if the blob slot is outside the
// retention period. We determine the slot by looking at the first blob in the directory.
func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int, error) {
slot, files, err := p.directoryMeta(dir)
if err != nil {
return 0, errors.Wrapf(err, "could not determine slot for directory %s", dir)
}
if slot >= pruneBefore {
return 0, nil
}
if len(files) == 0 {
files, err = p.listDir(dir, filterSsz)
if err != nil {
return 0, errors.Wrapf(err, "failed to list blobs in directory %s", dir)
}
}
if err = p.fs.RemoveAll(dir); err != nil {
return 0, errors.Wrapf(err, "failed to delete blobs in %s", dir)
}
return len(files), nil
}
func slotFromFile(file string, fs afero.Fs) (primitives.Slot, error) {
f, err := fs.Open(file)
if err != nil {
return 0, err
}
defer func() {
if err := f.Close(); err != nil {
log.WithError(err).Errorf("Could not close blob file")
}
}()
return slotFromBlob(f)
}
// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes),
// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields
// preceding the slot information within SignedBeaconBlockHeader.
func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
b := make([]byte, 8)
_, err := at.ReadAt(b, 131176)
if err != nil {
return 0, err
}
rawSlot := binary.LittleEndian.Uint64(b)
return primitives.Slot(rawSlot), nil
}
func (p *blobPruner) listDir(dir string, filter func(string) bool) ([]string, error) {
top, err := p.fs.Open(dir)
defer func() {
if err := top.Close(); err != nil {
log.WithError(err).Errorf("Could not close file %s", dir)
}
}()
if err != nil {
return nil, errors.Wrap(err, "failed to open directory descriptor")
}
dirs, err := top.Readdirnames(-1)
if err != nil {
return nil, errors.Wrap(err, "failed to read directory listing")
}
if filter != nil {
filtered := make([]string, 0, len(dirs))
for i := range dirs {
if filter(dirs[i]) {
filtered = append(filtered, dirs[i])
}
}
return filtered, nil
}
return dirs, nil
}
func filterRoot(s string) bool {
return strings.HasPrefix(s, "0x")
}
func filterSsz(s string) bool {
return filepath.Ext(s) == sszExt
}
func newSlotForRoot() *slotForRoot {
return &slotForRoot{
cache: make(map[string]primitives.Slot, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest*fieldparams.SlotsPerEpoch),
}
}
type slotForRoot struct {
sync.RWMutex
cache map[string]primitives.Slot
}
func (s *slotForRoot) ensure(key string, slot primitives.Slot) {
s.Lock()
defer s.Unlock()
s.cache[key] = slot
}
func (s *slotForRoot) slot(key string) (primitives.Slot, bool) {
s.RLock()
defer s.RUnlock()
slot, ok := s.cache[key]
return slot, ok
}
func (s *slotForRoot) evict(key string) {
s.Lock()
defer s.Unlock()
delete(s.cache, key)
}