mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-08 23:18:15 -05:00
Cross check db state bucket after migration (#9311)
* added state db check after migration * satisfy deepsource * added db compare after migration sub-command * fomt issue * point ot bolt issue * gazel fix * maxUint84 * revert accidental deletion of WORKSPACE file * added sorting support * cleanup debug * gazel fix * satisfy deepsource Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
@@ -11,6 +11,7 @@ go_library(
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//shared/bytesutil:go_default_library",
|
||||
"//shared/params:go_default_library",
|
||||
"@com_github_dustin_go_humanize//:go_default_library",
|
||||
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"flag"
|
||||
"os"
|
||||
@@ -22,18 +23,25 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/status-im/keycard-go/hexutils"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const (
|
||||
MaxUint64 = ^uint64(0)
|
||||
maxSlotsToDisplay = 2000000
|
||||
)
|
||||
|
||||
var (
|
||||
datadir = flag.String("datadir", "", "Path to data directory.")
|
||||
dbName = flag.String("dbname", "", "database name.")
|
||||
bucketStats = flag.Bool("bucket-stats", false, "Show all the bucket stats.")
|
||||
bucketContents = flag.Bool("bucket-contents", false, "Show contents of a given bucket.")
|
||||
bucketName = flag.String("bucket-name", "", "bucket to show contents.")
|
||||
rowLimit = flag.Uint64("limit", 10, "limit to rows.")
|
||||
datadir = flag.String("datadir", "", "Path to data directory.")
|
||||
dbName = flag.String("dbname", "", "database name.")
|
||||
command = flag.String("command", "", "command to execute.")
|
||||
bucketName = flag.String("bucket-name", "", "bucket to show contents.")
|
||||
rowLimit = flag.Uint64("limit", 10, "limit to rows.")
|
||||
migrationName = flag.String("migration", "", "migration to cross check.")
|
||||
destDatadir = flag.String("dest-datadir", "", "Path to destination data directory.")
|
||||
)
|
||||
|
||||
// used to parallelize all the bucket stats
|
||||
@@ -78,23 +86,31 @@ func main() {
|
||||
|
||||
// check if the database file is present.
|
||||
dbNameWithPath := filepath.Join(*datadir, *dbName)
|
||||
if _, err := os.Stat(*datadir); os.IsNotExist(err) {
|
||||
if _, err := os.Stat(dbNameWithPath); os.IsNotExist(err) {
|
||||
log.Fatalf("could not locate database file : %s, %v", dbNameWithPath, err)
|
||||
}
|
||||
|
||||
// show stats of all the buckets.
|
||||
if *bucketStats {
|
||||
switch *command {
|
||||
case "bucket-stats":
|
||||
printBucketStats(dbNameWithPath)
|
||||
return
|
||||
}
|
||||
|
||||
// show teh contents of the specified bucket.
|
||||
if *bucketContents {
|
||||
case "bucket-content":
|
||||
switch *bucketName {
|
||||
case "state", "state-summary":
|
||||
case "state",
|
||||
"state-summary":
|
||||
printBucketContents(dbNameWithPath, *rowLimit, *bucketName)
|
||||
default:
|
||||
log.Fatal("Oops, Only 'state' and 'state-summary' buckets are supported for now.")
|
||||
log.Fatal("Oops, given bucket is supported for now.")
|
||||
}
|
||||
case "migration-check":
|
||||
destDbNameWithPath := filepath.Join(*destDatadir, *dbName)
|
||||
if _, err := os.Stat(destDbNameWithPath); os.IsNotExist(err) {
|
||||
log.Fatalf("could not locate destination database file : %s, %v", destDbNameWithPath, err)
|
||||
}
|
||||
switch *migrationName {
|
||||
case "validator-entries":
|
||||
checkValidatorMigration(dbNameWithPath, destDbNameWithPath)
|
||||
default:
|
||||
log.Fatal("Oops, given migration is not supported for now.")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -104,8 +120,8 @@ func printBucketStats(dbNameWithPath string) {
|
||||
groupSize := uint64(128)
|
||||
doneC := make(chan bool)
|
||||
statsC := make(chan *bucketStat, groupSize)
|
||||
go readBucketStats(ctx, dbNameWithPath, statsC)
|
||||
go printBucketStates(statsC, doneC)
|
||||
go readBucketStat(ctx, dbNameWithPath, statsC)
|
||||
go printBucketStat(statsC, doneC)
|
||||
<-doneC
|
||||
}
|
||||
|
||||
@@ -147,7 +163,7 @@ func printBucketContents(dbNameWithPath string, rowLimit uint64, bucketName stri
|
||||
<-doneC
|
||||
}
|
||||
|
||||
func readBucketStats(ctx context.Context, dbNameWithPath string, statsC chan<- *bucketStat) {
|
||||
func readBucketStat(ctx context.Context, dbNameWithPath string, statsC chan<- *bucketStat) {
|
||||
// open the raw database file. If the file is busy, then exit.
|
||||
db, openErr := bolt.Open(dbNameWithPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
||||
if openErr != nil {
|
||||
@@ -236,6 +252,7 @@ func readBucketStats(ctx context.Context, dbNameWithPath string, statsC chan<- *
|
||||
}
|
||||
|
||||
func readStates(ctx context.Context, db *kv.Store, stateC chan<- *modifiedState, keys [][]byte, sizes []uint64) {
|
||||
stateMap := make(map[uint64]*modifiedState)
|
||||
for rowCount, key := range keys {
|
||||
st, stateErr := db.State(ctx, bytesutil.ToBytes32(key))
|
||||
if stateErr != nil {
|
||||
@@ -248,7 +265,13 @@ func readStates(ctx context.Context, db *kv.Store, stateC chan<- *modifiedState,
|
||||
valueSize: sizes[rowCount],
|
||||
rowCount: uint64(rowCount),
|
||||
}
|
||||
stateC <- mst
|
||||
stateMap[uint64(st.Slot())] = mst
|
||||
}
|
||||
|
||||
for i := uint64(0); i < maxSlotsToDisplay; i++ {
|
||||
if _, ok := stateMap[i]; ok {
|
||||
stateC <- stateMap[i]
|
||||
}
|
||||
}
|
||||
close(stateC)
|
||||
}
|
||||
@@ -272,7 +295,7 @@ func readStateSummary(ctx context.Context, db *kv.Store, stateSummaryC chan<- *m
|
||||
close(stateSummaryC)
|
||||
}
|
||||
|
||||
func printBucketStates(statsC <-chan *bucketStat, doneC chan<- bool) {
|
||||
func printBucketStat(statsC <-chan *bucketStat, doneC chan<- bool) {
|
||||
for stat := range statsC {
|
||||
if stat.noOfRows != 0 {
|
||||
averageValueSize := stat.totalValueSize / stat.noOfRows
|
||||
@@ -298,7 +321,7 @@ func printBucketStates(statsC <-chan *bucketStat, doneC chan<- bool) {
|
||||
func printStates(stateC <-chan *modifiedState, doneC chan<- bool) {
|
||||
for mst := range stateC {
|
||||
st := mst.state
|
||||
log.Infof("---- row = %04d ----", mst.rowCount)
|
||||
log.Infof("---- row = %04d, slot = %8d, epoch = %8d, key = %s ----", mst.rowCount, st.Slot(), st.Slot()/params.BeaconConfig().SlotsPerEpoch, hexutils.BytesToHex(mst.key))
|
||||
log.Infof("key : %s", hexutils.BytesToHex(mst.key))
|
||||
log.Infof("value : compressed size = %s", humanize.Bytes(mst.valueSize))
|
||||
t := time.Unix(int64(st.GenesisTime()), 0)
|
||||
@@ -345,6 +368,83 @@ func printStateSummary(stateSummaryC <-chan *modifiedStateSummary, doneC chan<-
|
||||
doneC <- true
|
||||
}
|
||||
|
||||
func checkValidatorMigration(dbNameWithPath, destDbNameWithPath string) {
|
||||
// get the keys within the supplied limit for the given bucket.
|
||||
sourceStateKeys, _ := keysOfBucket(dbNameWithPath, []byte("state"), MaxUint64)
|
||||
destStateKeys, _ := keysOfBucket(destDbNameWithPath, []byte("state"), MaxUint64)
|
||||
|
||||
if len(destStateKeys) < len(sourceStateKeys) {
|
||||
log.Fatalf("destination keys are lesser then source keys (%d/%d)", len(sourceStateKeys), len(destStateKeys))
|
||||
}
|
||||
|
||||
// create the source and destination KV stores.
|
||||
sourceDbDirectory := filepath.Dir(dbNameWithPath)
|
||||
sourceDB, openErr := kv.NewKVStore(context.Background(), sourceDbDirectory, &kv.Config{})
|
||||
if openErr != nil {
|
||||
log.Fatalf("could not open sourceDB: %v", openErr)
|
||||
}
|
||||
|
||||
destinationDbDirectory := filepath.Dir(destDbNameWithPath)
|
||||
destDB, openErr := kv.NewKVStore(context.Background(), destinationDbDirectory, &kv.Config{})
|
||||
if openErr != nil {
|
||||
// dirty hack alert: Ignore this prometheus error as we are opening two DB with same metric name
|
||||
// if you want to avoid this then we should pass the metric name when opening the DB which touches
|
||||
// too many places.
|
||||
if openErr.Error() != "duplicate metrics collector registration attempted" {
|
||||
log.Fatalf("could not open sourceDB, %v", openErr)
|
||||
}
|
||||
}
|
||||
|
||||
// don't forget to close it when ejecting out of this function.
|
||||
defer func() {
|
||||
closeErr := sourceDB.Close()
|
||||
if closeErr != nil {
|
||||
log.Fatalf("could not close sourceDB: %v", closeErr)
|
||||
}
|
||||
}()
|
||||
defer func() {
|
||||
closeErr := destDB.Close()
|
||||
if closeErr != nil {
|
||||
log.Fatalf("could not close sourceDB: %v", closeErr)
|
||||
}
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
failCount := 0
|
||||
for rowCount, key := range sourceStateKeys[910:] {
|
||||
sourceState, stateErr := sourceDB.State(ctx, bytesutil.ToBytes32(key))
|
||||
if stateErr != nil {
|
||||
log.Fatalf("could not get from source db, the state for key : %s, %v", hexutils.BytesToHex(key), stateErr)
|
||||
}
|
||||
destinationState, stateErr := destDB.State(ctx, bytesutil.ToBytes32(key))
|
||||
if stateErr != nil {
|
||||
log.Fatalf("could not get destination db, the state for key : %s, %v", hexutils.BytesToHex(key), stateErr)
|
||||
}
|
||||
if destinationState == nil {
|
||||
log.Infof("could not find state in migrated DB: index = %d, slot = %d, epoch = %d, numOfValidators = %d, key = %s",
|
||||
rowCount, sourceState.Slot(), sourceState.Slot()/params.BeaconConfig().SlotsPerEpoch, sourceState.NumValidators(), hexutils.BytesToHex(key))
|
||||
failCount++
|
||||
continue
|
||||
}
|
||||
|
||||
if len(sourceState.Validators()) != len(destinationState.Validators()) {
|
||||
log.Fatalf("validator mismatch : source = %d, dest = %d", len(sourceState.Validators()), len(destinationState.Validators()))
|
||||
}
|
||||
sourceStateHash, err := sourceState.HashTreeRoot(ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("could not find hash of source state: %v", err)
|
||||
}
|
||||
destinationSatteHash, err := destinationState.HashTreeRoot(ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("could not find hash of destination state: %v", err)
|
||||
}
|
||||
if !bytes.Equal(sourceStateHash[:], destinationSatteHash[:]) {
|
||||
log.Fatalf("state mismatch : key = %s", hexutils.BytesToHex(key))
|
||||
}
|
||||
}
|
||||
log.Infof("number of state that did not match: %d", failCount)
|
||||
}
|
||||
|
||||
func keysOfBucket(dbNameWithPath string, bucketName []byte, rowLimit uint64) ([][]byte, []uint64) {
|
||||
// open the raw database file. If the file is busy, then exit.
|
||||
db, openErr := bolt.Open(dbNameWithPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
||||
@@ -371,7 +471,11 @@ func keysOfBucket(dbNameWithPath string, bucketName []byte, rowLimit uint64) ([]
|
||||
if count >= rowLimit {
|
||||
return nil
|
||||
}
|
||||
keys = append(keys, k)
|
||||
actualKey := make([]byte, len(k))
|
||||
actualSizes := make([]byte, len(v))
|
||||
copy(actualKey, k)
|
||||
copy(actualSizes, v)
|
||||
keys = append(keys, actualKey)
|
||||
sizes = append(sizes, uint64(len(v)))
|
||||
count++
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user