mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
metrics for count and size of blob files (#13614)
* metrics for count and size of blob files * Update beacon-chain/db/filesystem/metrics.go Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com> * double check ssz extension --------- Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com> Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
This commit is contained in:
@@ -102,7 +102,9 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error {
|
||||
return nil
|
||||
}
|
||||
if bs.pruner != nil {
|
||||
bs.pruner.notify(sidecar.BlockRoot(), sidecar.Slot())
|
||||
if err := bs.pruner.notify(sidecar.BlockRoot(), sidecar.Slot(), sidecar.Index); err != nil {
|
||||
return errors.Wrapf(err, "problem maintaining pruning cache/metrics for sidecar with root=%#x", sidecar.BlockRoot())
|
||||
}
|
||||
}
|
||||
|
||||
// Serialize the ethpb.BlobSidecar to binary data using SSZ.
|
||||
|
||||
@@ -22,7 +22,15 @@ var (
|
||||
Help: "Number of BlobSidecar files pruned.",
|
||||
})
|
||||
blobsWrittenCounter = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "blobs_written",
|
||||
Help: "Number of BlobSidecar files written.",
|
||||
Name: "blob_written",
|
||||
Help: "Number of BlobSidecar files written",
|
||||
})
|
||||
blobDiskCount = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "blob_disk_count",
|
||||
Help: "Approximate number of blob files in storage",
|
||||
})
|
||||
blobDiskSize = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "blob_disk_bytes",
|
||||
Help: "Approximate number of bytes occupied by blobs in storage",
|
||||
})
|
||||
)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"io"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -20,9 +21,11 @@ import (
|
||||
)
|
||||
|
||||
const retentionBuffer primitives.Epoch = 2
|
||||
const bytesPerSidecar = 131928
|
||||
|
||||
var (
|
||||
errPruningFailures = errors.New("blobs could not be pruned for some roots")
|
||||
errNotBlobSSZ = errors.New("not a blob ssz file")
|
||||
)
|
||||
|
||||
type blobPruner struct {
|
||||
@@ -43,17 +46,20 @@ func newBlobPruner(fs afero.Fs, retain primitives.Epoch) (*blobPruner, error) {
|
||||
|
||||
// notify updates the pruner's view of root->blob mappings. This allows the pruner to build a cache
|
||||
// of root->slot mappings and decide when to evict old blobs based on the age of present blobs.
|
||||
func (p *blobPruner) notify(root [32]byte, latest primitives.Slot) {
|
||||
p.slotMap.ensure(rootString(root), latest)
|
||||
func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) error {
|
||||
if err := p.slotMap.ensure(rootString(root), latest, idx); err != nil {
|
||||
return err
|
||||
}
|
||||
pruned := uint64(windowMin(latest, p.windowSize))
|
||||
if p.prunedBefore.Swap(pruned) == pruned {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
go func() {
|
||||
if err := p.prune(primitives.Slot(pruned)); err != nil {
|
||||
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func windowMin(latest primitives.Slot, offset primitives.Slot) primitives.Slot {
|
||||
@@ -140,7 +146,15 @@ func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int,
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err, "slot could not be read from blob file %s", scFiles[0])
|
||||
}
|
||||
p.slotMap.ensure(root, slot)
|
||||
for i := range scFiles {
|
||||
idx, err := idxFromPath(scFiles[i])
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err, "index could not be determined for blob file %s", scFiles[i])
|
||||
}
|
||||
if err := p.slotMap.ensure(root, slot, idx); err != nil {
|
||||
return 0, errors.Wrapf(err, "could not update prune cache for blob file %s", scFiles[i])
|
||||
}
|
||||
}
|
||||
if shouldRetain(slot, pruneBefore) {
|
||||
return 0, nil
|
||||
}
|
||||
@@ -169,6 +183,19 @@ func (p *blobPruner) tryPruneDir(dir string, pruneBefore primitives.Slot) (int,
|
||||
return len(scFiles), nil
|
||||
}
|
||||
|
||||
func idxFromPath(fname string) (uint64, error) {
|
||||
fname = path.Base(fname)
|
||||
|
||||
if filepath.Ext(fname) != dotSszExt {
|
||||
return 0, errors.Wrap(errNotBlobSSZ, "does not have .ssz extension")
|
||||
}
|
||||
parts := strings.Split(fname, ".")
|
||||
if len(parts) != 2 {
|
||||
return 0, errors.Wrap(errNotBlobSSZ, "unexpected filename structure (want <index>.ssz)")
|
||||
}
|
||||
return strconv.ParseUint(parts[0], 10, 64)
|
||||
}
|
||||
|
||||
func rootFromDir(dir string) string {
|
||||
return filepath.Base(dir) // end of the path should be the blob directory, named by hex encoding of root
|
||||
}
|
||||
@@ -245,30 +272,68 @@ func filterPart(s string) bool {
|
||||
|
||||
func newSlotForRoot() *slotForRoot {
|
||||
return &slotForRoot{
|
||||
cache: make(map[string]primitives.Slot, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest*fieldparams.SlotsPerEpoch),
|
||||
cache: make(map[string]*slotCacheEntry, params.BeaconConfig().MinEpochsForBlobsSidecarsRequest*fieldparams.SlotsPerEpoch),
|
||||
}
|
||||
}
|
||||
|
||||
type slotCacheEntry struct {
|
||||
slot primitives.Slot
|
||||
mask [fieldparams.MaxBlobsPerBlock]bool
|
||||
}
|
||||
|
||||
type slotForRoot struct {
|
||||
sync.RWMutex
|
||||
cache map[string]primitives.Slot
|
||||
nBlobs float64
|
||||
cache map[string]*slotCacheEntry
|
||||
}
|
||||
|
||||
func (s *slotForRoot) ensure(key string, slot primitives.Slot) {
|
||||
func (s *slotForRoot) updateMetrics(delta float64) {
|
||||
s.nBlobs += delta
|
||||
blobDiskCount.Set(s.nBlobs)
|
||||
blobDiskSize.Set(s.nBlobs * bytesPerSidecar)
|
||||
}
|
||||
|
||||
func (s *slotForRoot) ensure(key string, slot primitives.Slot, idx uint64) error {
|
||||
if idx >= fieldparams.MaxBlobsPerBlock {
|
||||
return errIndexOutOfBounds
|
||||
}
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.cache[key] = slot
|
||||
v, ok := s.cache[key]
|
||||
if !ok {
|
||||
v = &slotCacheEntry{}
|
||||
}
|
||||
v.slot = slot
|
||||
if !v.mask[idx] {
|
||||
s.updateMetrics(1)
|
||||
}
|
||||
v.mask[idx] = true
|
||||
s.cache[key] = v
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *slotForRoot) slot(key string) (primitives.Slot, bool) {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
slot, ok := s.cache[key]
|
||||
return slot, ok
|
||||
v, ok := s.cache[key]
|
||||
if !ok {
|
||||
return 0, false
|
||||
}
|
||||
return v.slot, ok
|
||||
}
|
||||
|
||||
func (s *slotForRoot) evict(key string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
v, ok := s.cache[key]
|
||||
var deleted float64
|
||||
if ok {
|
||||
for i := range v.mask {
|
||||
if v.mask[i] {
|
||||
deleted += 1
|
||||
}
|
||||
}
|
||||
s.updateMetrics(-deleted)
|
||||
}
|
||||
delete(s.cache, key)
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ func TestTryPruneDir_CachedNotExpired(t *testing.T) {
|
||||
root := fmt.Sprintf("%#x", sc.BlockRoot())
|
||||
// This slot is right on the edge of what would need to be pruned, so by adding it to the cache and
|
||||
// skipping any other test setup, we can be certain the hot cache path never touches the filesystem.
|
||||
pr.slotMap.ensure(root, sc.Slot())
|
||||
require.NoError(t, pr.slotMap.ensure(root, sc.Slot(), 0))
|
||||
pruned, err := pr.tryPruneDir(root, pr.windowSize)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, pruned)
|
||||
@@ -45,7 +45,7 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
root := fmt.Sprintf("%#x", sc.BlockRoot())
|
||||
require.NoError(t, fs.Mkdir(root, directoryPermissions)) // make empty directory
|
||||
pr.slotMap.ensure(root, sc.Slot())
|
||||
require.NoError(t, pr.slotMap.ensure(root, sc.Slot(), 0))
|
||||
pruned, err := pr.tryPruneDir(root, slot+1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, pruned)
|
||||
|
||||
Reference in New Issue
Block a user