diff --git a/validator/db/BUILD.bazel b/validator/db/BUILD.bazel index 0773d285b3..783e996bab 100644 --- a/validator/db/BUILD.bazel +++ b/validator/db/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "alias.go", "cmd.go", "log.go", + "migrate.go", "restore.go", ], importpath = "github.com/prysmaticlabs/prysm/validator/db", diff --git a/validator/db/cmd.go b/validator/db/cmd.go index 5979e4d08b..d300b7ea24 100644 --- a/validator/db/cmd.go +++ b/validator/db/cmd.go @@ -27,5 +27,40 @@ var DatabaseCommands = &cli.Command{ return nil }, }, + { + Name: "migrate", + Category: "db", + Usage: "Defines commands for running validator database migrations", + Subcommands: []*cli.Command{ + { + Name: "up", + Usage: "Runs up migrations for the validator database", + Flags: cmd.WrapFlags([]cli.Flag{ + cmd.DataDirFlag, + }), + Before: tos.VerifyTosAcceptedOrPrompt, + Action: func(cliCtx *cli.Context) error { + if err := migrateUp(cliCtx); err != nil { + log.Fatalf("Could not run database migrations: %v", err) + } + return nil + }, + }, + { + Name: "down", + Usage: "Runs down migrations for the validator database", + Flags: cmd.WrapFlags([]cli.Flag{ + cmd.DataDirFlag, + }), + Before: tos.VerifyTosAcceptedOrPrompt, + Action: func(cliCtx *cli.Context) error { + if err := migrateDown(cliCtx); err != nil { + log.Fatalf("Could not run database migrations: %v", err) + } + return nil + }, + }, + }, + }, }, } diff --git a/validator/db/kv/BUILD.bazel b/validator/db/kv/BUILD.bazel index 1a54a67f9d..4f9d5ecc7a 100644 --- a/validator/db/kv/BUILD.bazel +++ b/validator/db/kv/BUILD.bazel @@ -51,12 +51,9 @@ go_test( embed = [":go_default_library"], deps = [ "//shared/bytesutil:go_default_library", - "//shared/fileutil:go_default_library", "//shared/params:go_default_library", "//shared/testutil/assert:go_default_library", "//shared/testutil/require:go_default_library", - "@com_github_pkg_errors//:go_default_library", - "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", diff --git a/validator/db/kv/attester_protection.go b/validator/db/kv/attester_protection.go index 2afb00625e..d1c1a202e3 100644 --- a/validator/db/kv/attester_protection.go +++ b/validator/db/kv/attester_protection.go @@ -291,7 +291,7 @@ func (store *Store) AttestedPublicKeys(ctx context.Context) ([][48]byte, error) var err error attestedPublicKeys := make([][48]byte, 0) err = store.view(func(tx *bolt.Tx) error { - bucket := tx.Bucket(historicAttestationsBucket) + bucket := tx.Bucket(deprecatedAttestationHistoryBucket) return bucket.ForEach(func(key []byte, _ []byte) error { pubKeyBytes := [48]byte{} copy(pubKeyBytes[:], key) diff --git a/validator/db/kv/db.go b/validator/db/kv/db.go index 2f5936b6df..4480087c2d 100644 --- a/validator/db/kv/db.go +++ b/validator/db/kv/db.go @@ -113,7 +113,7 @@ func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store return createBuckets( tx, genesisInfoBucket, - historicAttestationsBucket, + deprecatedAttestationHistoryBucket, historicProposalsBucket, lowestSignedSourceBucket, lowestSignedTargetBucket, @@ -133,11 +133,6 @@ func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store } } - // Perform a special migration to an optimal attester protection DB schema. - if err := kv.migrateOptimalAttesterProtection(ctx); err != nil { - return nil, errors.Wrap(err, "could not migrate attester protection to more efficient format") - } - // Prune attesting records older than the current weak subjectivity period. if err := kv.PruneAttestationsOlderThanCurrentWeakSubjectivity(ctx); err != nil { return nil, errors.Wrap(err, "could not prune old attestations from DB") diff --git a/validator/db/kv/migration.go b/validator/db/kv/migration.go index a72645e9ec..ba6a01e975 100644 --- a/validator/db/kv/migration.go +++ b/validator/db/kv/migration.go @@ -6,15 +6,41 @@ import ( bolt "go.etcd.io/bbolt" ) -var migrationCompleted = []byte("done") - type migration func(*bolt.Tx) error -var migrations = []migration{} +var ( + migrationCompleted = []byte("done") + upMigrations = []migration{} + downMigrations = []migration{} +) -// RunMigrations defined in the migrations array. -func (s *Store) RunMigrations(ctx context.Context) error { - for _, m := range migrations { +// RunUpMigrations defined in the upMigrations list. +func (s *Store) RunUpMigrations(ctx context.Context) error { + // Run any special migrations that require special conditions. + if err := s.migrateOptimalAttesterProtectionUp(ctx); err != nil { + return err + } + + for _, m := range upMigrations { + if ctx.Err() != nil { + return ctx.Err() + } + + if err := s.db.Update(m); err != nil { + return err + } + } + return nil +} + +// RunDownMigrations defined in the downMigrations list. +func (s *Store) RunDownMigrations(ctx context.Context) error { + // Run any special migrations that require special conditions. + if err := s.migrateOptimalAttesterProtectionDown(ctx); err != nil { + return err + } + + for _, m := range downMigrations { if ctx.Err() != nil { return ctx.Err() } diff --git a/validator/db/kv/migration_optimal_attester_protection.go b/validator/db/kv/migration_optimal_attester_protection.go index 9ffdf343cc..18d9b4f5bc 100644 --- a/validator/db/kv/migration_optimal_attester_protection.go +++ b/validator/db/kv/migration_optimal_attester_protection.go @@ -5,6 +5,7 @@ import ( "context" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/progressutil" bolt "go.etcd.io/bbolt" ) @@ -15,7 +16,7 @@ var migrationOptimalAttesterProtectionKey = []byte("optimal_attester_protection_ // stored attesting history as large, 2Mb arrays per validator, we need to perform // this migration differently than the rest, ensuring we perform each expensive bolt // update in its own transaction to prevent having everything on the heap. -func (store *Store) migrateOptimalAttesterProtection(ctx context.Context) error { +func (store *Store) migrateOptimalAttesterProtectionUp(ctx context.Context) error { publicKeyBytes := make([][]byte, 0) attestingHistoryBytes := make([][]byte, 0) numKeys := 0 @@ -25,7 +26,7 @@ func (store *Store) migrateOptimalAttesterProtection(ctx context.Context) error return nil // Migration already completed. } - bkt := tx.Bucket(historicAttestationsBucket) + bkt := tx.Bucket(deprecatedAttestationHistoryBucket) numKeys = bkt.Stats().KeyN if err := bkt.ForEach(func(k, v []byte) error { if v == nil { @@ -115,3 +116,139 @@ func (store *Store) migrateOptimalAttesterProtection(ctx context.Context) error return nil }) } + +// Migrate attester protection from the more optimal format to the old format in the DB. +func (store *Store) migrateOptimalAttesterProtectionDown(ctx context.Context) error { + // First we extract the public keys we are migrating down for. + pubKeys := make([][48]byte, 0) + err := store.view(func(tx *bolt.Tx) error { + mb := tx.Bucket(migrationsBucket) + if b := mb.Get(migrationOptimalAttesterProtectionKey); b == nil { + // Migration has not occurred, meaning data is already in old format + // so no need to perform a down migration. + return nil + } + bkt := tx.Bucket(pubKeysBucket) + if bkt == nil { + return nil + } + return bkt.ForEach(func(pubKey, v []byte) error { + if pubKey == nil { + return nil + } + pkBucket := bkt.Bucket(pubKey) + if pkBucket == nil { + return nil + } + pubKeys = append(pubKeys, bytesutil.ToBytes48(pubKey)) + return nil + }) + }) + if err != nil { + return err + } + + // Next up, we extract the data for attested epochs and signing roots + // from the optimized db schema into maps we can use later. + signingRootsByTarget := make(map[uint64][]byte) + targetEpochsBySource := make(map[uint64][]uint64) + err = store.view(func(tx *bolt.Tx) error { + bkt := tx.Bucket(pubKeysBucket) + if bkt == nil { + return nil + } + for _, pubKey := range pubKeys { + pubKeyBkt := bkt.Bucket(pubKey[:]) + if pubKeyBkt == nil { + continue + } + sourceEpochsBucket := pubKeyBkt.Bucket(attestationSourceEpochsBucket) + signingRootsBucket := pubKeyBkt.Bucket(attestationSigningRootsBucket) + // Extract signing roots. + if err := signingRootsBucket.ForEach(func(targetBytes, signingRoot []byte) error { + var sr [32]byte + copy(sr[:], signingRoot) + signingRootsByTarget[bytesutil.BytesToUint64BigEndian(targetBytes)] = sr[:] + return nil + }); err != nil { + return err + } + // Next up, extract the target epochs by source. + if err := sourceEpochsBucket.ForEach(func(sourceBytes, targetEpochsBytes []byte) error { + targetEpochs := make([]uint64, 0) + for i := 0; i < len(targetEpochsBytes); i += 8 { + targetEpochs = append(targetEpochs, bytesutil.BytesToUint64BigEndian(targetEpochsBytes[i:i+8])) + } + targetEpochsBySource[bytesutil.BytesToUint64BigEndian(sourceBytes)] = targetEpochs + return nil + }); err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + + // Then, we use the data we extracted to recreate the old + // attesting history format and for each public key, we save it + // to the appropriate bucket. + err = store.update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(pubKeysBucket) + if bkt == nil { + return nil + } + bar := progressutil.InitializeProgressBar(len(pubKeys), "Migrating attesting history to old format") + for _, pubKey := range pubKeys { + // Now we write the attesting history using the data we extracted + // from the buckets accordingly. + history := newDeprecatedAttestingHistory(0) + var maxTargetWritten uint64 + for source, targetEpochs := range targetEpochsBySource { + for _, target := range targetEpochs { + signingRoot := params.BeaconConfig().ZeroHash[:] + if sr, ok := signingRootsByTarget[target]; ok { + signingRoot = sr + } + newHist, err := history.setTargetData(ctx, target, &deprecatedHistoryData{ + Source: source, + SigningRoot: signingRoot, + }) + if err != nil { + return err + } + history = newHist + if target > maxTargetWritten { + maxTargetWritten = target + } + } + } + newHist, err := history.setLatestEpochWritten(ctx, maxTargetWritten) + if err != nil { + return err + } + history = newHist + deprecatedBkt, err := tx.CreateBucketIfNotExists(deprecatedAttestationHistoryBucket) + if err != nil { + return err + } + if err := deprecatedBkt.Put(pubKey[:], history); err != nil { + return err + } + if err := bar.Add(1); err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + + // Finally, we clear the migration key. + return store.update(func(tx *bolt.Tx) error { + migrationsBkt := tx.Bucket(migrationsBucket) + return migrationsBkt.Delete(migrationOptimalAttesterProtectionKey) + }) +} diff --git a/validator/db/kv/migration_optimal_attester_protection_test.go b/validator/db/kv/migration_optimal_attester_protection_test.go index 2011e6ec69..26ba6b4bea 100644 --- a/validator/db/kv/migration_optimal_attester_protection_test.go +++ b/validator/db/kv/migration_optimal_attester_protection_test.go @@ -3,19 +3,15 @@ package kv import ( "context" "fmt" - "path/filepath" "testing" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prysmaticlabs/prysm/shared/bytesutil" - "github.com/prysmaticlabs/prysm/shared/fileutil" - "github.com/prysmaticlabs/prysm/shared/params" - "github.com/prysmaticlabs/prysm/shared/testutil/require" bolt "go.etcd.io/bbolt" + + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/testutil/require" ) -func Test_migrateOptimalAttesterProtection(t *testing.T) { +func Test_migrateOptimalAttesterProtectionUp(t *testing.T) { tests := []struct { name string setup func(t *testing.T, validatorDB *Store) @@ -60,7 +56,7 @@ func Test_migrateOptimalAttesterProtection(t *testing.T) { require.NoError(t, err) err = validatorDB.update(func(tx *bolt.Tx) error { - bucket := tx.Bucket(historicAttestationsBucket) + bucket := tx.Bucket(deprecatedAttestationHistoryBucket) return bucket.Put(pubKey[:], newHist) }) require.NoError(t, err) @@ -119,13 +115,13 @@ func Test_migrateOptimalAttesterProtection(t *testing.T) { require.NoError(t, err) err = validatorDB.update(func(tx *bolt.Tx) error { - bucket := tx.Bucket(historicAttestationsBucket) + bucket := tx.Bucket(deprecatedAttestationHistoryBucket) return bucket.Put(pubKey[:], newHist) }) require.NoError(t, err) // Run the migration. - require.NoError(t, validatorDB.migrateOptimalAttesterProtection(ctx)) + require.NoError(t, validatorDB.migrateOptimalAttesterProtectionUp(ctx)) // Then delete the migration completed key. err = validatorDB.update(func(tx *bolt.Tx) error { @@ -146,7 +142,7 @@ func Test_migrateOptimalAttesterProtection(t *testing.T) { require.NoError(t, err) err = validatorDB.update(func(tx *bolt.Tx) error { - bucket := tx.Bucket(historicAttestationsBucket) + bucket := tx.Bucket(deprecatedAttestationHistoryBucket) return bucket.Put(pubKey[:], newHist) }) require.NoError(t, err) @@ -186,58 +182,120 @@ func Test_migrateOptimalAttesterProtection(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - validatorDB, err := setupDBWithoutMigration(t.TempDir()) - require.NoError(t, err, "Failed to instantiate DB") - t.Cleanup(func() { - require.NoError(t, validatorDB.Close(), "Failed to close database") - require.NoError(t, validatorDB.ClearDB(), "Failed to clear database") - }) + validatorDB := setupDB(t, nil) tt.setup(t, validatorDB) - require.NoError(t, validatorDB.migrateOptimalAttesterProtection(context.Background())) + require.NoError(t, validatorDB.migrateOptimalAttesterProtectionUp(context.Background())) tt.eval(t, validatorDB) }) } } -func setupDBWithoutMigration(dirPath string) (*Store, error) { - hasDir, err := fileutil.HasDir(dirPath) - if err != nil { - return nil, err +func Test_migrateOptimalAttesterProtectionDown(t *testing.T) { + tests := []struct { + name string + setup func(t *testing.T, validatorDB *Store) + eval func(t *testing.T, validatorDB *Store) + }{ + { + name: "unsets the migration completed key upon completion", + setup: func(t *testing.T, validatorDB *Store) { + err := validatorDB.update(func(tx *bolt.Tx) error { + return tx.Bucket(migrationsBucket).Put(migrationOptimalAttesterProtectionKey, migrationCompleted) + }) + require.NoError(t, err) + }, + eval: func(t *testing.T, validatorDB *Store) { + err := validatorDB.view(func(tx *bolt.Tx) error { + data := tx.Bucket(migrationsBucket).Get(migrationOptimalAttesterProtectionKey) + require.DeepEqual(t, true, data == nil) + return nil + }) + require.NoError(t, err) + }, + }, + { + name: "unsets the migration, even if unset already (no panic)", + setup: func(t *testing.T, validatorDB *Store) {}, + eval: func(t *testing.T, validatorDB *Store) { + // Ensure the migration is not marked as complete. + err := validatorDB.view(func(tx *bolt.Tx) error { + data := tx.Bucket(migrationsBucket).Get(migrationOptimalAttesterProtectionKey) + require.DeepNotEqual(t, data, migrationCompleted) + return nil + }) + require.NoError(t, err) + }, + }, + { + name: "populates old format from data using the new schema", + setup: func(t *testing.T, validatorDB *Store) { + pubKeys := [][48]byte{{1}, {2}} + // Create attesting history for two public keys + err := validatorDB.update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(pubKeysBucket) + for _, pubKey := range pubKeys { + pkBucket, err := bkt.CreateBucketIfNotExists(pubKey[:]) + if err != nil { + return err + } + sourceEpochsBucket, err := pkBucket.CreateBucketIfNotExists(attestationSourceEpochsBucket) + if err != nil { + return err + } + signingRootsBucket, err := pkBucket.CreateBucketIfNotExists(attestationSigningRootsBucket) + if err != nil { + return err + } + // The highest epoch we write is 50. + highestEpoch := uint64(50) + for i := uint64(1); i <= highestEpoch; i++ { + source := bytesutil.Uint64ToBytesBigEndian(i - 1) + target := bytesutil.Uint64ToBytesBigEndian(i) + if err := sourceEpochsBucket.Put(source, target); err != nil { + return err + } + var signingRoot [32]byte + copy(signingRoot[:], fmt.Sprintf("%d", target)) + if err := signingRootsBucket.Put(target, signingRoot[:]); err != nil { + return err + } + } + } + // Finally, we mark the migration as completed to show that we have the + // new, optimized format for attester protection in the database. + migrationBkt := tx.Bucket(migrationsBucket) + return migrationBkt.Put(migrationOptimalAttesterProtectionKey, migrationCompleted) + }) + require.NoError(t, err) + }, + eval: func(t *testing.T, validatorDB *Store) { + ctx := context.Background() + pubKeys := [][48]byte{{1}, {2}} + // Next up, we validate that we have indeed rolled back our data + // into the old format for attesting history. + err := validatorDB.view(func(tx *bolt.Tx) error { + bkt := tx.Bucket(deprecatedAttestationHistoryBucket) + for _, pubKey := range pubKeys { + encodedHistoryBytes := bkt.Get(pubKey[:]) + require.NotNil(t, encodedHistoryBytes) + attestingHistory := deprecatedEncodedAttestingHistory(encodedHistoryBytes) + highestEpoch, err := attestingHistory.getLatestEpochWritten(ctx) + require.NoError(t, err) + // Verify the highest epoch written is 50 from the setup stage. + require.Equal(t, uint64(50), highestEpoch) + } + return nil + }) + require.NoError(t, err) + }, + }, } - if !hasDir { - if err := fileutil.MkdirAll(dirPath); err != nil { - return nil, err - } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + validatorDB := setupDB(t, nil) + tt.setup(t, validatorDB) + require.NoError(t, validatorDB.migrateOptimalAttesterProtectionDown(context.Background())) + tt.eval(t, validatorDB) + }) } - datafile := filepath.Join(dirPath, ProtectionDbFileName) - boltDB, err := bolt.Open(datafile, params.BeaconIoConfig().ReadWritePermissions, &bolt.Options{Timeout: params.BeaconIoConfig().BoltTimeout}) - if err != nil { - if errors.Is(err, bolt.ErrTimeout) { - return nil, errors.New("cannot obtain database lock, database may be in use by another process") - } - return nil, err - } - - kv := &Store{ - db: boltDB, - databasePath: dirPath, - } - - if err := kv.db.Update(func(tx *bolt.Tx) error { - return createBuckets( - tx, - genesisInfoBucket, - historicAttestationsBucket, - historicProposalsBucket, - lowestSignedSourceBucket, - lowestSignedTargetBucket, - lowestSignedProposalsBucket, - highestSignedProposalsBucket, - pubKeysBucket, - migrationsBucket, - ) - }); err != nil { - return nil, err - } - return kv, prometheus.Register(createBoltCollector(kv.db)) } diff --git a/validator/db/kv/schema.go b/validator/db/kv/schema.go index 2d0dcfa50e..fc6d7181b1 100644 --- a/validator/db/kv/schema.go +++ b/validator/db/kv/schema.go @@ -5,9 +5,8 @@ var ( genesisInfoBucket = []byte("genesis-info-bucket") // Validator slashing protection from double proposals. - historicProposalsBucket = []byte("proposal-history-bucket-interchange") - // New Validator slashing protection from slashable attestations. - historicAttestationsBucket = []byte("attestation-history-bucket-interchange") + historicProposalsBucket = []byte("proposal-history-bucket-interchange") + deprecatedAttestationHistoryBucket = []byte("attestation-history-bucket-interchange") // Buckets for lowest signed source and target epoch for individual validator. lowestSignedSourceBucket = []byte("lowest-signed-source-bucket") diff --git a/validator/db/migrate.go b/validator/db/migrate.go new file mode 100644 index 0000000000..550ccf1f7f --- /dev/null +++ b/validator/db/migrate.go @@ -0,0 +1,42 @@ +package db + +import ( + "context" + "path" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/shared/cmd" + "github.com/prysmaticlabs/prysm/shared/fileutil" + "github.com/prysmaticlabs/prysm/validator/db/kv" + "github.com/urfave/cli/v2" +) + +func migrateUp(cliCtx *cli.Context) error { + dataDir := cliCtx.String(cmd.DataDirFlag.Name) + + if fileutil.FileExists(path.Join(dataDir, kv.ProtectionDbFileName)) { + return errors.New("No validator db found at path, nothing to rollback") + } + + ctx := context.Background() + validatorDB, err := kv.NewKVStore(ctx, dataDir, nil) + if err != nil { + return err + } + return validatorDB.RunUpMigrations(ctx) +} + +func migrateDown(cliCtx *cli.Context) error { + dataDir := cliCtx.String(cmd.DataDirFlag.Name) + + if fileutil.FileExists(path.Join(dataDir, kv.ProtectionDbFileName)) { + return errors.New("No validator db found at path, nothing to rollback") + } + + ctx := context.Background() + validatorDB, err := kv.NewKVStore(ctx, dataDir, nil) + if err != nil { + return err + } + return validatorDB.RunDownMigrations(ctx) +} diff --git a/validator/node/node.go b/validator/node/node.go index 65b93937f4..383fb46614 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -222,7 +222,7 @@ func (s *ValidatorClient) initializeFromCLI(cliCtx *cli.Context) error { return errors.Wrap(err, "could not initialize db") } s.db = valDB - if err := valDB.RunMigrations(cliCtx.Context); err != nil { + if err := valDB.RunUpMigrations(cliCtx.Context); err != nil { return errors.Wrap(err, "could not run database migration") } if !cliCtx.Bool(cmd.DisableMonitoringFlag.Name) { @@ -308,7 +308,7 @@ func (s *ValidatorClient) initializeForWeb(cliCtx *cli.Context) error { return errors.Wrap(err, "could not initialize db") } s.db = valDB - if err := valDB.RunMigrations(cliCtx.Context); err != nil { + if err := valDB.RunUpMigrations(cliCtx.Context); err != nil { return errors.Wrap(err, "could not run database migration") } if !cliCtx.Bool(cmd.DisableMonitoringFlag.Name) {