mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
1 Commits
v6.0.5-rc.
...
nested-blo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b49be1eef |
@@ -7,6 +7,7 @@ go_library(
|
||||
"cache.go",
|
||||
"log.go",
|
||||
"metrics.go",
|
||||
"migration.go",
|
||||
"mock.go",
|
||||
"pruner.go",
|
||||
],
|
||||
@@ -37,6 +38,7 @@ go_test(
|
||||
srcs = [
|
||||
"blob_test.go",
|
||||
"cache_test.go",
|
||||
"migration_test.go",
|
||||
"pruner_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -37,6 +38,7 @@ const (
|
||||
partExt = "part"
|
||||
|
||||
directoryPermissions = 0700
|
||||
rootPrefixLen = 4
|
||||
)
|
||||
|
||||
// BlobStorageOption is a functional option for configuring a BlobStorage.
|
||||
@@ -321,8 +323,14 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
|
||||
return blobNamer{root: sc.BlockRoot(), index: sc.Index}
|
||||
}
|
||||
|
||||
func (p blobNamer) groupDir() string {
|
||||
return oneBytePrefix(rootString(p.root))
|
||||
}
|
||||
|
||||
func (p blobNamer) dir() string {
|
||||
return rootString(p.root)
|
||||
rs := rootString(p.root)
|
||||
parentDir := oneBytePrefix(rs)
|
||||
return filepath.Join(parentDir, rs)
|
||||
}
|
||||
|
||||
func (p blobNamer) partPath(entropy string) string {
|
||||
@@ -337,6 +345,11 @@ func rootString(root [32]byte) string {
|
||||
return fmt.Sprintf("%#x", root)
|
||||
}
|
||||
|
||||
func oneBytePrefix(p string) string {
|
||||
// returns eg 0x00 from 0x0002fb4db510b8618b04dc82d023793739c26346a8b02eb73482e24b0fec0555
|
||||
return p[0:rootPrefixLen]
|
||||
}
|
||||
|
||||
func stringToRoot(str string) ([32]byte, error) {
|
||||
slice, err := hexutil.Decode(str)
|
||||
if err != nil {
|
||||
|
||||
@@ -170,10 +170,15 @@ func TestBlobStoragePrune(t *testing.T) {
|
||||
for _, sidecar := range testSidecars {
|
||||
require.NoError(t, bs.Save(sidecar))
|
||||
}
|
||||
namer := namerForSidecar(testSidecars[0])
|
||||
|
||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
|
||||
beforeFolders, err := afero.ReadDir(fs, namer.groupDir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(beforeFolders))
|
||||
|
||||
remainingFolders, err := afero.ReadDir(fs, ".")
|
||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
|
||||
|
||||
remainingFolders, err := afero.ReadDir(fs, namer.groupDir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(remainingFolders))
|
||||
})
|
||||
@@ -181,36 +186,51 @@ func TestBlobStoragePrune(t *testing.T) {
|
||||
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 299, fieldparams.MaxBlobsPerBlock)
|
||||
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
|
||||
require.NoError(t, err)
|
||||
namer := namerForSidecar(testSidecars[0])
|
||||
|
||||
for _, sidecar := range testSidecars[4:] {
|
||||
require.NoError(t, bs.Save(sidecar))
|
||||
}
|
||||
|
||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
|
||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
|
||||
|
||||
remainingFolders, err := afero.ReadDir(fs, ".")
|
||||
remainingFolders, err := afero.ReadDir(fs, namer.groupDir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(remainingFolders))
|
||||
})
|
||||
t.Run("PruneMany", func(t *testing.T) {
|
||||
blockQty := 10
|
||||
slot := primitives.Slot(1)
|
||||
|
||||
for j := 0; j <= blockQty; j++ {
|
||||
root := bytesutil.ToBytes32(bytesutil.ToBytes(uint64(slot), 32))
|
||||
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, root, slot, fieldparams.MaxBlobsPerBlock)
|
||||
pruneBefore := currentSlot - bs.pruner.windowSize
|
||||
increment := primitives.Slot(10000)
|
||||
slots := []primitives.Slot{
|
||||
pruneBefore - increment,
|
||||
pruneBefore - (2 * increment),
|
||||
pruneBefore,
|
||||
pruneBefore + increment,
|
||||
pruneBefore + (2 * increment),
|
||||
}
|
||||
namers := make([]blobNamer, len(slots))
|
||||
for i, s := range slots {
|
||||
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, s, 1)
|
||||
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, bs.Save(testSidecars[0]))
|
||||
|
||||
slot += 10000
|
||||
namers[i] = namerForSidecar(testSidecars[0])
|
||||
}
|
||||
|
||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
|
||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
|
||||
|
||||
remainingFolders, err := afero.ReadDir(fs, ".")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 4, len(remainingFolders))
|
||||
// first 2 subdirs should be removed
|
||||
for _, nmr := range namers[0:2] {
|
||||
entries, err := listDir(fs, nmr.dir())
|
||||
require.Equal(t, 0, len(entries))
|
||||
require.ErrorIs(t, err, os.ErrNotExist)
|
||||
}
|
||||
// the rest should still be there
|
||||
for _, nmr := range namers[2:] {
|
||||
entries, err := listDir(fs, nmr.dir())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(entries))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -234,7 +254,7 @@ func BenchmarkPruning(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := bs.pruner.prune(currentSlot)
|
||||
err := bs.pruner.prune(currentSlot, nil)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
}
|
||||
|
||||
61
beacon-chain/db/filesystem/migration.go
Normal file
61
beacon-chain/db/filesystem/migration.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package filesystem
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/afero"
|
||||
)
|
||||
|
||||
type directoryMigrator interface {
|
||||
migrate(fs afero.Fs, dirs []string) error
|
||||
}
|
||||
|
||||
type oneBytePrefixMigrator struct {
|
||||
migrated []string
|
||||
}
|
||||
|
||||
func (m *oneBytePrefixMigrator) migrate(fs afero.Fs, dirs []string) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
nMigrated := len(m.migrated)
|
||||
if nMigrated > 0 {
|
||||
log.WithField("elapsed", time.Since(start).String()).
|
||||
WithField("dirsMoved", nMigrated).
|
||||
Debug("Migrated blob subdirectories to byte-prefixed format")
|
||||
}
|
||||
}()
|
||||
groups := groupDirsByPrefix(dirs)
|
||||
return m.renameByGroup(fs, groups)
|
||||
}
|
||||
|
||||
func (m *oneBytePrefixMigrator) renameByGroup(fs afero.Fs, groups map[string][]string) error {
|
||||
for g, sd := range groups {
|
||||
// make the enclosing dir if needed
|
||||
if err := fs.MkdirAll(g, directoryPermissions); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, dir := range sd {
|
||||
dest := filepath.Join(g, dir)
|
||||
// todo: check if directory exists and move files one at a time if so?
|
||||
// that shouldn't be a problem if we migrate in cache warmup and never write to old path.
|
||||
if err := fs.Rename(dir, dest); err != nil {
|
||||
return err
|
||||
}
|
||||
log.WithField("source", dir).WithField("dest", dest).Trace("Migrated legacy blob storage path.")
|
||||
m.migrated = append(m.migrated, dir)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func groupDirsByPrefix(dirs []string) map[string][]string {
|
||||
groups := make(map[string][]string)
|
||||
for _, dir := range dirs {
|
||||
if filterLegacy(dir) {
|
||||
key := oneBytePrefix(dir)
|
||||
groups[key] = append(groups[key], dir)
|
||||
}
|
||||
}
|
||||
return groups
|
||||
}
|
||||
216
beacon-chain/db/filesystem/migration_test.go
Normal file
216
beacon-chain/db/filesystem/migration_test.go
Normal file
@@ -0,0 +1,216 @@
|
||||
package filesystem
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
"github.com/spf13/afero"
|
||||
)
|
||||
|
||||
func testSetupPaths(t *testing.T, fs afero.Fs, paths []migrateBeforeAfter) {
|
||||
for _, ba := range paths {
|
||||
p := ba.before()
|
||||
dir := filepath.Dir(p)
|
||||
require.NoError(t, fs.MkdirAll(dir, directoryPermissions))
|
||||
fh, err := fs.Create(p)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, fh.Close())
|
||||
// double check that we got the full path correct
|
||||
_, err = fs.Stat(ba.before())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func testAssertNewPaths(t *testing.T, fs afero.Fs, paths []migrateBeforeAfter) {
|
||||
for _, ba := range paths {
|
||||
if ba.before() != ba.after() {
|
||||
_, err := fs.Stat(ba.before())
|
||||
require.ErrorIs(t, err, os.ErrNotExist)
|
||||
dir := filepath.Dir(ba.before())
|
||||
_, err = listDir(fs, dir)
|
||||
require.ErrorIs(t, err, os.ErrNotExist)
|
||||
}
|
||||
_, err := fs.Stat(ba.after())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
type migrateBeforeAfter [2]string
|
||||
|
||||
func (ba migrateBeforeAfter) before() string {
|
||||
return ba[0]
|
||||
}
|
||||
func (ba migrateBeforeAfter) after() string {
|
||||
return ba[1]
|
||||
}
|
||||
|
||||
func TestOneBytePrefixMigrator(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
plan []migrateBeforeAfter
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "happy path",
|
||||
plan: []migrateBeforeAfter{
|
||||
{
|
||||
"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||
"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||
},
|
||||
{
|
||||
"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||
},
|
||||
{
|
||||
"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
|
||||
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
|
||||
},
|
||||
{
|
||||
"0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
|
||||
"0x02/0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "different roots same prefix",
|
||||
plan: []migrateBeforeAfter{
|
||||
{
|
||||
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||
},
|
||||
{
|
||||
"0xff0774a80664e1667dcd5a18bced866a596b6cef5f351c0f88cd310dd00cb16d/0.ssz",
|
||||
"0xff/0xff0774a80664e1667dcd5a18bced866a596b6cef5f351c0f88cd310dd00cb16d/0.ssz",
|
||||
},
|
||||
{
|
||||
"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||
"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||
},
|
||||
{
|
||||
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||
},
|
||||
{
|
||||
"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
|
||||
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "mix old and new",
|
||||
plan: []migrateBeforeAfter{
|
||||
{
|
||||
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||
},
|
||||
{
|
||||
"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||
"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||
},
|
||||
{
|
||||
"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/0.ssz",
|
||||
"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/0.ssz",
|
||||
},
|
||||
{
|
||||
"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/1.ssz",
|
||||
"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/1.ssz",
|
||||
},
|
||||
{
|
||||
"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "overwrite existing root dir",
|
||||
plan: []migrateBeforeAfter{
|
||||
{
|
||||
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||
},
|
||||
{
|
||||
"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/1.ssz",
|
||||
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
fs, _ := NewEphemeralBlobStorageWithFs(t)
|
||||
testSetupPaths(t, fs, c.plan)
|
||||
entries, err := listDir(fs, ".")
|
||||
require.NoError(t, err)
|
||||
m := &oneBytePrefixMigrator{}
|
||||
err = m.migrate(fs, entries)
|
||||
if c.err != nil {
|
||||
require.ErrorIs(t, err, c.err)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
testAssertNewPaths(t, fs, c.plan)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGroupDirsByPrefix(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
dirs []string
|
||||
groups map[string][]string
|
||||
}{
|
||||
{
|
||||
name: "different buckets",
|
||||
dirs: []string{
|
||||
"0x00ff0b18f16d3f22e6386ec3d6718346358089be531cb3715cb61b34a08aca04",
|
||||
"0x0105400af093eeca95c1bf3874e97ec433244dd45222d850fe5ee50e53385f05",
|
||||
},
|
||||
groups: map[string][]string{
|
||||
"0x00": {"0x00ff0b18f16d3f22e6386ec3d6718346358089be531cb3715cb61b34a08aca04"},
|
||||
"0x01": {"0x0105400af093eeca95c1bf3874e97ec433244dd45222d850fe5ee50e53385f05"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "same prefix, one bucket",
|
||||
dirs: []string{
|
||||
"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
|
||||
"0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
|
||||
"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
|
||||
},
|
||||
groups: map[string][]string{
|
||||
"0xff": {
|
||||
"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
|
||||
"0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
|
||||
"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "mix of legacy and new",
|
||||
dirs: []string{
|
||||
"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
|
||||
"0xff/0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
|
||||
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
|
||||
},
|
||||
groups: map[string][]string{
|
||||
"0xff": {"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
groups := groupDirsByPrefix(c.dirs)
|
||||
require.Equal(t, len(c.groups), len(groups))
|
||||
for k, v := range c.groups {
|
||||
got := groups[k]
|
||||
require.Equal(t, len(v), len(got))
|
||||
// compare the lists
|
||||
require.DeepEqual(t, v, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,10 @@ var (
|
||||
errNotBlobSSZ = errors.New("not a blob ssz file")
|
||||
)
|
||||
|
||||
// Full root in directory will be 66 chars, eg:
|
||||
// >>> len('0x0002fb4db510b8618b04dc82d023793739c26346a8b02eb73482e24b0fec0555') == 66
|
||||
const legacyRootLen = 66
|
||||
|
||||
type blobPruner struct {
|
||||
sync.Mutex
|
||||
prunedBefore atomic.Uint64
|
||||
@@ -74,7 +78,7 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e
|
||||
go func() {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
if err := p.prune(primitives.Slot(pruned)); err != nil {
|
||||
if err := p.prune(primitives.Slot(pruned), nil); err != nil {
|
||||
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
|
||||
}
|
||||
}()
|
||||
@@ -93,7 +97,7 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot {
|
||||
func (p *blobPruner) warmCache() error {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
if err := p.prune(0); err != nil {
|
||||
if err := p.prune(0, &oneBytePrefixMigrator{}); err != nil {
|
||||
return err
|
||||
}
|
||||
if !p.warmed {
|
||||
@@ -115,7 +119,7 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error
|
||||
// Prune prunes blobs in the base directory based on the retention epoch.
|
||||
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
|
||||
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
|
||||
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
|
||||
func (p *blobPruner) prune(pruneBefore primitives.Slot, m directoryMigrator) error {
|
||||
start := time.Now()
|
||||
totalPruned, totalErr := 0, 0
|
||||
// Customize logging/metrics behavior for the initial cache warmup when slot=0.
|
||||
@@ -135,12 +139,11 @@ func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
|
||||
}()
|
||||
}
|
||||
|
||||
entries, err := listDir(p.fs, ".")
|
||||
entries, err := walkAndMigrateBasedir(p.fs, m)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to list root blobs directory")
|
||||
}
|
||||
dirs := filter(entries, filterRoot)
|
||||
for _, dir := range dirs {
|
||||
for _, dir := range entries {
|
||||
pruned, err := p.tryPruneDir(dir, pruneBefore)
|
||||
if err != nil {
|
||||
totalErr += 1
|
||||
@@ -274,6 +277,40 @@ func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
|
||||
return primitives.Slot(rawSlot), nil
|
||||
}
|
||||
|
||||
// walkAndMigrateBasedir manages executing any needed directory migrations while also returning a list of every
|
||||
// individual root directory containing blob files.
|
||||
func walkAndMigrateBasedir(fs afero.Fs, m directoryMigrator) ([]string, error) {
|
||||
listing := make([]string, 0)
|
||||
topDirs, err := listDir(fs, ".")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if m != nil {
|
||||
if err := m.migrate(fs, topDirs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// list all the subdirs to get the full listing.
|
||||
for i := range topDirs {
|
||||
dir := topDirs[i]
|
||||
// We're not worried about any dangling legacy format paths because migrator should have already done its job.
|
||||
if !filterRootGroupDir(dir) {
|
||||
continue
|
||||
}
|
||||
subdirs, err := listDir(fs, dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, sd := range subdirs {
|
||||
if !filterRoot(sd) {
|
||||
continue
|
||||
}
|
||||
listing = append(listing, filepath.Join(dir, sd))
|
||||
}
|
||||
}
|
||||
return listing, nil
|
||||
}
|
||||
|
||||
func listDir(fs afero.Fs, dir string) ([]string, error) {
|
||||
top, err := fs.Open(dir)
|
||||
if err != nil {
|
||||
@@ -306,6 +343,14 @@ func filterRoot(s string) bool {
|
||||
return strings.HasPrefix(s, "0x")
|
||||
}
|
||||
|
||||
func filterLegacy(s string) bool {
|
||||
return filterRoot(s) && len(s) == legacyRootLen
|
||||
}
|
||||
|
||||
func filterRootGroupDir(s string) bool {
|
||||
return filterRoot(s) && len(filepath.Base(s)) == rootPrefixLen
|
||||
}
|
||||
|
||||
var dotSszExt = "." + sszExt
|
||||
var dotPartExt = "." + partExt
|
||||
|
||||
|
||||
@@ -43,10 +43,11 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
|
||||
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1)
|
||||
sc, err := verification.BlobSidecarNoop(sidecars[0])
|
||||
require.NoError(t, err)
|
||||
rootStr := rootString(sc.BlockRoot())
|
||||
require.NoError(t, fs.Mkdir(rootStr, directoryPermissions)) // make empty directory
|
||||
namer := namerForSidecar(sc)
|
||||
dir := namer.dir()
|
||||
require.NoError(t, fs.Mkdir(dir, directoryPermissions)) // make empty directory
|
||||
require.NoError(t, pr.cache.ensure(sc.BlockRoot(), sc.Slot(), 0))
|
||||
pruned, err := pr.tryPruneDir(rootStr, slot+1)
|
||||
pruned, err := pr.tryPruneDir(dir, slot+1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, pruned)
|
||||
})
|
||||
@@ -61,21 +62,21 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
|
||||
require.NoError(t, bs.Save(scs[1]))
|
||||
|
||||
// check that the root->slot is cached
|
||||
root := scs[0].BlockRoot()
|
||||
rootStr := rootString(root)
|
||||
namer := namerForSidecar(scs[0])
|
||||
dir := namer.dir()
|
||||
cs, cok := bs.pruner.cache.slot(scs[0].BlockRoot())
|
||||
require.Equal(t, true, cok)
|
||||
require.Equal(t, slot, cs)
|
||||
|
||||
// ensure that we see the saved files in the filesystem
|
||||
files, err := listDir(fs, rootStr)
|
||||
files, err := listDir(fs, dir)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(files))
|
||||
|
||||
pruned, err := bs.pruner.tryPruneDir(rootStr, slot+1)
|
||||
pruned, err := bs.pruner.tryPruneDir(dir, slot+1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, pruned)
|
||||
files, err = listDir(fs, rootStr)
|
||||
files, err = listDir(fs, dir)
|
||||
require.ErrorIs(t, err, os.ErrNotExist)
|
||||
require.Equal(t, 0, len(files))
|
||||
})
|
||||
@@ -94,7 +95,8 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
|
||||
|
||||
// check that the root->slot is cached
|
||||
root := scs[0].BlockRoot()
|
||||
rootStr := rootString(root)
|
||||
namer := namerForSidecar(scs[0])
|
||||
dir := namer.dir()
|
||||
cs, ok := bs.pruner.cache.slot(root)
|
||||
require.Equal(t, true, ok)
|
||||
require.Equal(t, slot, cs)
|
||||
@@ -104,14 +106,14 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
|
||||
require.Equal(t, false, ok)
|
||||
|
||||
// ensure that we see the saved files in the filesystem
|
||||
files, err := listDir(fs, rootStr)
|
||||
files, err := listDir(fs, dir)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(files))
|
||||
|
||||
pruned, err := bs.pruner.tryPruneDir(rootStr, slot+1)
|
||||
pruned, err := bs.pruner.tryPruneDir(dir, slot+1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, pruned)
|
||||
files, err = listDir(fs, rootStr)
|
||||
files, err = listDir(fs, dir)
|
||||
require.ErrorIs(t, err, os.ErrNotExist)
|
||||
require.Equal(t, 0, len(files))
|
||||
})
|
||||
@@ -128,24 +130,25 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
|
||||
|
||||
// Evict slot mapping from the cache so that we trigger the file read path.
|
||||
root := scs[0].BlockRoot()
|
||||
rootStr := rootString(root)
|
||||
namer := namerForSidecar(scs[0])
|
||||
dir := namer.dir()
|
||||
bs.pruner.cache.evict(root)
|
||||
_, ok := bs.pruner.cache.slot(root)
|
||||
require.Equal(t, false, ok)
|
||||
|
||||
// Ensure that we see the saved files in the filesystem.
|
||||
files, err := listDir(fs, rootStr)
|
||||
files, err := listDir(fs, dir)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(files))
|
||||
|
||||
// This should use the slotFromFile code (simulating restart).
|
||||
// Setting pruneBefore == slot, so that the slot will be outside the window (at the boundary).
|
||||
pruned, err := bs.pruner.tryPruneDir(rootStr, slot)
|
||||
pruned, err := bs.pruner.tryPruneDir(dir, slot)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, pruned)
|
||||
|
||||
// Ensure files are still present.
|
||||
files, err = listDir(fs, rootStr)
|
||||
files, err = listDir(fs, dir)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, len(files))
|
||||
})
|
||||
|
||||
96
tools/downgrade-blob-storage/main.go
Normal file
96
tools/downgrade-blob-storage/main.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var errUsage = errors.New("incorrect usage - missing blob path")
|
||||
|
||||
const newLen = 4 // eg '0xff'
|
||||
|
||||
func blobPath() (string, error) {
|
||||
if len(os.Args) < 2 {
|
||||
return "", errUsage
|
||||
}
|
||||
return os.Args[1], nil
|
||||
}
|
||||
|
||||
func usage(err error) {
|
||||
fmt.Printf("%s\n", err.Error())
|
||||
fmt.Println("downgrade-blob-storage: Move blob directories back to old format, without the single byte container directories at the top-level of the directory tree. usage:\n" + os.Args[0] + " <path to blobs dir>")
|
||||
}
|
||||
|
||||
func main() {
|
||||
bp, err := blobPath()
|
||||
if err != nil {
|
||||
if errors.Is(err, errUsage) {
|
||||
usage(err)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
if err := downgrade(bp); err != nil {
|
||||
fmt.Printf("fatal error: %s\n", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func downgrade(base string) error {
|
||||
top, err := os.Open(base) // #nosec G304
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// iterate over top-level blob dir, ie 'blobs' inside prysm's datadir
|
||||
topdirs, err := top.Readdirnames(0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := top.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
nMoved := 0
|
||||
for _, td := range topdirs {
|
||||
// Ignore anything in the old layout.
|
||||
if !filterNew(td) {
|
||||
continue
|
||||
}
|
||||
dir, err := os.Open(filepath.Join(base, td)) // #nosec G304
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// List the subdirectoress of the short dir containers, eg if td == '0xff'
|
||||
// we want to move all the subdirectories in that dir.
|
||||
subs, err := dir.Readdirnames(0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dir.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, sd := range subs {
|
||||
// this is the inner layer of directory nesting,
|
||||
// eg if 'td' == '0xff', 'sd' might be something like:
|
||||
// '0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb'
|
||||
src := filepath.Join(base, td, sd)
|
||||
target := filepath.Join(base, sd)
|
||||
fmt.Printf("moving %s -> %s\n", src, target)
|
||||
if err := os.Rename(src, target); err != nil {
|
||||
return err
|
||||
}
|
||||
nMoved += 1
|
||||
}
|
||||
}
|
||||
fmt.Printf("moved %d directories\n", nMoved)
|
||||
return nil
|
||||
}
|
||||
|
||||
func filterRoot(s string) bool {
|
||||
return strings.HasPrefix(s, "0x")
|
||||
}
|
||||
|
||||
func filterNew(s string) bool {
|
||||
return filterRoot(s) && len(s) == newLen
|
||||
}
|
||||
Reference in New Issue
Block a user