diff --git a/beacon-chain/state/interface/BUILD.bazel b/beacon-chain/state/interface/BUILD.bazel
index 3ce31a362f..bb7a21e52c 100644
--- a/beacon-chain/state/interface/BUILD.bazel
+++ b/beacon-chain/state/interface/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
         "//slasher/rpc:__subpackages__",
         "//spectest:__subpackages__",
         "//tools/benchmark-files-gen:__pkg__",
+        "//tools/exploredb:__pkg__",
         "//tools/pcli:__pkg__",
     ],
     deps = [
diff --git a/tools/exploredb/BUILD.bazel b/tools/exploredb/BUILD.bazel
index cd057c0b94..90efdfc7ee 100644
--- a/tools/exploredb/BUILD.bazel
+++ b/tools/exploredb/BUILD.bazel
@@ -8,10 +8,12 @@ go_library(
     visibility = ["//visibility:private"],
     deps = [
         "//beacon-chain/db/kv:go_default_library",
+        "//beacon-chain/state/interface:go_default_library",
         "//proto/beacon/p2p/v1:go_default_library",
         "//proto/eth/v1alpha1:go_default_library",
         "//shared/bytesutil:go_default_library",
         "@com_github_dustin_go_humanize//:go_default_library",
+        "@com_github_prysmaticlabs_eth2_types//:go_default_library",
         "@com_github_sirupsen_logrus//:go_default_library",
         "@com_github_status_im_keycard_go//hexutils:go_default_library",
         "@io_etcd_go_bbolt//:go_default_library",
diff --git a/tools/exploredb/main.go b/tools/exploredb/main.go
index 84ab91eb52..6528dbf2d0 100644
--- a/tools/exploredb/main.go
+++ b/tools/exploredb/main.go
@@ -11,13 +11,15 @@ package main
 import (
 	"context"
 	"flag"
-	"fmt"
 	"os"
 	"path/filepath"
+	"sync"
 	"time"
 
 	"github.com/dustin/go-humanize"
+	types "github.com/prysmaticlabs/eth2-types"
 	"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
+	iface "github.com/prysmaticlabs/prysm/beacon-chain/state/interface"
 	pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
 	ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
 	"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -35,6 +37,35 @@ var (
 	rowLimit    = flag.Uint64("limit", 10, "limit to rows.")
 )
 
+// bucketStat carries the row count and key/value size statistics of one bucket to the printer.
+type bucketStat struct {
+	bucketName     string
+	noOfRows       uint64
+	totalKeySize   uint64
+	totalValueSize uint64
+	minKeySize     uint64
+	maxKeySize     uint64
+	minValueSize   uint64
+	maxValueSize   uint64
+}
+
+// modifiedState pairs a state read from the db with its key, value size and row number.
+type modifiedState struct {
+	state     iface.BeaconState
+	key       []byte
+	valueSize uint64
+	rowCount  uint64
+}
+
+// modifiedStateSummary pairs a state summary's slot and root with its key, value size and row number.
+type modifiedStateSummary struct {
+	slot      types.Slot
+	root      []byte
+	key       []byte
+	valueSize uint64
+	rowCount  uint64
+}
+
 func main() {
 	flag.Parse()
 
@@ -54,7 +85,7 @@ func main() {
 
 	// show stats of all the buckets.
 	if *bucketStats {
-		showBucketStats(dbNameWithPath)
+		printBucketStats(dbNameWithPath)
 		return
 	}
 
@@ -69,7 +100,69 @@ func main() {
 	}
 }
 
-func showBucketStats(dbNameWithPath string) {
+// printBucketStats starts one goroutine that computes the stats of every bucket and
+// another that logs them, and returns once the printer has signalled completion.
+func printBucketStats(dbNameWithPath string) {
+	ctx := context.Background()
+	groupSize := uint64(128)
+	doneC := make(chan bool)
+	statsC := make(chan *bucketStat, groupSize)
+	go readBucketStats(ctx, dbNameWithPath, statsC)
+	go printBucketStates(statsC, doneC)
+	<-doneC
+}
+
+// printBucketContents logs up to rowLimit rows of the given bucket. Only the
+// "state" and "state-summary" buckets are supported; any other bucket name is
+// reported as an error and nothing is printed.
+func printBucketContents(dbNameWithPath string, rowLimit uint64, bucketName string) {
+	// get the keys within the supplied limit for the given bucket.
+	bucketNameInBytes := []byte(bucketName)
+	keys, sizes := keysOfBucket(dbNameWithPath, bucketNameInBytes, rowLimit)
+
+	// create a new KV Store.
+	dbDirectory := filepath.Dir(dbNameWithPath)
+	db, openErr := kv.NewKVStore(context.Background(), dbDirectory, &kv.Config{})
+	if openErr != nil {
+		log.Fatalf("could not open db, %v", openErr)
+	}
+
+	// don't forget to close it when ejecting out of this function.
+	defer func() {
+		closeErr := db.Close()
+		if closeErr != nil {
+			log.Fatalf("could not close db, %v", closeErr)
+		}
+	}()
+
+	// retrieve every element for keys in the list and call the respective display function.
+	ctx := context.Background()
+	groupSize := uint64(128)
+	doneC := make(chan bool)
+	switch bucketName {
+	case "state":
+		stateC := make(chan *modifiedState, groupSize)
+		go readStates(ctx, db, stateC, keys, sizes)
+		go printStates(stateC, doneC)
+
+	case "state-summary":
+		stateSummaryC := make(chan *modifiedStateSummary, groupSize)
+		go readStateSummary(ctx, db, stateSummaryC, keys, sizes)
+		go printStateSummary(stateSummaryC, doneC)
+
+	default:
+		// no printer is started for an unsupported bucket, so nothing would ever
+		// signal doneC; return instead of blocking forever on the receive below.
+		log.Errorf("unsupported bucket: %s", bucketName)
+		return
+	}
+	<-doneC
+}
+
+// readBucketStats opens the raw bolt database file, computes the stats of every
+// bucket concurrently (one goroutine per bucket) and sends them on statsC, which
+// it closes once all buckets have been processed.
+func readBucketStats(ctx context.Context, dbNameWithPath string, statsC chan<- *bucketStat) {
 	// open the raw database file. If the file is busy, then exit.
 	db, openErr := bolt.Open(dbNameWithPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
 	if openErr != nil {
@@ -95,143 +188,187 @@ func showBucketStats(dbNameWithPath string) {
 		log.Fatalf("could not read buckets from db while getting list of buckets: %v", viewErr1)
 	}
 
-	// for every bucket, calculate the stats and display them.
-	// TODO: parallelize the execution
+	// for every bucket, calculate the stats and send it for printing.
+	// calculate the stats of all the buckets in parallel.
+	var wg sync.WaitGroup
 	for _, bName := range buckets {
-		count := uint64(0)
-		minValueSize := ^uint64(0)
-		maxValueSize := uint64(0)
-		totalValueSize := uint64(0)
-		minKeySize := ^uint64(0)
-		maxKeySize := uint64(0)
-		totalKeySize := uint64(0)
-		if viewErr2 := db.View(func(tx *bolt.Tx) error {
-			b := tx.Bucket([]byte(bName))
-			if forEachErr := b.ForEach(func(k, v []byte) error {
-				count++
-				valueSize := uint64(len(v))
-				if valueSize < minValueSize {
-					minValueSize = valueSize
-				}
-				if valueSize > maxValueSize {
-					maxValueSize = valueSize
-				}
-				totalValueSize += valueSize
+		wg.Add(1)
+		go func(bukName string) {
+			defer wg.Done()
+			count := uint64(0)
+			minValueSize := ^uint64(0)
+			maxValueSize := uint64(0)
+			totalValueSize := uint64(0)
+			minKeySize := ^uint64(0)
+			maxKeySize := uint64(0)
+			totalKeySize := uint64(0)
+			if viewErr2 := db.View(func(tx *bolt.Tx) error {
+				b := tx.Bucket([]byte(bukName))
+				if forEachErr := b.ForEach(func(k, v []byte) error {
+					count++
+					valueSize := uint64(len(v))
+					if valueSize < minValueSize {
+						minValueSize = valueSize
+					}
+					if valueSize > maxValueSize {
+						maxValueSize = valueSize
+					}
+					totalValueSize += valueSize
 
-				keyize := uint64(len(k))
-				if keyize < minKeySize {
-					minKeySize = keyize
+					keySize := uint64(len(k))
+					if keySize < minKeySize {
+						minKeySize = keySize
+					}
+					if keySize > maxKeySize {
+						maxKeySize = keySize
+					}
+					totalKeySize += keySize
+					return nil
+				}); forEachErr != nil {
+					log.WithError(forEachErr).Errorf("could not process row %d for bucket: %s", count, bukName)
+					return forEachErr
 				}
-				if keyize > maxKeySize {
-					maxKeySize = keyize
-				}
-				totalKeySize += uint64(len(k))
 				return nil
-			}); forEachErr != nil {
-				log.Errorf("could not process row %d for bucket: %s, %v", count, bName, forEachErr)
-				return forEachErr
+			}); viewErr2 != nil {
+				log.WithError(viewErr2).Errorf("could not get stats for bucket: %s", bukName)
+				return
 			}
-			return nil
-		}); viewErr2 != nil {
-			log.Errorf("could not get stats for bucket: %s, %v", bName, viewErr2)
+			stat := &bucketStat{
+				bucketName:     bukName,
+				noOfRows:       count,
+				totalKeySize:   totalKeySize,
+				totalValueSize: totalValueSize,
+				minKeySize:     minKeySize,
+				maxKeySize:     maxKeySize,
+				minValueSize:   minValueSize,
+				maxValueSize:   maxValueSize,
+			}
+			statsC <- stat
+		}(bName)
+	}
+	wg.Wait()
+	close(statsC)
+}
+
+// readStates loads the state stored under each key and sends it on stateC together
+// with its key, value size and row number. Keys whose state cannot be read are
+// logged and skipped. stateC is closed once every key has been processed.
+func readStates(ctx context.Context, db *kv.Store, stateC chan<- *modifiedState, keys [][]byte, sizes []uint64) {
+	for rowCount, key := range keys {
+		st, stateErr := db.State(ctx, bytesutil.ToBytes32(key))
+		if stateErr != nil {
+			log.WithError(stateErr).Errorf("could not get state for key : %s", hexutils.BytesToHex(key))
 			continue
 		}
-
-		if count != 0 {
-			averageValueSize := totalValueSize / count
-			averageKeySize := totalKeySize / count
-			fmt.Println("------ ", bName, " --------")
-			fmt.Println("NumberOfRows = ", count)
-			fmt.Println("TotalBucketSize = ", humanize.Bytes(totalValueSize+totalKeySize))
-			fmt.Println("KeySize = ", humanize.Bytes(totalKeySize), "(min = "+humanize.Bytes(minKeySize)+", avg = "+humanize.Bytes(averageKeySize)+", max = "+humanize.Bytes(maxKeySize)+")")
-			fmt.Println("ValueSize = ", humanize.Bytes(totalValueSize), "(min = "+humanize.Bytes(minValueSize)+", avg = "+humanize.Bytes(averageValueSize)+", max = "+humanize.Bytes(maxValueSize)+")")
+		mst := &modifiedState{
+			state:     st,
+			key:       key,
+			valueSize: sizes[rowCount],
+			rowCount:  uint64(rowCount),
 		}
+		stateC <- mst
 	}
+	close(stateC)
 }
 
-func printBucketContents(dbNameWithPath string, rowLimit uint64, bucketName string) {
-	// get the keys within the supplied limit for the given bucket.
-	bucketNameInBytes := []byte(bucketName)
-	keys, sizes := keysOfBucket(dbNameWithPath, bucketNameInBytes, rowLimit)
-
-	// create a new KV Store.
-	dbDirectory := filepath.Dir(dbNameWithPath)
-	db, openErr := kv.NewKVStore(context.Background(), dbDirectory, &kv.Config{})
-	if openErr != nil {
-		log.Fatalf("could not open db, %v", openErr)
-	}
-
-	// dont forget to close it when ejecting out of this function.
-	defer func() {
-		closeErr := db.Close()
-		if closeErr != nil {
-			log.Fatalf("could not close db, %v", closeErr)
+// readStateSummary loads the state summary stored under each key and sends its slot
+// and root on stateSummaryC together with the key, value size and row number. Keys
+// that cannot be read are logged and skipped. The channel is closed at the end.
+func readStateSummary(ctx context.Context, db *kv.Store, stateSummaryC chan<- *modifiedStateSummary, keys [][]byte, sizes []uint64) {
+	for rowCount, key := range keys {
+		ss, ssErr := db.StateSummary(ctx, bytesutil.ToBytes32(key))
+		if ssErr != nil {
+			log.WithError(ssErr).Errorf("could not get state summary for key : %s", hexutils.BytesToHex(key))
+			continue
 		}
-	}()
-
-	// retrieve every element for keys in the list and call the respective display function.
-	ctx := context.Background()
-	rowCount := uint64(0)
-	for index, key := range keys {
-		switch bucketName {
-		case "state":
-			printState(ctx, db, key, rowCount, sizes[index])
-		case "state-summary":
-			printStateSummary(ctx, db, key, rowCount)
+		mst := &modifiedStateSummary{
+			slot:      ss.Slot,
+			root:      ss.Root,
+			key:       key,
+			valueSize: sizes[rowCount],
+			rowCount:  uint64(rowCount),
 		}
-		rowCount++
+		stateSummaryC <- mst
 	}
+	close(stateSummaryC)
 }
 
-func printState(ctx context.Context, db *kv.Store, key []byte, rowCount, valueSize uint64) {
-	st, stateErr := db.State(ctx, bytesutil.ToBytes32(key))
-	if stateErr != nil {
-		log.Errorf("could not get state for key : , %v", stateErr)
+// printBucketStates logs the stats of every non-empty bucket received on statsC and
+// signals doneC once statsC has been closed and drained.
+func printBucketStates(statsC <-chan *bucketStat, doneC chan<- bool) {
+	for stat := range statsC {
+		if stat.noOfRows != 0 {
+			averageValueSize := stat.totalValueSize / stat.noOfRows
+			averageKeySize := stat.totalKeySize / stat.noOfRows
+			log.Infof("------ %s ---------", stat.bucketName)
+			log.Infof("NumberOfRows = %d", stat.noOfRows)
+			log.Infof("TotalBucketSize = %s", humanize.Bytes(stat.totalValueSize+stat.totalKeySize))
+			log.Infof("KeySize = %s, (min = %s, avg = %s, max = %s)",
+				humanize.Bytes(stat.totalKeySize),
+				humanize.Bytes(stat.minKeySize),
+				humanize.Bytes(averageKeySize),
+				humanize.Bytes(stat.maxKeySize))
+			log.Infof("ValueSize = %s, (min = %s, avg = %s, max = %s)",
+				humanize.Bytes(stat.totalValueSize),
+				humanize.Bytes(stat.minValueSize),
+				humanize.Bytes(averageValueSize),
+				humanize.Bytes(stat.maxValueSize))
+		}
 	}
-	rowStr := fmt.Sprintf("---- row = %04d ----", rowCount)
-	fmt.Println(rowStr)
-	fmt.Println("key :", key)
-	fmt.Println("value : compressed size = ", humanize.Bytes(valueSize))
-	fmt.Println("genesis_time :", st.GenesisTime())
-	fmt.Println("genesis_validators_root :", hexutils.BytesToHex(st.GenesisValidatorRoot()))
-	fmt.Println("slot :", st.Slot())
-	fmt.Println("fork : previous_version: ", st.Fork().PreviousVersion, ", current_version: ", st.Fork().CurrentVersion)
-	fmt.Println("latest_block_header : sizeSSZ = ", humanize.Bytes(uint64(st.LatestBlockHeader().SizeSSZ())))
-	size, count := sizeAndCountOfByteList(st.BlockRoots())
-	fmt.Println("block_roots : size = ", humanize.Bytes(size), ", count = ", count)
-	size, count = sizeAndCountOfByteList(st.StateRoots())
-	fmt.Println("state_roots : size = ", humanize.Bytes(size), ", count = ", count)
-	size, count = sizeAndCountOfByteList(st.HistoricalRoots())
-	fmt.Println("historical_roots : size = ", humanize.Bytes(size), ", count = ", count)
-	fmt.Println("eth1_data : sizeSSZ = ", humanize.Bytes(uint64(st.Eth1Data().SizeSSZ())))
-	size, count = sizeAndCountGeneric(st.Eth1DataVotes(), nil)
-	fmt.Println("eth1_data_votes : sizeSSZ = ", humanize.Bytes(size), ", count = ", count)
-	fmt.Println("eth1_deposit_index :", st.Eth1DepositIndex())
-	size, count = sizeAndCountGeneric(st.Validators(), nil)
-	fmt.Println("validators : sizeSSZ = ", humanize.Bytes(size), ", count = ", count)
-	size, count = sizeAndCountOfUin64List(st.Balances())
-	fmt.Println("balances : size = ", humanize.Bytes(size), ", count = ", count)
-	size, count = sizeAndCountOfByteList(st.RandaoMixes())
-	fmt.Println("randao_mixes : size = ", humanize.Bytes(size), ", count = ", count)
-	size, count = sizeAndCountOfUin64List(st.Slashings())
-	fmt.Println("slashings : size = ", humanize.Bytes(size), ", count = ", count)
-	size, count = sizeAndCountGeneric(st.PreviousEpochAttestations())
-	fmt.Println("previous_epoch_attestations : sizeSSZ ", humanize.Bytes(size), ", count = ", count)
-	size, count = sizeAndCountGeneric(st.CurrentEpochAttestations())
-	fmt.Println("current_epoch_attestations : sizeSSZ = ", humanize.Bytes(size), ", count = ", count)
-	fmt.Println("justification_bits : size = ", humanize.Bytes(st.JustificationBits().Len()), ", count = ", st.JustificationBits().Count())
-	fmt.Println("previous_justified_checkpoint : sizeSSZ = ", humanize.Bytes(uint64(st.PreviousJustifiedCheckpoint().SizeSSZ())))
-	fmt.Println("current_justified_checkpoint : sizeSSZ = ", humanize.Bytes(uint64(st.CurrentJustifiedCheckpoint().SizeSSZ())))
-	fmt.Println("finalized_checkpoint : sizeSSZ = ", humanize.Bytes(uint64(st.FinalizedCheckpoint().SizeSSZ())))
+	doneC <- true
 }
 
-func printStateSummary(ctx context.Context, db *kv.Store, key []byte, rowCount uint64) {
-	ss, ssErr := db.StateSummary(ctx, bytesutil.ToBytes32(key))
-	if ssErr != nil {
-		log.Errorf("could not get state summary for key : , %v", ssErr)
+// printStates logs the fields of every state received on stateC and signals doneC
+// once stateC has been closed and drained.
+func printStates(stateC <-chan *modifiedState, doneC chan<- bool) {
+	for mst := range stateC {
+		st := mst.state
+		log.Infof("---- row = %04d ----", mst.rowCount)
+		log.Infof("key : %s", hexutils.BytesToHex(mst.key))
+		log.Infof("value : compressed size = %s", humanize.Bytes(mst.valueSize))
+		t := time.Unix(int64(st.GenesisTime()), 0)
+		log.Infof("genesis_time : %s", t.Format(time.UnixDate))
+		log.Infof("genesis_validators_root : %s", hexutils.BytesToHex(st.GenesisValidatorRoot()))
+		log.Infof("slot : %d", st.Slot())
+		log.Infof("fork : previous_version = %#x, current_version = %#x", st.Fork().PreviousVersion, st.Fork().CurrentVersion)
+		log.Infof("latest_block_header : sizeSSZ = %s", humanize.Bytes(uint64(st.LatestBlockHeader().SizeSSZ())))
+		size, count := sizeAndCountOfByteList(st.BlockRoots())
+		log.Infof("block_roots : size = %s, count = %d", humanize.Bytes(size), count)
+		size, count = sizeAndCountOfByteList(st.StateRoots())
+		log.Infof("state_roots : size = %s, count = %d", humanize.Bytes(size), count)
+		size, count = sizeAndCountOfByteList(st.HistoricalRoots())
+		log.Infof("historical_roots : size = %s, count = %d", humanize.Bytes(size), count)
+		log.Infof("eth1_data : sizeSSZ = %s", humanize.Bytes(uint64(st.Eth1Data().SizeSSZ())))
+		size, count = sizeAndCountGeneric(st.Eth1DataVotes(), nil)
+		log.Infof("eth1_data_votes : sizeSSZ = %s, count = %d", humanize.Bytes(size), count)
+		log.Infof("eth1_deposit_index : %d", st.Eth1DepositIndex())
+		size, count = sizeAndCountGeneric(st.Validators(), nil)
+		log.Infof("validators : sizeSSZ = %s, count = %d", humanize.Bytes(size), count)
+		size, count = sizeAndCountOfUin64List(st.Balances())
+		log.Infof("balances : size = %s, count = %d", humanize.Bytes(size), count)
+		size, count = sizeAndCountOfByteList(st.RandaoMixes())
+		log.Infof("randao_mixes : size = %s, count = %d", humanize.Bytes(size), count)
+		size, count = sizeAndCountOfUin64List(st.Slashings())
+		log.Infof("slashings : size = %s, count = %d", humanize.Bytes(size), count)
+		size, count = sizeAndCountGeneric(st.PreviousEpochAttestations())
+		log.Infof("previous_epoch_attestations : sizeSSZ = %s, count = %d", humanize.Bytes(size), count)
+		size, count = sizeAndCountGeneric(st.CurrentEpochAttestations())
+		log.Infof("current_epoch_attestations : sizeSSZ = %s, count = %d", humanize.Bytes(size), count)
+		log.Infof("justification_bits : size = %s, count = %d", humanize.Bytes(st.JustificationBits().Len()), st.JustificationBits().Count())
+		log.Infof("previous_justified_checkpoint : sizeSSZ = %s", humanize.Bytes(uint64(st.PreviousJustifiedCheckpoint().SizeSSZ())))
+		log.Infof("current_justified_checkpoint : sizeSSZ = %s", humanize.Bytes(uint64(st.CurrentJustifiedCheckpoint().SizeSSZ())))
+		log.Infof("finalized_checkpoint : sizeSSZ = %s", humanize.Bytes(uint64(st.FinalizedCheckpoint().SizeSSZ())))
 	}
-	rowCountStr := fmt.Sprintf("row : %04d, ", rowCount)
-	fmt.Println(rowCountStr, "slot : ", ss.Slot, ", root : ", hexutils.BytesToHex(ss.Root))
+	doneC <- true
+}
+
+// printStateSummary logs the row number, slot and root of every state summary
+// received on stateSummaryC and signals doneC once the channel is closed and drained.
+func printStateSummary(stateSummaryC <-chan *modifiedStateSummary, doneC chan<- bool) {
+	for msts := range stateSummaryC {
+		log.Infof("row : %04d, slot : %d, root = %s", msts.rowCount, msts.slot, hexutils.BytesToHex(msts.root))
+	}
+	doneC <- true
 }
 
 func keysOfBucket(dbNameWithPath string, bucketName []byte, rowLimit uint64) ([][]byte, []uint64) {