From 395feeb35d0c8bd1c145d2286d16c64dbf457edb Mon Sep 17 00:00:00 2001 From: Mohamed Zahoor Date: Fri, 6 Aug 2021 00:00:05 +0530 Subject: [PATCH] Cross check db state bucket after migration (#9311) * added state db check after migration * satisfy deepsource * added db compare after migration sub-command * fomt issue * point ot bolt issue * gazel fix * maxUint84 * revert accidental deletion of WORKSPACE file * added sorting support * cleanup debug * gazel fix * satisfy deepsource Co-authored-by: Raul Jordan Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/db/kv/BUILD.bazel | 1 + .../db/kv/migration_state_validators.go | 4 +- tools/exploredb/BUILD.bazel | 1 + tools/exploredb/main.go | 150 +++++++++++++++--- 4 files changed, 132 insertions(+), 24 deletions(-) diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index ec10ef3407..c8aec124b0 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -52,6 +52,7 @@ go_library( "//shared/traceutil:go_default_library", "@com_github_dgraph_io_ristretto//:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", + "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_ferranbt_fastssz//:go_default_library", "@com_github_golang_snappy//:go_default_library", "@com_github_pkg_errors//:go_default_library", diff --git a/beacon-chain/db/kv/migration_state_validators.go b/beacon-chain/db/kv/migration_state_validators.go index 8d2218c5d2..8301cbcb96 100644 --- a/beacon-chain/db/kv/migration_state_validators.go +++ b/beacon-chain/db/kv/migration_state_validators.go @@ -3,7 +3,9 @@ package kv import ( "bytes" "context" + "fmt" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/golang/snappy" v1alpha1 "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/shared/featureconfig" @@ -103,7 +105,7 @@ func migrateStateValidators(ctx context.Context, db *bolt.DB) error { // no validators in state to migrate if len(state.Validators) == 0 { - continue + return fmt.Errorf("no validator entries in state key 0x%s", hexutil.Encode(keys[index])) } // move all the validators in this state registry out to a new bucket. diff --git a/tools/exploredb/BUILD.bazel b/tools/exploredb/BUILD.bazel index 3111d876c4..75f051d5e3 100644 --- a/tools/exploredb/BUILD.bazel +++ b/tools/exploredb/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//beacon-chain/state:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/params:go_default_library", "@com_github_dustin_go_humanize//:go_default_library", "@com_github_prysmaticlabs_eth2_types//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/tools/exploredb/main.go b/tools/exploredb/main.go index 5271e1a2d5..e078e3da95 100644 --- a/tools/exploredb/main.go +++ b/tools/exploredb/main.go @@ -9,6 +9,7 @@ package main import ( + "bytes" "context" "flag" "os" @@ -22,18 +23,25 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/state" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" log "github.com/sirupsen/logrus" "github.com/status-im/keycard-go/hexutils" bolt "go.etcd.io/bbolt" ) +const ( + MaxUint64 = ^uint64(0) + maxSlotsToDisplay = 2000000 +) + var ( - datadir = flag.String("datadir", "", "Path to data directory.") - dbName = flag.String("dbname", "", "database name.") - bucketStats = flag.Bool("bucket-stats", false, "Show all the bucket stats.") - bucketContents = flag.Bool("bucket-contents", false, "Show contents of a given bucket.") - bucketName = flag.String("bucket-name", "", "bucket to show contents.") - rowLimit = flag.Uint64("limit", 10, "limit to rows.") + datadir = flag.String("datadir", "", "Path to data directory.") + dbName = flag.String("dbname", "", "database name.") + command = flag.String("command", "", "command to execute.") + bucketName = flag.String("bucket-name", "", "bucket to show contents.") + rowLimit = flag.Uint64("limit", 10, "limit to rows.") + migrationName = flag.String("migration", "", "migration to cross check.") + destDatadir = flag.String("dest-datadir", "", "Path to destination data directory.") ) // used to parallelize all the bucket stats @@ -78,23 +86,31 @@ func main() { // check if the database file is present. dbNameWithPath := filepath.Join(*datadir, *dbName) - if _, err := os.Stat(*datadir); os.IsNotExist(err) { + if _, err := os.Stat(dbNameWithPath); os.IsNotExist(err) { log.Fatalf("could not locate database file : %s, %v", dbNameWithPath, err) } - // show stats of all the buckets. - if *bucketStats { + switch *command { + case "bucket-stats": printBucketStats(dbNameWithPath) - return - } - - // show teh contents of the specified bucket. - if *bucketContents { + case "bucket-content": switch *bucketName { - case "state", "state-summary": + case "state", + "state-summary": printBucketContents(dbNameWithPath, *rowLimit, *bucketName) default: - log.Fatal("Oops, Only 'state' and 'state-summary' buckets are supported for now.") + log.Fatal("Oops, given bucket is supported for now.") + } + case "migration-check": + destDbNameWithPath := filepath.Join(*destDatadir, *dbName) + if _, err := os.Stat(destDbNameWithPath); os.IsNotExist(err) { + log.Fatalf("could not locate destination database file : %s, %v", destDbNameWithPath, err) + } + switch *migrationName { + case "validator-entries": + checkValidatorMigration(dbNameWithPath, destDbNameWithPath) + default: + log.Fatal("Oops, given migration is not supported for now.") } } } @@ -104,8 +120,8 @@ func printBucketStats(dbNameWithPath string) { groupSize := uint64(128) doneC := make(chan bool) statsC := make(chan *bucketStat, groupSize) - go readBucketStats(ctx, dbNameWithPath, statsC) - go printBucketStates(statsC, doneC) + go readBucketStat(ctx, dbNameWithPath, statsC) + go printBucketStat(statsC, doneC) <-doneC } @@ -147,7 +163,7 @@ func printBucketContents(dbNameWithPath string, rowLimit uint64, bucketName stri <-doneC } -func readBucketStats(ctx context.Context, dbNameWithPath string, statsC chan<- *bucketStat) { +func readBucketStat(ctx context.Context, dbNameWithPath string, statsC chan<- *bucketStat) { // open the raw database file. If the file is busy, then exit. db, openErr := bolt.Open(dbNameWithPath, 0600, &bolt.Options{Timeout: 1 * time.Second}) if openErr != nil { @@ -236,6 +252,7 @@ func readBucketStats(ctx context.Context, dbNameWithPath string, statsC chan<- * } func readStates(ctx context.Context, db *kv.Store, stateC chan<- *modifiedState, keys [][]byte, sizes []uint64) { + stateMap := make(map[uint64]*modifiedState) for rowCount, key := range keys { st, stateErr := db.State(ctx, bytesutil.ToBytes32(key)) if stateErr != nil { @@ -248,7 +265,13 @@ func readStates(ctx context.Context, db *kv.Store, stateC chan<- *modifiedState, valueSize: sizes[rowCount], rowCount: uint64(rowCount), } - stateC <- mst + stateMap[uint64(st.Slot())] = mst + } + + for i := uint64(0); i < maxSlotsToDisplay; i++ { + if _, ok := stateMap[i]; ok { + stateC <- stateMap[i] + } } close(stateC) } @@ -272,7 +295,7 @@ func readStateSummary(ctx context.Context, db *kv.Store, stateSummaryC chan<- *m close(stateSummaryC) } -func printBucketStates(statsC <-chan *bucketStat, doneC chan<- bool) { +func printBucketStat(statsC <-chan *bucketStat, doneC chan<- bool) { for stat := range statsC { if stat.noOfRows != 0 { averageValueSize := stat.totalValueSize / stat.noOfRows @@ -298,7 +321,7 @@ func printBucketStates(statsC <-chan *bucketStat, doneC chan<- bool) { func printStates(stateC <-chan *modifiedState, doneC chan<- bool) { for mst := range stateC { st := mst.state - log.Infof("---- row = %04d ----", mst.rowCount) + log.Infof("---- row = %04d, slot = %8d, epoch = %8d, key = %s ----", mst.rowCount, st.Slot(), st.Slot()/params.BeaconConfig().SlotsPerEpoch, hexutils.BytesToHex(mst.key)) log.Infof("key : %s", hexutils.BytesToHex(mst.key)) log.Infof("value : compressed size = %s", humanize.Bytes(mst.valueSize)) t := time.Unix(int64(st.GenesisTime()), 0) @@ -345,6 +368,83 @@ func printStateSummary(stateSummaryC <-chan *modifiedStateSummary, doneC chan<- doneC <- true } +func checkValidatorMigration(dbNameWithPath, destDbNameWithPath string) { + // get the keys within the supplied limit for the given bucket. + sourceStateKeys, _ := keysOfBucket(dbNameWithPath, []byte("state"), MaxUint64) + destStateKeys, _ := keysOfBucket(destDbNameWithPath, []byte("state"), MaxUint64) + + if len(destStateKeys) < len(sourceStateKeys) { + log.Fatalf("destination keys are lesser then source keys (%d/%d)", len(sourceStateKeys), len(destStateKeys)) + } + + // create the source and destination KV stores. + sourceDbDirectory := filepath.Dir(dbNameWithPath) + sourceDB, openErr := kv.NewKVStore(context.Background(), sourceDbDirectory, &kv.Config{}) + if openErr != nil { + log.Fatalf("could not open sourceDB: %v", openErr) + } + + destinationDbDirectory := filepath.Dir(destDbNameWithPath) + destDB, openErr := kv.NewKVStore(context.Background(), destinationDbDirectory, &kv.Config{}) + if openErr != nil { + // dirty hack alert: Ignore this prometheus error as we are opening two DB with same metric name + // if you want to avoid this then we should pass the metric name when opening the DB which touches + // too many places. + if openErr.Error() != "duplicate metrics collector registration attempted" { + log.Fatalf("could not open sourceDB, %v", openErr) + } + } + + // don't forget to close it when ejecting out of this function. + defer func() { + closeErr := sourceDB.Close() + if closeErr != nil { + log.Fatalf("could not close sourceDB: %v", closeErr) + } + }() + defer func() { + closeErr := destDB.Close() + if closeErr != nil { + log.Fatalf("could not close sourceDB: %v", closeErr) + } + }() + + ctx := context.Background() + failCount := 0 + for rowCount, key := range sourceStateKeys[910:] { + sourceState, stateErr := sourceDB.State(ctx, bytesutil.ToBytes32(key)) + if stateErr != nil { + log.Fatalf("could not get from source db, the state for key : %s, %v", hexutils.BytesToHex(key), stateErr) + } + destinationState, stateErr := destDB.State(ctx, bytesutil.ToBytes32(key)) + if stateErr != nil { + log.Fatalf("could not get destination db, the state for key : %s, %v", hexutils.BytesToHex(key), stateErr) + } + if destinationState == nil { + log.Infof("could not find state in migrated DB: index = %d, slot = %d, epoch = %d, numOfValidators = %d, key = %s", + rowCount, sourceState.Slot(), sourceState.Slot()/params.BeaconConfig().SlotsPerEpoch, sourceState.NumValidators(), hexutils.BytesToHex(key)) + failCount++ + continue + } + + if len(sourceState.Validators()) != len(destinationState.Validators()) { + log.Fatalf("validator mismatch : source = %d, dest = %d", len(sourceState.Validators()), len(destinationState.Validators())) + } + sourceStateHash, err := sourceState.HashTreeRoot(ctx) + if err != nil { + log.Fatalf("could not find hash of source state: %v", err) + } + destinationSatteHash, err := destinationState.HashTreeRoot(ctx) + if err != nil { + log.Fatalf("could not find hash of destination state: %v", err) + } + if !bytes.Equal(sourceStateHash[:], destinationSatteHash[:]) { + log.Fatalf("state mismatch : key = %s", hexutils.BytesToHex(key)) + } + } + log.Infof("number of state that did not match: %d", failCount) +} + func keysOfBucket(dbNameWithPath string, bucketName []byte, rowLimit uint64) ([][]byte, []uint64) { // open the raw database file. If the file is busy, then exit. db, openErr := bolt.Open(dbNameWithPath, 0600, &bolt.Options{Timeout: 1 * time.Second}) @@ -371,7 +471,11 @@ func keysOfBucket(dbNameWithPath string, bucketName []byte, rowLimit uint64) ([] if count >= rowLimit { return nil } - keys = append(keys, k) + actualKey := make([]byte, len(k)) + actualSizes := make([]byte, len(v)) + copy(actualKey, k) + copy(actualSizes, v) + keys = append(keys, actualKey) sizes = append(sizes, uint64(len(v))) count++ }