Compare commits

...

1 Commit

Author SHA1 Message Date
Kasey Kirkham
6b49be1eef new blob storage scheme avoiding large base dir 2024-05-17 16:02:35 -05:00
8 changed files with 496 additions and 40 deletions

View File

@@ -7,6 +7,7 @@ go_library(
"cache.go",
"log.go",
"metrics.go",
"migration.go",
"mock.go",
"pruner.go",
],
@@ -37,6 +38,7 @@ go_test(
srcs = [
"blob_test.go",
"cache_test.go",
"migration_test.go",
"pruner_test.go",
],
embed = [":go_default_library"],

View File

@@ -6,6 +6,7 @@ import (
"math"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"
@@ -37,6 +38,7 @@ const (
partExt = "part"
directoryPermissions = 0700
rootPrefixLen = 4
)
// BlobStorageOption is a functional option for configuring a BlobStorage.
@@ -321,8 +323,14 @@ func namerForSidecar(sc blocks.VerifiedROBlob) blobNamer {
return blobNamer{root: sc.BlockRoot(), index: sc.Index}
}
// groupDir returns the one-byte prefix directory (eg "0x01") that encloses
// this blob's per-root directory in the new storage layout.
func (p blobNamer) groupDir() string {
	rs := rootString(p.root)
	return oneBytePrefix(rs)
}
// dir returns the directory holding this blob's files, nested under its
// one-byte prefix group dir, eg "0x01/0x0125e5...a00b".
// NOTE(review): the diff residue left an early `return rootString(p.root)`
// ahead of the new body, which made the prefixed path unreachable; removed.
func (p blobNamer) dir() string {
	rs := rootString(p.root)
	parentDir := oneBytePrefix(rs)
	return filepath.Join(parentDir, rs)
}
func (p blobNamer) partPath(entropy string) string {
@@ -337,6 +345,11 @@ func rootString(root [32]byte) string {
return fmt.Sprintf("%#x", root)
}
// oneBytePrefix returns the leading byte of a hex-encoded root including the
// "0x" marker — eg "0x00" from "0x0002fb4db510b8618b04dc82d023793739c26346a8b02eb73482e24b0fec0555".
func oneBytePrefix(p string) string {
	return p[:rootPrefixLen]
}
func stringToRoot(str string) ([32]byte, error) {
slice, err := hexutil.Decode(str)
if err != nil {

View File

@@ -170,10 +170,15 @@ func TestBlobStoragePrune(t *testing.T) {
for _, sidecar := range testSidecars {
require.NoError(t, bs.Save(sidecar))
}
namer := namerForSidecar(testSidecars[0])
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
beforeFolders, err := afero.ReadDir(fs, namer.groupDir())
require.NoError(t, err)
require.Equal(t, 1, len(beforeFolders))
remainingFolders, err := afero.ReadDir(fs, ".")
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
remainingFolders, err := afero.ReadDir(fs, namer.groupDir())
require.NoError(t, err)
require.Equal(t, 0, len(remainingFolders))
})
@@ -181,36 +186,51 @@ func TestBlobStoragePrune(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 299, fieldparams.MaxBlobsPerBlock)
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
namer := namerForSidecar(testSidecars[0])
for _, sidecar := range testSidecars[4:] {
require.NoError(t, bs.Save(sidecar))
}
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
remainingFolders, err := afero.ReadDir(fs, ".")
remainingFolders, err := afero.ReadDir(fs, namer.groupDir())
require.NoError(t, err)
require.Equal(t, 0, len(remainingFolders))
})
t.Run("PruneMany", func(t *testing.T) {
blockQty := 10
slot := primitives.Slot(1)
for j := 0; j <= blockQty; j++ {
root := bytesutil.ToBytes32(bytesutil.ToBytes(uint64(slot), 32))
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, root, slot, fieldparams.MaxBlobsPerBlock)
pruneBefore := currentSlot - bs.pruner.windowSize
increment := primitives.Slot(10000)
slots := []primitives.Slot{
pruneBefore - increment,
pruneBefore - (2 * increment),
pruneBefore,
pruneBefore + increment,
pruneBefore + (2 * increment),
}
namers := make([]blobNamer, len(slots))
for i, s := range slots {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, s, 1)
testSidecars, err := verification.BlobSidecarSliceNoop(sidecars)
require.NoError(t, err)
require.NoError(t, bs.Save(testSidecars[0]))
slot += 10000
namers[i] = namerForSidecar(testSidecars[0])
}
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize))
require.NoError(t, bs.pruner.prune(currentSlot-bs.pruner.windowSize, nil))
remainingFolders, err := afero.ReadDir(fs, ".")
require.NoError(t, err)
require.Equal(t, 4, len(remainingFolders))
// first 2 subdirs should be removed
for _, nmr := range namers[0:2] {
entries, err := listDir(fs, nmr.dir())
require.Equal(t, 0, len(entries))
require.ErrorIs(t, err, os.ErrNotExist)
}
// the rest should still be there
for _, nmr := range namers[2:] {
entries, err := listDir(fs, nmr.dir())
require.NoError(t, err)
require.Equal(t, 1, len(entries))
}
})
}
@@ -234,7 +254,7 @@ func BenchmarkPruning(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := bs.pruner.prune(currentSlot)
err := bs.pruner.prune(currentSlot, nil)
require.NoError(b, err)
}
}

View File

@@ -0,0 +1,61 @@
package filesystem
import (
"path/filepath"
"time"
"github.com/spf13/afero"
)
// directoryMigrator relocates blob directories from one on-disk layout to another.
type directoryMigrator interface {
	migrate(fs afero.Fs, dirs []string) error
}

// oneBytePrefixMigrator moves legacy flat root directories underneath
// one-byte prefix group directories (eg "0x01/").
type oneBytePrefixMigrator struct {
	// migrated records each source directory that has been moved.
	migrated []string
}
// migrate moves any legacy (flat) blob root directories found in dirs into
// their one-byte-prefix group directories, logging a summary if any were moved.
func (m *oneBytePrefixMigrator) migrate(fs afero.Fs, dirs []string) error {
	begin := time.Now()
	defer func() {
		if moved := len(m.migrated); moved > 0 {
			log.WithField("elapsed", time.Since(begin).String()).
				WithField("dirsMoved", moved).
				Debug("Migrated blob subdirectories to byte-prefixed format")
		}
	}()
	return m.renameByGroup(fs, groupDirsByPrefix(dirs))
}
// renameByGroup relocates each legacy directory beneath its group (prefix)
// directory, creating the group directory on demand, and records every
// directory moved in m.migrated.
func (m *oneBytePrefixMigrator) renameByGroup(fs afero.Fs, groups map[string][]string) error {
	for prefix, legacyDirs := range groups {
		// make the enclosing dir if needed
		if err := fs.MkdirAll(prefix, directoryPermissions); err != nil {
			return err
		}
		for _, legacy := range legacyDirs {
			target := filepath.Join(prefix, legacy)
			// todo: check if directory exists and move files one at a time if so?
			// that shouldn't be a problem if we migrate in cache warmup and never write to old path.
			if err := fs.Rename(legacy, target); err != nil {
				return err
			}
			log.WithField("source", legacy).WithField("dest", target).Trace("Migrated legacy blob storage path.")
			m.migrated = append(m.migrated, legacy)
		}
	}
	return nil
}
// groupDirsByPrefix buckets legacy-format directory names by their one-byte
// prefix (eg "0x01"); entries not in legacy format are ignored.
func groupDirsByPrefix(dirs []string) map[string][]string {
	buckets := make(map[string][]string)
	for _, d := range dirs {
		if !filterLegacy(d) {
			continue
		}
		prefix := oneBytePrefix(d)
		buckets[prefix] = append(buckets[prefix], d)
	}
	return buckets
}

View File

@@ -0,0 +1,216 @@
package filesystem
import (
"os"
"path/filepath"
"testing"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/spf13/afero"
)
// testSetupPaths creates an empty file at every "before" path in the plan,
// making parent directories as needed, and sanity-checks each file exists.
func testSetupPaths(t *testing.T, fs afero.Fs, paths []migrateBeforeAfter) {
	for _, ba := range paths {
		src := ba.before()
		require.NoError(t, fs.MkdirAll(filepath.Dir(src), directoryPermissions))
		fh, err := fs.Create(src)
		require.NoError(t, err)
		require.NoError(t, fh.Close())
		// double check that we got the full path correct
		_, err = fs.Stat(src)
		require.NoError(t, err)
	}
}
// testAssertNewPaths verifies migration results: every "after" path exists,
// and for entries that actually moved, both the old file and its parent
// directory are gone.
func testAssertNewPaths(t *testing.T, fs afero.Fs, paths []migrateBeforeAfter) {
	for _, ba := range paths {
		moved := ba.before() != ba.after()
		if moved {
			_, err := fs.Stat(ba.before())
			require.ErrorIs(t, err, os.ErrNotExist)
			_, err = listDir(fs, filepath.Dir(ba.before()))
			require.ErrorIs(t, err, os.ErrNotExist)
		}
		_, err := fs.Stat(ba.after())
		require.NoError(t, err)
	}
}
// migrateBeforeAfter pairs a pre-migration path with its expected
// post-migration location: index 0 is "before", index 1 is "after".
type migrateBeforeAfter [2]string

// before returns the pre-migration path.
func (ba migrateBeforeAfter) before() string { return ba[0] }

// after returns the expected post-migration path.
func (ba migrateBeforeAfter) after() string { return ba[1] }
// TestOneBytePrefixMigrator exercises the legacy->prefixed directory migration
// end-to-end against an in-memory fs: each case lays out files at their
// "before" paths, runs the migrator over the top-level listing, then asserts
// every file is found at its "after" path (and that moved sources are gone).
func TestOneBytePrefixMigrator(t *testing.T) {
	cases := []struct {
		name string
		plan []migrateBeforeAfter
		err  error
	}{
		{
			// All entries are legacy (flat) layout; each moves under its prefix dir.
			name: "happy path",
			plan: []migrateBeforeAfter{
				{
					"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
					"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
				},
				{
					"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
					"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
				},
				{
					"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
					"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
				},
				{
					"0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
					"0x02/0x0232521756a0b965eab2c2245d7ad85feaeaf5f427cd14d1a7531f9d555b415c/0.ssz",
				},
			},
		},
		{
			// Already-migrated entries sharing a prefix with legacy entries must
			// be left in place while legacy siblings move in beside them.
			name: "different roots same prefix",
			plan: []migrateBeforeAfter{
				{
					"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
					"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
				},
				{
					"0xff0774a80664e1667dcd5a18bced866a596b6cef5f351c0f88cd310dd00cb16d/0.ssz",
					"0xff/0xff0774a80664e1667dcd5a18bced866a596b6cef5f351c0f88cd310dd00cb16d/0.ssz",
				},
				{
					"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
					"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
				},
				{
					"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
					"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
				},
				{
					"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
					"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/1.ssz",
				},
			},
		},
		{
			// A mixed tree: new-layout entries stay put, legacy entries migrate.
			name: "mix old and new",
			plan: []migrateBeforeAfter{
				{
					"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
					"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
				},
				{
					"0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
					"0x01/0x0125e54c64c925018c9296965a5b622d9f5ab626c10917860dcfb6aa09a0a00b/0.ssz",
				},
				{
					"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/0.ssz",
					"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/0.ssz",
				},
				{
					"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/1.ssz",
					"0xa0/0xa0000137a809ca8425e03ae6c4244eedc7c0aa37f2735883366bcaf1cca1e3f3/1.ssz",
				},
				{
					"0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
					"0x01/0x0127dba6fd30fdbb47e73e861d5c6e602b38ac3ddc945bb6a2fc4e10761e9a86/0.ssz",
				},
			},
		},
		{
			// The same root exists in both layouts; the legacy copy is renamed
			// into the already-existing prefixed directory.
			name: "overwrite existing root dir",
			plan: []migrateBeforeAfter{
				{
					"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
					"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
				},
				{
					"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/1.ssz",
					"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb/0.ssz",
				},
			},
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			fs, _ := NewEphemeralBlobStorageWithFs(t)
			testSetupPaths(t, fs, c.plan)
			// Feed the migrator the raw top-level listing, as prune would.
			entries, err := listDir(fs, ".")
			require.NoError(t, err)
			m := &oneBytePrefixMigrator{}
			err = m.migrate(fs, entries)
			if c.err != nil {
				require.ErrorIs(t, err, c.err)
				return
			}
			require.NoError(t, err)
			testAssertNewPaths(t, fs, c.plan)
		})
	}
}
// TestGroupDirsByPrefix verifies that legacy-format directory names are
// bucketed by their one-byte prefix and that already-migrated (nested)
// paths are excluded from the resulting groups.
func TestGroupDirsByPrefix(t *testing.T) {
	cases := []struct {
		name   string
		dirs   []string
		groups map[string][]string
	}{
		{
			name: "different buckets",
			dirs: []string{
				"0x00ff0b18f16d3f22e6386ec3d6718346358089be531cb3715cb61b34a08aca04",
				"0x0105400af093eeca95c1bf3874e97ec433244dd45222d850fe5ee50e53385f05",
			},
			groups: map[string][]string{
				"0x00": {"0x00ff0b18f16d3f22e6386ec3d6718346358089be531cb3715cb61b34a08aca04"},
				"0x01": {"0x0105400af093eeca95c1bf3874e97ec433244dd45222d850fe5ee50e53385f05"},
			},
		},
		{
			name: "same prefix, one bucket",
			dirs: []string{
				"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
				"0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
				"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
			},
			groups: map[string][]string{
				"0xff": {
					"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
					"0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
					"0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
				},
			},
		},
		{
			// Nested ("0xff/0x...") entries are already migrated and must not be grouped.
			name: "mix of legacy and new",
			dirs: []string{
				"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8",
				"0xff/0xffff0f4efdd596f39c602c7758d73b7ecf66856fd7649321f78fc8356a2e98b1",
				"0xff/0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb",
			},
			groups: map[string][]string{
				"0xff": {"0xfff5b975edfa1fbf807afb96e512bfa91eb41f78a9c9999d17f451d0077d3ed8"},
			},
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			groups := groupDirsByPrefix(c.dirs)
			require.Equal(t, len(c.groups), len(groups))
			for k, v := range c.groups {
				got := groups[k]
				require.Equal(t, len(v), len(got))
				// compare the lists
				require.DeepEqual(t, v, got)
			}
		})
	}
}

View File

@@ -28,6 +28,10 @@ var (
errNotBlobSSZ = errors.New("not a blob ssz file")
)
// Full root in directory will be 66 chars, eg:
// >>> len('0x0002fb4db510b8618b04dc82d023793739c26346a8b02eb73482e24b0fec0555') == 66
const legacyRootLen = 66
type blobPruner struct {
sync.Mutex
prunedBefore atomic.Uint64
@@ -74,7 +78,7 @@ func (p *blobPruner) notify(root [32]byte, latest primitives.Slot, idx uint64) e
go func() {
p.Lock()
defer p.Unlock()
if err := p.prune(primitives.Slot(pruned)); err != nil {
if err := p.prune(primitives.Slot(pruned), nil); err != nil {
log.WithError(err).Errorf("Failed to prune blobs from slot %d", latest)
}
}()
@@ -93,7 +97,7 @@ func windowMin(latest, offset primitives.Slot) primitives.Slot {
func (p *blobPruner) warmCache() error {
p.Lock()
defer p.Unlock()
if err := p.prune(0); err != nil {
if err := p.prune(0, &oneBytePrefixMigrator{}); err != nil {
return err
}
if !p.warmed {
@@ -115,7 +119,7 @@ func (p *blobPruner) waitForCache(ctx context.Context) (*blobStorageCache, error
// Prune prunes blobs in the base directory based on the retention epoch.
// It deletes blobs older than currentEpoch - (retentionEpochs+bufferEpochs).
// This is so that we keep a slight buffer and blobs are deleted after n+2 epochs.
func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
func (p *blobPruner) prune(pruneBefore primitives.Slot, m directoryMigrator) error {
start := time.Now()
totalPruned, totalErr := 0, 0
// Customize logging/metrics behavior for the initial cache warmup when slot=0.
@@ -135,12 +139,11 @@ func (p *blobPruner) prune(pruneBefore primitives.Slot) error {
}()
}
entries, err := listDir(p.fs, ".")
entries, err := walkAndMigrateBasedir(p.fs, m)
if err != nil {
return errors.Wrap(err, "unable to list root blobs directory")
}
dirs := filter(entries, filterRoot)
for _, dir := range dirs {
for _, dir := range entries {
pruned, err := p.tryPruneDir(dir, pruneBefore)
if err != nil {
totalErr += 1
@@ -274,6 +277,40 @@ func slotFromBlob(at io.ReaderAt) (primitives.Slot, error) {
return primitives.Slot(rawSlot), nil
}
// walkAndMigrateBasedir manages executing any needed directory migrations while also returning a list of every
// individual root directory containing blob files.
// walkAndMigrateBasedir manages executing any needed directory migrations while
// also returning a list of every individual root directory containing blob files.
func walkAndMigrateBasedir(fs afero.Fs, m directoryMigrator) ([]string, error) {
	topDirs, err := listDir(fs, ".")
	if err != nil {
		return nil, err
	}
	// Run the migration (when supplied) before walking, so the walk only ever
	// sees the new one-byte-prefix layout.
	if m != nil {
		if err := m.migrate(fs, topDirs); err != nil {
			return nil, err
		}
	}
	listing := make([]string, 0)
	for _, td := range topDirs {
		// We're not worried about any dangling legacy format paths because migrator should have already done its job.
		if !filterRootGroupDir(td) {
			continue
		}
		children, err := listDir(fs, td)
		if err != nil {
			return nil, err
		}
		for _, child := range children {
			if filterRoot(child) {
				listing = append(listing, filepath.Join(td, child))
			}
		}
	}
	return listing, nil
}
func listDir(fs afero.Fs, dir string) ([]string, error) {
top, err := fs.Open(dir)
if err != nil {
@@ -306,6 +343,14 @@ func filterRoot(s string) bool {
return strings.HasPrefix(s, "0x")
}
// filterLegacy reports whether s looks like a legacy (unprefixed) blob root
// directory: a "0x"-prefixed name exactly legacyRootLen (66) characters long.
func filterLegacy(s string) bool {
	if !filterRoot(s) {
		return false
	}
	return len(s) == legacyRootLen
}
// filterRootGroupDir reports whether s is a one-byte prefix group directory,
// eg "0xff": a "0x"-prefixed path whose base name is rootPrefixLen characters.
func filterRootGroupDir(s string) bool {
	base := filepath.Base(s)
	return filterRoot(s) && len(base) == rootPrefixLen
}
var dotSszExt = "." + sszExt
var dotPartExt = "." + partExt

View File

@@ -43,10 +43,11 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, slot, 1)
sc, err := verification.BlobSidecarNoop(sidecars[0])
require.NoError(t, err)
rootStr := rootString(sc.BlockRoot())
require.NoError(t, fs.Mkdir(rootStr, directoryPermissions)) // make empty directory
namer := namerForSidecar(sc)
dir := namer.dir()
require.NoError(t, fs.Mkdir(dir, directoryPermissions)) // make empty directory
require.NoError(t, pr.cache.ensure(sc.BlockRoot(), sc.Slot(), 0))
pruned, err := pr.tryPruneDir(rootStr, slot+1)
pruned, err := pr.tryPruneDir(dir, slot+1)
require.NoError(t, err)
require.Equal(t, 0, pruned)
})
@@ -61,21 +62,21 @@ func TestTryPruneDir_CachedExpired(t *testing.T) {
require.NoError(t, bs.Save(scs[1]))
// check that the root->slot is cached
root := scs[0].BlockRoot()
rootStr := rootString(root)
namer := namerForSidecar(scs[0])
dir := namer.dir()
cs, cok := bs.pruner.cache.slot(scs[0].BlockRoot())
require.Equal(t, true, cok)
require.Equal(t, slot, cs)
// ensure that we see the saved files in the filesystem
files, err := listDir(fs, rootStr)
files, err := listDir(fs, dir)
require.NoError(t, err)
require.Equal(t, 2, len(files))
pruned, err := bs.pruner.tryPruneDir(rootStr, slot+1)
pruned, err := bs.pruner.tryPruneDir(dir, slot+1)
require.NoError(t, err)
require.Equal(t, 2, pruned)
files, err = listDir(fs, rootStr)
files, err = listDir(fs, dir)
require.ErrorIs(t, err, os.ErrNotExist)
require.Equal(t, 0, len(files))
})
@@ -94,7 +95,8 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
// check that the root->slot is cached
root := scs[0].BlockRoot()
rootStr := rootString(root)
namer := namerForSidecar(scs[0])
dir := namer.dir()
cs, ok := bs.pruner.cache.slot(root)
require.Equal(t, true, ok)
require.Equal(t, slot, cs)
@@ -104,14 +106,14 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
require.Equal(t, false, ok)
// ensure that we see the saved files in the filesystem
files, err := listDir(fs, rootStr)
files, err := listDir(fs, dir)
require.NoError(t, err)
require.Equal(t, 2, len(files))
pruned, err := bs.pruner.tryPruneDir(rootStr, slot+1)
pruned, err := bs.pruner.tryPruneDir(dir, slot+1)
require.NoError(t, err)
require.Equal(t, 2, pruned)
files, err = listDir(fs, rootStr)
files, err = listDir(fs, dir)
require.ErrorIs(t, err, os.ErrNotExist)
require.Equal(t, 0, len(files))
})
@@ -128,24 +130,25 @@ func TestTryPruneDir_SlotFromFile(t *testing.T) {
// Evict slot mapping from the cache so that we trigger the file read path.
root := scs[0].BlockRoot()
rootStr := rootString(root)
namer := namerForSidecar(scs[0])
dir := namer.dir()
bs.pruner.cache.evict(root)
_, ok := bs.pruner.cache.slot(root)
require.Equal(t, false, ok)
// Ensure that we see the saved files in the filesystem.
files, err := listDir(fs, rootStr)
files, err := listDir(fs, dir)
require.NoError(t, err)
require.Equal(t, 2, len(files))
// This should use the slotFromFile code (simulating restart).
// Setting pruneBefore == slot, so that the slot will be outside the window (at the boundary).
pruned, err := bs.pruner.tryPruneDir(rootStr, slot)
pruned, err := bs.pruner.tryPruneDir(dir, slot)
require.NoError(t, err)
require.Equal(t, 0, pruned)
// Ensure files are still present.
files, err = listDir(fs, rootStr)
files, err = listDir(fs, dir)
require.NoError(t, err)
require.Equal(t, 2, len(files))
})

View File

@@ -0,0 +1,96 @@
package main
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
)
var errUsage = errors.New("incorrect usage - missing blob path")
const newLen = 4 // eg '0xff'
// blobPath returns the blob directory given as the first CLI argument,
// or errUsage when none was provided.
func blobPath() (string, error) {
	if len(os.Args) > 1 {
		return os.Args[1], nil
	}
	return "", errUsage
}
// usage prints the triggering error followed by a short help message
// describing the expected invocation.
func usage(err error) {
	fmt.Println(err.Error())
	fmt.Println("downgrade-blob-storage: Move blob directories back to old format, without the single byte container directories at the top-level of the directory tree. usage:\n" + os.Args[0] + " <path to blobs dir>")
}
// main parses the blob path argument and runs the downgrade, printing usage
// on a missing argument and exiting non-zero on any failure.
func main() {
	bp, err := blobPath()
	if err != nil {
		if errors.Is(err, errUsage) {
			usage(err)
		}
		os.Exit(1)
	}
	if err = downgrade(bp); err != nil {
		fmt.Printf("fatal error: %s\n", err.Error())
		os.Exit(1)
	}
}
// downgrade walks the top-level blob directory and moves every per-root
// subdirectory found under a one-byte prefix container (eg "0xff/0xffff.../")
// back up to the top level, restoring the legacy flat layout.
// Fix: the original leaked the open directory handles when Readdirnames
// returned an error (it returned without calling Close); handles are now
// closed on every path.
func downgrade(base string) error {
	top, err := os.Open(base) // #nosec G304
	if err != nil {
		return err
	}
	// iterate over top-level blob dir, ie 'blobs' inside prysm's datadir
	topdirs, err := top.Readdirnames(0)
	if err != nil {
		_ = top.Close() // best-effort close; the read error takes precedence
		return err
	}
	if err := top.Close(); err != nil {
		return err
	}
	nMoved := 0
	for _, td := range topdirs {
		// Ignore anything in the old layout.
		if !filterNew(td) {
			continue
		}
		dir, err := os.Open(filepath.Join(base, td)) // #nosec G304
		if err != nil {
			return err
		}
		// List the subdirectories of the short dir containers, eg if td == '0xff'
		// we want to move all the subdirectories in that dir.
		subs, err := dir.Readdirnames(0)
		if err != nil {
			_ = dir.Close() // best-effort close; the read error takes precedence
			return err
		}
		if err := dir.Close(); err != nil {
			return err
		}
		for _, sd := range subs {
			// this is the inner layer of directory nesting,
			// eg if 'td' == '0xff', 'sd' might be something like:
			// '0xffff875e1d985c5ccb214894983f2428edb271f0f87b68ba7010e4a99df3b5cb'
			src := filepath.Join(base, td, sd)
			target := filepath.Join(base, sd)
			fmt.Printf("moving %s -> %s\n", src, target)
			if err := os.Rename(src, target); err != nil {
				return err
			}
			nMoved++
		}
	}
	fmt.Printf("moved %d directories\n", nMoved)
	return nil
}
// filterRoot reports whether s looks like a hex-encoded path component,
// ie it begins with the "0x" marker.
func filterRoot(s string) bool {
	const hexMarker = "0x"
	return strings.HasPrefix(s, hexMarker)
}
func filterNew(s string) bool {
return filterRoot(s) && len(s) == newLen
}