mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Compare commits
9 Commits
v6.1.2-rc.
...
lighter-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
05d977a8fc | ||
|
|
099a01a63b | ||
|
|
3f7c54c563 | ||
|
|
f6e641f3fc | ||
|
|
423089aa0a | ||
|
|
59d4f61ebb | ||
|
|
e8cada76b6 | ||
|
|
4a4dbba067 | ||
|
|
42309121da |
@@ -6,6 +6,7 @@ go_library(
|
||||
"blob.go",
|
||||
"ephemeral.go",
|
||||
"metrics.go",
|
||||
"pruner.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem",
|
||||
visibility = ["//visibility:public"],
|
||||
@@ -19,7 +20,6 @@ go_library(
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//runtime/logging:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
@@ -40,7 +40,6 @@ go_test(
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prysmaticlabs_fastssz//:go_default_library",
|
||||
"@com_github_spf13_afero//:go_default_library",
|
||||
|
||||
@@ -1,28 +1,21 @@
|
||||
package filesystem
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/io/file"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime/logging"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/afero"
|
||||
)
|
||||
@@ -35,7 +28,6 @@ const (
|
||||
sszExt = "ssz"
|
||||
partExt = "part"
|
||||
|
||||
bufferEpochs = 2
|
||||
directoryPermissions = 0700
|
||||
)
|
||||
|
||||
@@ -45,11 +37,11 @@ type BlobStorageOption func(*BlobStorage) error
|
||||
// WithBlobRetentionEpochs is an option that changes the number of epochs blobs will be persisted.
|
||||
func WithBlobRetentionEpochs(e primitives.Epoch) BlobStorageOption {
|
||||
return func(b *BlobStorage) error {
|
||||
s, err := slots.EpochStart(e + bufferEpochs)
|
||||
pruner, err := newblobPruner(b.fs, e)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not set retentionSlots")
|
||||
return err
|
||||
}
|
||||
b.retentionSlots = s
|
||||
b.pruner = pruner
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -76,9 +68,8 @@ func NewBlobStorage(base string, opts ...BlobStorageOption) (*BlobStorage, error
|
||||
|
||||
// BlobStorage is the concrete implementation of the filesystem backend for saving and retrieving BlobSidecars.
|
||||
type BlobStorage struct {
|
||||
fs afero.Fs
|
||||
retentionSlots primitives.Slot
|
||||
prunedBefore atomic.Uint64
|
||||
fs afero.Fs
|
||||
pruner *blobPruner
|
||||
}
|
||||
|
||||
// Save saves blobs given a list of sidecars.
|
||||
@@ -94,7 +85,9 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
|
||||
log.WithFields(logging.BlobFields(sidecar.ROBlob)).Debug("ignoring a duplicate blob sidecar Save attempt")
|
||||
return nil
|
||||
}
|
||||
bs.tryPrune(sidecar.Slot())
|
||||
if bs.pruner != nil {
|
||||
bs.pruner.try(sidecar.BlockRoot(), sidecar.Slot())
|
||||
}
|
||||
|
||||
// Serialize the ethpb.BlobSidecar to binary data using SSZ.
|
||||
sidecarData, err := sidecar.MarshalSSZ()
|
||||
@@ -218,7 +211,7 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
|
||||
}
|
||||
|
||||
func (p blobNamer) dir() string {
|
||||
return fmt.Sprintf("%#x", p.root)
|
||||
return rootString(p.root)
|
||||
}
|
||||
|
||||
func (p blobNamer) fname(ext string) string {
|
||||
@@ -233,110 +226,6 @@ func (p blobNamer) path() string {
|
||||
return p.fname(sszExt)
|
||||
}
|
||||
|
||||
// Prune prunes blobs in the base directory based on the retention epoch.
|
||||
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
|
||||
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
|
||||
func (bs *BlobStorage) Prune(pruneBefore primitives.Slot) error {
|
||||
t := time.Now()
|
||||
|
||||
log.Debug("Pruning old blobs")
|
||||
|
||||
folders, err := afero.ReadDir(bs.fs, ".")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var totalPruned int
|
||||
for _, folder := range folders {
|
||||
if folder.IsDir() {
|
||||
num, err := bs.processFolder(folder, pruneBefore)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blobsPrunedCounter.Add(float64(num))
|
||||
blobsTotalGauge.Add(-float64(num))
|
||||
totalPruned += num
|
||||
}
|
||||
}
|
||||
pruneTime := time.Since(t)
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"lastPrunedEpoch": slots.ToEpoch(pruneBefore),
|
||||
"pruneTime": pruneTime,
|
||||
"numberBlobsPruned": totalPruned,
|
||||
}).Debug("Pruned old blobs")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// processFolder will delete the folder of blobs if the blob slot is outside the
|
||||
// retention period. We determine the slot by looking at the first blob in the folder.
|
||||
func (bs *BlobStorage) processFolder(folder os.FileInfo, pruneBefore primitives.Slot) (int, error) {
|
||||
f, err := bs.fs.Open(filepath.Join(folder.Name(), "0."+sszExt))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer func() {
|
||||
if err := f.Close(); err != nil {
|
||||
log.WithError(err).Errorf("Could not close blob file")
|
||||
}
|
||||
}()
|
||||
|
||||
slot, err := slotFromBlob(f)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
var num int
|
||||
if slot < pruneBefore {
|
||||
num, err = bs.countFiles(folder.Name())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err = bs.fs.RemoveAll(folder.Name()); err != nil {
|
||||
return 0, errors.Wrapf(err, "failed to delete blob %s", f.Name())
|
||||
}
|
||||
}
|
||||
return num, nil
|
||||
}
|
||||
|
||||
// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes),
|
||||
// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields
|
||||
// preceding the slot information within SignedBeaconBlockHeader.
|
||||
func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
|
||||
b := make([]byte, 8)
|
||||
_, err := at.ReadAt(b, 131176)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
rawSlot := binary.LittleEndian.Uint64(b)
|
||||
return primitives.Slot(rawSlot), nil
|
||||
}
|
||||
|
||||
// Delete removes the directory matching the provided block root and all the blobs it contains.
|
||||
func (bs *BlobStorage) Delete(root [32]byte) error {
|
||||
if err := bs.fs.RemoveAll(hexutil.Encode(root[:])); err != nil {
|
||||
return fmt.Errorf("failed to delete blobs for root %#x: %w", root, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// tryPrune checks whether we should prune and then calls prune
|
||||
func (bs *BlobStorage) tryPrune(latest primitives.Slot) {
|
||||
pruned := uint64(pruneBefore(latest, bs.retentionSlots))
|
||||
if bs.prunedBefore.Swap(pruned) == pruned {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
if err := bs.Prune(primitives.Slot(pruned)); err != nil {
|
||||
log.WithError(err).Errorf("failed to prune blobs from slot %d", latest)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func pruneBefore(latest primitives.Slot, offset primitives.Slot) primitives.Slot {
|
||||
// Safely compute the first slot in the epoch for the latest slot
|
||||
latest = latest - latest%params.BeaconConfig().SlotsPerEpoch
|
||||
if latest < offset {
|
||||
return 0
|
||||
}
|
||||
return latest - offset
|
||||
func rootString(root [32]byte) string {
|
||||
return fmt.Sprintf("%#x", root)
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/pkg/errors"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
|
||||
@@ -79,7 +78,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Slot in first half of epoch therefore should not prune
|
||||
bs.tryPrune(testSidecars[0].Slot())
|
||||
bs.pruner.try(testSidecars[0].BlockRoot(), testSidecars[0].Slot())
|
||||
err = bs.Save(testSidecars[0])
|
||||
require.NoError(t, err)
|
||||
actual, err := bs.Get(testSidecars[0].BlockRoot(), testSidecars[0].Index)
|
||||
@@ -92,7 +91,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
|
||||
testSidecars1, err := verification.BlobSidecarSliceNoop(sidecars)
|
||||
require.NoError(t, err)
|
||||
// Slot in first half of epoch therefore should not prune
|
||||
bs.tryPrune(testSidecars1[0].Slot())
|
||||
bs.pruner.try(testSidecars1[0].BlockRoot(), testSidecars1[0].Slot())
|
||||
err = bs.Save(testSidecars1[0])
|
||||
require.NoError(t, err)
|
||||
// Check previous saved sidecar was not pruned
|
||||
@@ -109,7 +108,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
|
||||
_, sidecars = util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 131187, fieldparams.MaxBlobsPerBlock)
|
||||
testSidecars2, err := verification.BlobSidecarSliceNoop(sidecars)
|
||||
// Slot in second half of epoch therefore should prune
|
||||
bs.tryPrune(testSidecars2[0].Slot())
|
||||
bs.pruner.try(testSidecars2[0].BlockRoot(), testSidecars2[0].Slot())
|
||||
require.NoError(t, err)
|
||||
err = bs.Save(testSidecars2[0])
|
||||
require.NoError(t, err)
|
||||
@@ -188,7 +187,7 @@ func TestBlobStoragePrune(t *testing.T) {
|
||||
require.NoError(t, bs.Save(sidecar))
|
||||
}
|
||||
|
||||
require.NoError(t, bs.Prune(currentSlot-bs.retentionSlots))
|
||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.retain))
|
||||
|
||||
remainingFolders, err := afero.ReadDir(fs, ".")
|
||||
require.NoError(t, err)
|
||||
@@ -208,7 +207,7 @@ func TestBlobStoragePrune(t *testing.T) {
|
||||
slot += 10000
|
||||
}
|
||||
|
||||
require.NoError(t, bs.Prune(currentSlot-bs.retentionSlots))
|
||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.retain))
|
||||
|
||||
remainingFolders, err := afero.ReadDir(fs, ".")
|
||||
require.NoError(t, err)
|
||||
@@ -237,41 +236,11 @@ func BenchmarkPruning(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := bs.Prune(currentSlot)
|
||||
err := bs.pruner.prune(currentSlot)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlobStorageDelete(t *testing.T) {
|
||||
fs, bs, err := NewEphemeralBlobStorageWithFs(t)
|
||||
require.NoError(t, err)
|
||||
rawRoot := "0xcf9bb70c98f58092c9d6459227c9765f984d240be9690e85179bc5a6f60366ad"
|
||||
blockRoot, err := hexutil.Decode(rawRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, fieldparams.MaxBlobsPerBlock)
|
||||
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
|
||||
require.NoError(t, err)
|
||||
for _, sidecar := range testSidecars {
|
||||
require.NoError(t, bs.Save(sidecar))
|
||||
}
|
||||
|
||||
exists, err := afero.DirExists(fs, hexutil.Encode(blockRoot))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, exists)
|
||||
|
||||
// Delete the directory corresponding to the block root
|
||||
require.NoError(t, bs.Delete(bytesutil.ToBytes32(blockRoot)))
|
||||
|
||||
// Ensure that the directory no longer exists after deletion
|
||||
exists, err = afero.DirExists(fs, hexutil.Encode(blockRoot))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, false, exists)
|
||||
|
||||
// Deleting a non-existent root does not return an error.
|
||||
require.NoError(t, bs.Delete(bytesutil.ToBytes32([]byte{0x1})))
|
||||
}
|
||||
|
||||
func TestNewBlobStorage(t *testing.T) {
|
||||
_, err := NewBlobStorage(path.Join(t.TempDir(), "good"))
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -4,26 +4,30 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"github.com/spf13/afero"
|
||||
)
|
||||
|
||||
// NewEphemeralBlobStorage should only be used for tests.
|
||||
// The instance of BlobStorage returned is backed by an in-memory virtual filesystem,
|
||||
// improving test performance and simplifying cleanup.
|
||||
func NewEphemeralBlobStorage(_ testing.TB) *BlobStorage {
|
||||
return &BlobStorage{fs: afero.NewMemMapFs()}
|
||||
func NewEphemeralBlobStorage(t testing.TB) *BlobStorage {
|
||||
fs := afero.NewMemMapFs()
|
||||
pruner, err := newblobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
|
||||
if err != nil {
|
||||
t.Fatal("test setup issue", err)
|
||||
}
|
||||
return &BlobStorage{fs: fs, pruner: pruner}
|
||||
}
|
||||
|
||||
// NewEphemeralBlobStorageWithFs can be used by tests that want access to the virtual filesystem
|
||||
// in order to interact with it outside the parameters of the BlobStorage api.
|
||||
func NewEphemeralBlobStorageWithFs(_ testing.TB) (afero.Fs, *BlobStorage, error) {
|
||||
func NewEphemeralBlobStorageWithFs(t testing.TB) (afero.Fs, *BlobStorage, error) {
|
||||
fs := afero.NewMemMapFs()
|
||||
s, err := slots.EpochStart(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
|
||||
pruner, err := newblobPruner(fs, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
|
||||
if err != nil {
|
||||
return fs, &BlobStorage{}, err
|
||||
t.Fatal("test setup issue", err)
|
||||
}
|
||||
return fs, &BlobStorage{fs: fs, retentionSlots: s}, nil
|
||||
return fs, &BlobStorage{fs: fs, pruner: pruner}, nil
|
||||
}
|
||||
|
||||
type BlobMocker struct {
|
||||
|
||||
248
beacon-chain/db/filesystem/pruner.go
Normal file
248
beacon-chain/db/filesystem/pruner.go
Normal file
@@ -0,0 +1,248 @@
|
||||
package filesystem
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/afero"
|
||||
)
|
||||
|
||||
const retentionBuffer primitives.Epoch = 2
|
||||
|
||||
var (
|
||||
errBlobSlotUnknown = errors.New("could not determine blob slot from files in storage")
|
||||
errPruningFailures = errors.New("blobs could not be pruned for some roots")
|
||||
)
|
||||
|
||||
type blobPruner struct {
|
||||
sync.Mutex
|
||||
slotMap *slotForRoot
|
||||
retain primitives.Slot
|
||||
fs afero.Fs
|
||||
prunedBefore atomic.Uint64
|
||||
}
|
||||
|
||||
func newblobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {
|
||||
r, err := slots.EpochStart(retain + retentionBuffer)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not set retentionSlots")
|
||||
}
|
||||
return &blobPruner{fs: fs, retain: r, slotMap: newSlotForRoot()}, nil
|
||||
}
|
||||
|
||||
// tryPrune checks whether we should prune and then calls prune in a goroutine.
|
||||
func (p *blobPruner) try(root [32]byte, latest primitives.Slot) {
|
||||
p.slotMap.ensure(rootString(root), latest)
|
||||
pruned := uint64(pruneBefore(latest, p.retain))
|
||||
if p.prunedBefore.Swap(pruned) == pruned {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
if err := p.prune(primitives.Slot(pruned)); err != nil {
|
||||
log.WithError(err).Errorf("failed to prune blobs from slot %d", latest)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func pruneBefore(latest primitives.Slot, offset primitives.Slot) primitives.Slot {
|
||||
// Safely compute the first slot in the epoch for the latest slot
|
||||
latest = latest - latest%params.BeaconConfig().SlotsPerEpoch
|
||||
if latest < offset {
|
||||
return 0
|
||||
}
|
||||
return latest - offset
|
||||
}
|
||||
|
||||
// Prune prunes blobs in the base directory based on the retention epoch.
|
||||
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
|
||||
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
|
||||
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
|
||||
log.Debug("Pruning old blobs")
|
||||
start := time.Now()
|
||||
totalPruned, totalErr := 0, 0
|
||||
defer func() {
|
||||
log.WithFields(log.Fields{
|
||||
"lastPrunedEpoch": slots.ToEpoch(pruneBefore),
|
||||
"pruneTime": time.Since(start).String(),
|
||||
"numberBlobsPruned": totalPruned,
|
||||
}).Debug("Pruned old blobs")
|
||||
blobsPrunedCounter.Add(float64(totalPruned))
|
||||
}()
|
||||
|
||||
dirs, err := p.listDir(".", filterRoot)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to list root blobs directory")
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
pruned, err := p.tryPruneDir(dir, pruneBefore)
|
||||
if err != nil {
|
||||
totalErr += 1
|
||||
log.WithError(err).WithField("directory", dir).Error("unable to prune directory")
|
||||
}
|
||||
totalPruned += pruned
|
||||
}
|
||||
|
||||
if totalErr > 0 {
|
||||
return errors.Wrapf(errPruningFailures, "pruning failed for %d root directories", totalErr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// directoryMeta tries a few different ways to determine the slot for the given directory.
|
||||
// The seconds argument will be nil if the function did not need to list the directory, or
|
||||
// non-nil with a list of files if it did.
|
||||
func (p *blobPruner) directoryMeta(dir string) (primitives.Slot, []string, error) {
|
||||
root := filepath.Base(dir) // end of the path should be the blob directory, named by hex encoding of root
|
||||
// First try the cheap map lookup.
|
||||
slot, ok := p.slotMap.slot(root)
|
||||
if ok {
|
||||
return slot, nil, nil
|
||||
}
|
||||
|
||||
// Next try constructing the path to the zero index blob, which will always be present unless
|
||||
// the blob directory has been damaged by something like a restart during RemoveAll.
|
||||
slot, err := slotFromFile(filepath.Join(dir, "0."+sszExt), p.fs)
|
||||
if err == nil {
|
||||
p.slotMap.ensure(root, slot)
|
||||
return slot, nil, nil
|
||||
}
|
||||
|
||||
// Fall back if getting the slot from index zero failed -- look for any ssz file.
|
||||
files, err := p.listDir(dir, filterSsz)
|
||||
if err != nil {
|
||||
return 0, nil, errors.Wrapf(err, "failed to list blobs in directory %s", dir)
|
||||
}
|
||||
if len(files) == 0 {
|
||||
return 0, files, errors.Wrapf(errBlobSlotUnknown, "contained no blob files")
|
||||
}
|
||||
slot, err = slotFromFile(files[0], p.fs)
|
||||
if err != nil {
|
||||
return 0, nil, errors.Wrapf(err, "slot could not be read from blob file %s", files[0])
|
||||
}
|
||||
p.slotMap.ensure(root, slot)
|
||||
return slot, files, nil
|
||||
}
|
||||
|
||||
// tryPruneDir will delete the directory of blobs if the blob slot is outside the
|
||||
// retention period. We determine the slot by looking at the first blob in the directory.
|
||||
func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int, error) {
|
||||
slot, files, err := p.directoryMeta(dir)
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err, "could not determine slot for directory %s", dir)
|
||||
}
|
||||
if slot >= pruneBefore {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if len(files) == 0 {
|
||||
files, err = p.listDir(dir, filterSsz)
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err, "failed to list blobs in directory %s", dir)
|
||||
}
|
||||
}
|
||||
if err = p.fs.RemoveAll(dir); err != nil {
|
||||
return 0, errors.Wrapf(err, "failed to delete blobs in %s", dir)
|
||||
}
|
||||
return len(files), nil
|
||||
}
|
||||
|
||||
func slotFromFile(file string, fs afero.Fs) (primitives.Slot, error) {
|
||||
f, err := fs.Open(file)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer func() {
|
||||
if err := f.Close(); err != nil {
|
||||
log.WithError(err).Errorf("Could not close blob file")
|
||||
}
|
||||
}()
|
||||
return slotFromBlob(f)
|
||||
}
|
||||
|
||||
// slotFromBlob reads the ssz data of a file at the specified offset (8 + 131072 + 48 + 48 = 131176 bytes),
|
||||
// which is calculated based on the size of the BlobSidecar struct and is based on the size of the fields
|
||||
// preceding the slot information within SignedBeaconBlockHeader.
|
||||
func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
|
||||
b := make([]byte, 8)
|
||||
_, err := at.ReadAt(b, 131176)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
rawSlot := binary.LittleEndian.Uint64(b)
|
||||
return primitives.Slot(rawSlot), nil
|
||||
}
|
||||
|
||||
func (p *blobPruner) listDir(dir string, filter func(string) bool) ([]string, error) {
|
||||
top, err := p.fs.Open(dir)
|
||||
defer func() {
|
||||
if err := top.Close(); err != nil {
|
||||
log.WithError(err).Errorf("Could not close file %s", dir)
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to open directory descriptor")
|
||||
}
|
||||
dirs, err := top.Readdirnames(-1)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to read directory listing")
|
||||
}
|
||||
if filter != nil {
|
||||
filtered := make([]string, 0, len(dirs))
|
||||
for i := range dirs {
|
||||
if filter(dirs[i]) {
|
||||
filtered = append(filtered, dirs[i])
|
||||
}
|
||||
}
|
||||
return filtered, nil
|
||||
}
|
||||
return dirs, nil
|
||||
}
|
||||
|
||||
func filterRoot(s string) bool {
|
||||
return strings.HasPrefix(s, "0x")
|
||||
}
|
||||
|
||||
func filterSsz(s string) bool {
|
||||
return filepath.Ext(s) == sszExt
|
||||
}
|
||||
|
||||
func newSlotForRoot() *slotForRoot {
|
||||
return &slotForRoot{
|
||||
cache: make(map[string]primitives.Slot, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest*fieldparams.SlotsPerEpoch),
|
||||
}
|
||||
}
|
||||
|
||||
type slotForRoot struct {
|
||||
sync.RWMutex
|
||||
cache map[string]primitives.Slot
|
||||
}
|
||||
|
||||
func (s *slotForRoot) ensure(key string, slot primitives.Slot) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.cache[key] = slot
|
||||
}
|
||||
|
||||
func (s *slotForRoot) slot(key string) (primitives.Slot, bool) {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
slot, ok := s.cache[key]
|
||||
return slot, ok
|
||||
}
|
||||
|
||||
func (s *slotForRoot) evict(key string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
delete(s.cache, key)
|
||||
}
|
||||
Reference in New Issue
Block a user