mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-31 16:18:00 -05:00
Compare commits
1 Commits
e2e-debugg
...
nested-blo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b49be1eef |
@@ -7,6 +7,7 @@ go_library(
|
|||||||
"cache.go",
|
"cache.go",
|
||||||
"log.go",
|
"log.go",
|
||||||
"metrics.go",
|
"metrics.go",
|
||||||
|
"migration.go",
|
||||||
"mock.go",
|
"mock.go",
|
||||||
"pruner.go",
|
"pruner.go",
|
||||||
],
|
],
|
||||||
@@ -37,6 +38,7 @@ go_test(
|
|||||||
srcs = [
|
srcs = [
|
||||||
"blob_test.go",
|
"blob_test.go",
|
||||||
"cache_test.go",
|
"cache_test.go",
|
||||||
|
"migration_test.go",
|
||||||
"pruner_test.go",
|
"pruner_test.go",
|
||||||
],
|
],
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -37,6 +38,7 @@ const (
|
|||||||
partExt = "part"
|
partExt = "part"
|
||||||
|
|
||||||
directoryPermissions = 0700
|
directoryPermissions = 0700
|
||||||
|
rootPrefixLen = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
// BlobStorageOption is a functional option for configuring a BlobStorage.
|
// BlobStorageOption is a functional option for configuring a BlobStorage.
|
||||||
@@ -321,8 +323,14 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
|
|||||||
return blobNamer{root: sc.BlockRoot(), index: sc.Index}
|
return blobNamer{root: sc.BlockRoot(), index: sc.Index}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p blobNamer) groupDir() string {
|
||||||
|
return oneBytePrefix(rootString(p.root))
|
||||||
|
}
|
||||||
|
|
||||||
func (p blobNamer) dir() string {
|
func (p blobNamer) dir() string {
|
||||||
return rootString(p.root)
|
rs := rootString(p.root)
|
||||||
|
parentDir := oneBytePrefix(rs)
|
||||||
|
return filepath.Join(parentDir, rs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p blobNamer) partPath(entropy string) string {
|
func (p blobNamer) partPath(entropy string) string {
|
||||||
@@ -337,6 +345,11 @@ func rootString(root [32]byte) string {
|
|||||||
return fmt.Sprintf("%#x", root)
|
return fmt.Sprintf("%#x", root)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func oneBytePrefix(p string) string {
|
||||||
|
// returns eg 0x00 from 0x0002fb4db510b8618b04dc82d023793739c26346a8b02eb73482e24b0fec0555
|
||||||
|
return p[0:rootPrefixLen]
|
||||||
|
}
|
||||||
|
|
||||||
func stringToRoot(str string) ([32]byte, error) {
|
func stringToRoot(str string) ([32]byte, error) {
|
||||||
slice, err := hexutil.Decode(str)
|
slice, err := hexutil.Decode(str)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -170,10 +170,15 @@ func TestBlobStoragePrune(t *testing.T) {
|
|||||||
for _, sidecar := range testSidecars {
|
for _, sidecar := range testSidecars {
|
||||||
require.NoError(t, bs.Save(sidecar))
|
require.NoError(t, bs.Save(sidecar))
|
||||||
}
|
}
|
||||||
|
namer := namerForSidecar(testSidecars[0])
|
||||||
|
|
||||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
|
beforeFolders, err := afero.ReadDir(fs, namer.groupDir())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(beforeFolders))
|
||||||
|
|
||||||
remainingFolders, err := afero.ReadDir(fs, ".")
|
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
|
||||||
|
|
||||||
|
remainingFolders, err := afero.ReadDir(fs, namer.groupDir())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 0, len(remainingFolders))
|
require.Equal(t, 0, len(remainingFolders))
|
||||||
})
|
})
|
||||||
@@ -181,36 +186,51 @@ func TestBlobStoragePrune(t *testing.T) {
|
|||||||
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 299, fieldparams.MaxBlobsPerBlock)
|
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 299, fieldparams.MaxBlobsPerBlock)
|
||||||
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
|
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
namer := namerForSidecar(testSidecars[0])
|
||||||
|
|
||||||
for _, sidecar := range testSidecars[4:] {
|
for _, sidecar := range testSidecars[4:] {
|
||||||
require.NoError(t, bs.Save(sidecar))
|
require.NoError(t, bs.Save(sidecar))
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
|
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
|
||||||
|
|
||||||
remainingFolders, err := afero.ReadDir(fs, ".")
|
remainingFolders, err := afero.ReadDir(fs, namer.groupDir())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 0, len(remainingFolders))
|
require.Equal(t, 0, len(remainingFolders))
|
||||||
})
|
})
|
||||||
t.Run("PruneMany", func(t *testing.T) {
|
t.Run("PruneMany", func(t *testing.T) {
|
||||||
blockQty := 10
|
pruneBefore := currentSlot - bs.pruner.windowSize
|
||||||
slot := primitives.Slot(1)
|
increment := primitives.Slot(10000)
|
||||||
|
slots := []primitives.Slot{
|
||||||
for j := 0; j <= blockQty; j++ {
|
pruneBefore - increment,
|
||||||
root := bytesutil.ToBytes32(bytesutil.ToBytes(uint64(slot), 32))
|
pruneBefore - (2 * increment),
|
||||||
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, root, slot, fieldparams.MaxBlobsPerBlock)
|
pruneBefore,
|
||||||
|
pruneBefore + increment,
|
||||||
|
pruneBefore + (2 * increment),
|
||||||
|
}
|
||||||
|
namers := make([]blobNamer, len(slots))
|
||||||
|
for i, s := range slots {
|
||||||
|
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, s, 1)
|
||||||
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
|
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, bs.Save(testSidecars[0]))
|
require.NoError(t, bs.Save(testSidecars[0]))
|
||||||
|
namers[i] = namerForSidecar(testSidecars[0])
|
||||||
slot += 10000
|
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
|
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
|
||||||
|
|
||||||
remainingFolders, err := afero.ReadDir(fs, ".")
|
// first 2 subdirs should be removed
|
||||||
require.NoError(t, err)
|
for _, nmr := range namers[0:2] {
|
||||||
require.Equal(t, 4, len(remainingFolders))
|
entries, err := listDir(fs, nmr.dir())
|
||||||
|
require.Equal(t, 0, len(entries))
|
||||||
|
require.ErrorIs(t, err, os.ErrNotExist)
|
||||||
|
}
|
||||||
|
// the rest should still be there
|
||||||
|
for _, nmr := range namers[2:] {
|
||||||
|
entries, err := listDir(fs, nmr.dir())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(entries))
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,7 +254,7 @@ func BenchmarkPruning(b *testing.B) {
|
|||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
err := bs.pruner.prune(currentSlot)
|
err := bs.pruner.prune(currentSlot, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
61
beacon-chain/db/filesystem/migration.go
Normal file
61
beacon-chain/db/filesystem/migration.go
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
package filesystem
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/spf13/afero"
|
||||||
|
)
|
||||||
|
|
||||||
|
type directoryMigrator interface {
|
||||||
|
migrate(fs afero.Fs, dirs []string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type oneBytePrefixMigrator struct {
|
||||||
|
migrated []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *oneBytePrefixMigrator) migrate(fs afero.Fs, dirs []string) error {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
nMigrated := len(m.migrated)
|
||||||
|
if nMigrated > 0 {
|
||||||
|
log.WithField("elapsed", time.Since(start).String()).
|
||||||
|
WithField("dirsMoved", nMigrated).
|
||||||
|
Debug("Migrated blob subdirectories to byte-prefixed format")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
groups := groupDirsByPrefix(dirs)
|
||||||
|
return m.renameByGroup(fs, groups)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *oneBytePrefixMigrator) renameByGroup(fs afero.Fs, groups map[string][]string) error {
|
||||||
|
for g, sd := range groups {
|
||||||
|
// make the enclosing dir if needed
|
||||||
|
if err := fs.MkdirAll(g, directoryPermissions); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, dir := range sd {
|
||||||
|
dest := filepath.Join(g, dir)
|
||||||
|
// todo: check if directory exists and move files one at a time if so?
|
||||||
|
// that shouldn't be a problem if we migrate in cache warmup and never write to old path.
|
||||||
|
if err := fs.Rename(dir, dest); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.WithField("source", dir).WithField("dest", dest).Trace("Migrated legacy blob storage path.")
|
||||||
|
m.migrated = append(m.migrated, dir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func groupDirsByPrefix(dirs []string) map[string][]string {
|
||||||
|
groups := make(map[string][]string)
|
||||||
|
for _, dir := range dirs {
|
||||||
|
if filterLegacy(dir) {
|
||||||
|
key := oneBytePrefix(dir)
|
||||||
|
groups[key] = append(groups[key], dir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return groups
|
||||||
|
}
|
||||||
216
beacon-chain/db/filesystem/migration_test.go
Normal file
216
beacon-chain/db/filesystem/migration_test.go
Normal file
@@ -0,0 +1,216 @@
|
|||||||
|
package filesystem
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||||
|
"github.com/spf13/afero"
|
||||||
|
)
|
||||||
|
|
||||||
|
func testSetupPaths(t *testing.T, fs afero.Fs, paths []migrateBeforeAfter) {
|
||||||
|
for _, ba := range paths {
|
||||||
|
p := ba.before()
|
||||||
|
dir := filepath.Dir(p)
|
||||||
|
require.NoError(t, fs.MkdirAll(dir, directoryPermissions))
|
||||||
|
fh, err := fs.Create(p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, fh.Close())
|
||||||
|
// double check that we got the full path correct
|
||||||
|
_, err = fs.Stat(ba.before())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testAssertNewPaths(t *testing.T, fs afero.Fs, paths []migrateBeforeAfter) {
|
||||||
|
for _, ba := range paths {
|
||||||
|
if ba.before() != ba.after() {
|
||||||
|
_, err := fs.Stat(ba.before())
|
||||||
|
require.ErrorIs(t, err, os.ErrNotExist)
|
||||||
|
dir := filepath.Dir(ba.before())
|
||||||
|
_, err = listDir(fs, dir)
|
||||||
|
require.ErrorIs(t, err, os.ErrNotExist)
|
||||||
|
}
|
||||||
|
_, err := fs.Stat(ba.after())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type migrateBeforeAfter [2]string
|
||||||
|
|
||||||
|
func (ba migrateBeforeAfter) before() string {
|
||||||
|
return ba[0]
|
||||||
|
}
|
||||||
|
func (ba migrateBeforeAfter) after() string {
|
||||||
|
return ba[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOneBytePrefixMigrator(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
plan []migrateBeforeAfter
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "happy path",
|
||||||
|
plan: []migrateBeforeAfter{
|
||||||
|
{
|
||||||
|
"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||||
|
"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||||
|
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
|
||||||
|
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
|
||||||
|
"0x02/0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "different roots same prefix",
|
||||||
|
plan: []migrateBeforeAfter{
|
||||||
|
{
|
||||||
|
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||||
|
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0xff0774a80664e1667dcd5a18bced866a596b6cef5f351c0f88cd310dd00cb16d/0.ssz",
|
||||||
|
"0xff/0xff0774a80664e1667dcd5a18bced866a596b6cef5f351c0f88cd310dd00cb16d/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||||
|
"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||||
|
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
|
||||||
|
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "mix old and new",
|
||||||
|
plan: []migrateBeforeAfter{
|
||||||
|
{
|
||||||
|
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||||
|
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||||
|
"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/0.ssz",
|
||||||
|
"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/1.ssz",
|
||||||
|
"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/1.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||||
|
"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "overwrite existing root dir",
|
||||||
|
plan: []migrateBeforeAfter{
|
||||||
|
{
|
||||||
|
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||||
|
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/1.ssz",
|
||||||
|
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
fs, _ := NewEphemeralBlobStorageWithFs(t)
|
||||||
|
testSetupPaths(t, fs, c.plan)
|
||||||
|
entries, err := listDir(fs, ".")
|
||||||
|
require.NoError(t, err)
|
||||||
|
m := &oneBytePrefixMigrator{}
|
||||||
|
err = m.migrate(fs, entries)
|
||||||
|
if c.err != nil {
|
||||||
|
require.ErrorIs(t, err, c.err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
testAssertNewPaths(t, fs, c.plan)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGroupDirsByPrefix(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
dirs []string
|
||||||
|
groups map[string][]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "different buckets",
|
||||||
|
dirs: []string{
|
||||||
|
"0x00ff0b18f16d3f22e6386ec3d6718346358089be531cb3715cb61b34a08aca04",
|
||||||
|
"0x0105400af093eeca95c1bf3874e97ec433244dd45222d850fe5ee50e53385f05",
|
||||||
|
},
|
||||||
|
groups: map[string][]string{
|
||||||
|
"0x00": {"0x00ff0b18f16d3f22e6386ec3d6718346358089be531cb3715cb61b34a08aca04"},
|
||||||
|
"0x01": {"0x0105400af093eeca95c1bf3874e97ec433244dd45222d850fe5ee50e53385f05"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "same prefix, one bucket",
|
||||||
|
dirs: []string{
|
||||||
|
"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
|
||||||
|
"0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
|
||||||
|
"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
|
||||||
|
},
|
||||||
|
groups: map[string][]string{
|
||||||
|
"0xff": {
|
||||||
|
"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
|
||||||
|
"0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
|
||||||
|
"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "mix of legacy and new",
|
||||||
|
dirs: []string{
|
||||||
|
"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
|
||||||
|
"0xff/0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
|
||||||
|
"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
|
||||||
|
},
|
||||||
|
groups: map[string][]string{
|
||||||
|
"0xff": {"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
groups := groupDirsByPrefix(c.dirs)
|
||||||
|
require.Equal(t, len(c.groups), len(groups))
|
||||||
|
for k, v := range c.groups {
|
||||||
|
got := groups[k]
|
||||||
|
require.Equal(t, len(v), len(got))
|
||||||
|
// compare the lists
|
||||||
|
require.DeepEqual(t, v, got)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -28,6 +28,10 @@ var (
|
|||||||
errNotBlobSSZ = errors.New("not a blob ssz file")
|
errNotBlobSSZ = errors.New("not a blob ssz file")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Full root in directory will be 66 chars, eg:
|
||||||
|
// >>> len('0x0002fb4db510b8618b04dc82d023793739c26346a8b02eb73482e24b0fec0555') == 66
|
||||||
|
const legacyRootLen = 66
|
||||||
|
|
||||||
type blobPruner struct {
|
type blobPruner struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
prunedBefore atomic.Uint64
|
prunedBefore atomic.Uint64
|
||||||
@@ -74,7 +78,7 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e
|
|||||||
go func() {
|
go func() {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
if err := p.prune(primitives.Slot(pruned)); err != nil {
|
if err := p.prune(primitives.Slot(pruned), nil); err != nil {
|
||||||
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
|
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -93,7 +97,7 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot {
|
|||||||
func (p *blobPruner) warmCache() error {
|
func (p *blobPruner) warmCache() error {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
if err := p.prune(0); err != nil {
|
if err := p.prune(0, &oneBytePrefixMigrator{}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !p.warmed {
|
if !p.warmed {
|
||||||
@@ -115,7 +119,7 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error
|
|||||||
// Prune prunes blobs in the base directory based on the retention epoch.
|
// Prune prunes blobs in the base directory based on the retention epoch.
|
||||||
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
|
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
|
||||||
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
|
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
|
||||||
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
|
func (p *blobPruner) prune(pruneBefore primitives.Slot, m directoryMigrator) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
totalPruned, totalErr := 0, 0
|
totalPruned, totalErr := 0, 0
|
||||||
// Customize logging/metrics behavior for the initial cache warmup when slot=0.
|
// Customize logging/metrics behavior for the initial cache warmup when slot=0.
|
||||||
@@ -135,12 +139,11 @@ func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
entries, err := listDir(p.fs, ".")
|
entries, err := walkAndMigrateBasedir(p.fs, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "unable to list root blobs directory")
|
return errors.Wrap(err, "unable to list root blobs directory")
|
||||||
}
|
}
|
||||||
dirs := filter(entries, filterRoot)
|
for _, dir := range entries {
|
||||||
for _, dir := range dirs {
|
|
||||||
pruned, err := p.tryPruneDir(dir, pruneBefore)
|
pruned, err := p.tryPruneDir(dir, pruneBefore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
totalErr += 1
|
totalErr += 1
|
||||||
@@ -274,6 +277,40 @@ func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
|
|||||||
return primitives.Slot(rawSlot), nil
|
return primitives.Slot(rawSlot), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// walkAndMigrateBasedir manages executing any needed directory migrations while also returning a list of every
|
||||||
|
// individual root directory containing blob files.
|
||||||
|
func walkAndMigrateBasedir(fs afero.Fs, m directoryMigrator) ([]string, error) {
|
||||||
|
listing := make([]string, 0)
|
||||||
|
topDirs, err := listDir(fs, ".")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if m != nil {
|
||||||
|
if err := m.migrate(fs, topDirs); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// list all the subdirs to get the full listing.
|
||||||
|
for i := range topDirs {
|
||||||
|
dir := topDirs[i]
|
||||||
|
// We're not worried about any dangling legacy format paths because migrator should have already done its job.
|
||||||
|
if !filterRootGroupDir(dir) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
subdirs, err := listDir(fs, dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, sd := range subdirs {
|
||||||
|
if !filterRoot(sd) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
listing = append(listing, filepath.Join(dir, sd))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return listing, nil
|
||||||
|
}
|
||||||
|
|
||||||
func listDir(fs afero.Fs, dir string) ([]string, error) {
|
func listDir(fs afero.Fs, dir string) ([]string, error) {
|
||||||
top, err := fs.Open(dir)
|
top, err := fs.Open(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -306,6 +343,14 @@ func filterRoot(s string) bool {
|
|||||||
return strings.HasPrefix(s, "0x")
|
return strings.HasPrefix(s, "0x")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func filterLegacy(s string) bool {
|
||||||
|
return filterRoot(s) && len(s) == legacyRootLen
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterRootGroupDir(s string) bool {
|
||||||
|
return filterRoot(s) && len(filepath.Base(s)) == rootPrefixLen
|
||||||
|
}
|
||||||
|
|
||||||
var dotSszExt = "." + sszExt
|
var dotSszExt = "." + sszExt
|
||||||
var dotPartExt = "." + partExt
|
var dotPartExt = "." + partExt
|
||||||
|
|
||||||
|
|||||||
@@ -43,10 +43,11 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
|
|||||||
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1)
|
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1)
|
||||||
sc, err := verification.BlobSidecarNoop(sidecars[0])
|
sc, err := verification.BlobSidecarNoop(sidecars[0])
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
rootStr := rootString(sc.BlockRoot())
|
namer := namerForSidecar(sc)
|
||||||
require.NoError(t, fs.Mkdir(rootStr, directoryPermissions)) // make empty directory
|
dir := namer.dir()
|
||||||
|
require.NoError(t, fs.Mkdir(dir, directoryPermissions)) // make empty directory
|
||||||
require.NoError(t, pr.cache.ensure(sc.BlockRoot(), sc.Slot(), 0))
|
require.NoError(t, pr.cache.ensure(sc.BlockRoot(), sc.Slot(), 0))
|
||||||
pruned, err := pr.tryPruneDir(rootStr, slot+1)
|
pruned, err := pr.tryPruneDir(dir, slot+1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 0, pruned)
|
require.Equal(t, 0, pruned)
|
||||||
})
|
})
|
||||||
@@ -61,21 +62,21 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
|
|||||||
require.NoError(t, bs.Save(scs[1]))
|
require.NoError(t, bs.Save(scs[1]))
|
||||||
|
|
||||||
// check that the root->slot is cached
|
// check that the root->slot is cached
|
||||||
root := scs[0].BlockRoot()
|
namer := namerForSidecar(scs[0])
|
||||||
rootStr := rootString(root)
|
dir := namer.dir()
|
||||||
cs, cok := bs.pruner.cache.slot(scs[0].BlockRoot())
|
cs, cok := bs.pruner.cache.slot(scs[0].BlockRoot())
|
||||||
require.Equal(t, true, cok)
|
require.Equal(t, true, cok)
|
||||||
require.Equal(t, slot, cs)
|
require.Equal(t, slot, cs)
|
||||||
|
|
||||||
// ensure that we see the saved files in the filesystem
|
// ensure that we see the saved files in the filesystem
|
||||||
files, err := listDir(fs, rootStr)
|
files, err := listDir(fs, dir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, len(files))
|
require.Equal(t, 2, len(files))
|
||||||
|
|
||||||
pruned, err := bs.pruner.tryPruneDir(rootStr, slot+1)
|
pruned, err := bs.pruner.tryPruneDir(dir, slot+1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, pruned)
|
require.Equal(t, 2, pruned)
|
||||||
files, err = listDir(fs, rootStr)
|
files, err = listDir(fs, dir)
|
||||||
require.ErrorIs(t, err, os.ErrNotExist)
|
require.ErrorIs(t, err, os.ErrNotExist)
|
||||||
require.Equal(t, 0, len(files))
|
require.Equal(t, 0, len(files))
|
||||||
})
|
})
|
||||||
@@ -94,7 +95,8 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
|
|||||||
|
|
||||||
// check that the root->slot is cached
|
// check that the root->slot is cached
|
||||||
root := scs[0].BlockRoot()
|
root := scs[0].BlockRoot()
|
||||||
rootStr := rootString(root)
|
namer := namerForSidecar(scs[0])
|
||||||
|
dir := namer.dir()
|
||||||
cs, ok := bs.pruner.cache.slot(root)
|
cs, ok := bs.pruner.cache.slot(root)
|
||||||
require.Equal(t, true, ok)
|
require.Equal(t, true, ok)
|
||||||
require.Equal(t, slot, cs)
|
require.Equal(t, slot, cs)
|
||||||
@@ -104,14 +106,14 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
|
|||||||
require.Equal(t, false, ok)
|
require.Equal(t, false, ok)
|
||||||
|
|
||||||
// ensure that we see the saved files in the filesystem
|
// ensure that we see the saved files in the filesystem
|
||||||
files, err := listDir(fs, rootStr)
|
files, err := listDir(fs, dir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, len(files))
|
require.Equal(t, 2, len(files))
|
||||||
|
|
||||||
pruned, err := bs.pruner.tryPruneDir(rootStr, slot+1)
|
pruned, err := bs.pruner.tryPruneDir(dir, slot+1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, pruned)
|
require.Equal(t, 2, pruned)
|
||||||
files, err = listDir(fs, rootStr)
|
files, err = listDir(fs, dir)
|
||||||
require.ErrorIs(t, err, os.ErrNotExist)
|
require.ErrorIs(t, err, os.ErrNotExist)
|
||||||
require.Equal(t, 0, len(files))
|
require.Equal(t, 0, len(files))
|
||||||
})
|
})
|
||||||
@@ -128,24 +130,25 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
|
|||||||
|
|
||||||
// Evict slot mapping from the cache so that we trigger the file read path.
|
// Evict slot mapping from the cache so that we trigger the file read path.
|
||||||
root := scs[0].BlockRoot()
|
root := scs[0].BlockRoot()
|
||||||
rootStr := rootString(root)
|
namer := namerForSidecar(scs[0])
|
||||||
|
dir := namer.dir()
|
||||||
bs.pruner.cache.evict(root)
|
bs.pruner.cache.evict(root)
|
||||||
_, ok := bs.pruner.cache.slot(root)
|
_, ok := bs.pruner.cache.slot(root)
|
||||||
require.Equal(t, false, ok)
|
require.Equal(t, false, ok)
|
||||||
|
|
||||||
// Ensure that we see the saved files in the filesystem.
|
// Ensure that we see the saved files in the filesystem.
|
||||||
files, err := listDir(fs, rootStr)
|
files, err := listDir(fs, dir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, len(files))
|
require.Equal(t, 2, len(files))
|
||||||
|
|
||||||
// This should use the slotFromFile code (simulating restart).
|
// This should use the slotFromFile code (simulating restart).
|
||||||
// Setting pruneBefore == slot, so that the slot will be outside the window (at the boundary).
|
// Setting pruneBefore == slot, so that the slot will be outside the window (at the boundary).
|
||||||
pruned, err := bs.pruner.tryPruneDir(rootStr, slot)
|
pruned, err := bs.pruner.tryPruneDir(dir, slot)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 0, pruned)
|
require.Equal(t, 0, pruned)
|
||||||
|
|
||||||
// Ensure files are still present.
|
// Ensure files are still present.
|
||||||
files, err = listDir(fs, rootStr)
|
files, err = listDir(fs, dir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, len(files))
|
require.Equal(t, 2, len(files))
|
||||||
})
|
})
|
||||||
|
|||||||
96
tools/downgrade-blob-storage/main.go
Normal file
96
tools/downgrade-blob-storage/main.go
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errUsage = errors.New("incorrect usage - missing blob path")
|
||||||
|
|
||||||
|
const newLen = 4 // eg '0xff'
|
||||||
|
|
||||||
|
func blobPath() (string, error) {
|
||||||
|
if len(os.Args) < 2 {
|
||||||
|
return "", errUsage
|
||||||
|
}
|
||||||
|
return os.Args[1], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func usage(err error) {
|
||||||
|
fmt.Printf("%s\n", err.Error())
|
||||||
|
fmt.Println("downgrade-blob-storage: Move blob directories back to old format, without the single byte container directories at the top-level of the directory tree. usage:\n" + os.Args[0] + " <path to blobs dir>")
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
bp, err := blobPath()
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, errUsage) {
|
||||||
|
usage(err)
|
||||||
|
}
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
if err := downgrade(bp); err != nil {
|
||||||
|
fmt.Printf("fatal error: %s\n", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func downgrade(base string) error {
|
||||||
|
top, err := os.Open(base) // #nosec G304
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// iterate over top-level blob dir, ie 'blobs' inside prysm's datadir
|
||||||
|
topdirs, err := top.Readdirnames(0)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := top.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
nMoved := 0
|
||||||
|
for _, td := range topdirs {
|
||||||
|
// Ignore anything in the old layout.
|
||||||
|
if !filterNew(td) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dir, err := os.Open(filepath.Join(base, td)) // #nosec G304
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// List the subdirectoress of the short dir containers, eg if td == '0xff'
|
||||||
|
// we want to move all the subdirectories in that dir.
|
||||||
|
subs, err := dir.Readdirnames(0)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := dir.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, sd := range subs {
|
||||||
|
// this is the inner layer of directory nesting,
|
||||||
|
// eg if 'td' == '0xff', 'sd' might be something like:
|
||||||
|
// '0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb'
|
||||||
|
src := filepath.Join(base, td, sd)
|
||||||
|
target := filepath.Join(base, sd)
|
||||||
|
fmt.Printf("moving %s -> %s\n", src, target)
|
||||||
|
if err := os.Rename(src, target); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
nMoved += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Printf("moved %d directories\n", nMoved)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterRoot(s string) bool {
|
||||||
|
return strings.HasPrefix(s, "0x")
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterNew(s string) bool {
|
||||||
|
return filterRoot(s) && len(s) == newLen
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user