Make exportdb tool faster (#9184)

* added producer-consumer pattern for reading and printing

* make state interface visible in exploredb

* bazel magic for visibility

* fix manual visibility marking

* linter fix

* parallelized bucket stats

* add log.WithErr and log.Infof

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
Mohamed Zahoor
2021-07-17 03:54:16 +05:30
committed by GitHub
parent 223d5309a0
commit 6f126c92c0
3 changed files with 235 additions and 120 deletions

View File

@@ -8,10 +8,12 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/state/interface:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",
"@com_github_dustin_go_humanize//:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_status_im_keycard_go//hexutils:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",

View File

@@ -11,13 +11,15 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/dustin/go-humanize"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
iface "github.com/prysmaticlabs/prysm/beacon-chain/state/interface"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -35,6 +37,35 @@ var (
rowLimit = flag.Uint64("limit", 10, "limit to rows.")
)
// bucketStat carries the statistics gathered for a single bolt bucket.
// One value is produced per bucket by the parallel stat-reading workers
// and consumed by the printer goroutine (producer/consumer pattern used
// to parallelize all the bucket stats).
type bucketStat struct {
	bucketName     string // name of the bucket these stats describe
	noOfRows       uint64 // number of key/value rows seen in the bucket
	totalKeySize   uint64 // sum of all key sizes, in bytes
	totalValueSize uint64 // sum of all value sizes, in bytes
	minKeySize     uint64 // size of the smallest key seen
	maxKeySize     uint64 // size of the largest key seen
	minValueSize   uint64 // size of the smallest value seen
	maxValueSize   uint64 // size of the largest value seen
}
// modifiedState pairs a beacon state read from the "state" bucket with the
// db row it came from; used to parallelize state bucket processing
// (produced by readStates, consumed by printStates).
type modifiedState struct {
	state     iface.BeaconState // decoded beacon state for this row
	key       []byte            // db key the state was stored under
	valueSize uint64            // compressed size of the stored value, in bytes
	rowCount  uint64            // index of this row within the printed result set
}
// modifiedStateSummary pairs a state summary read from the "state-summary"
// bucket with the db row it came from; used to parallelize state summary
// bucket processing (produced by readStateSummary, consumed by printStateSummary).
type modifiedStateSummary struct {
	slot      types.Slot // slot recorded in the state summary
	root      []byte     // root recorded in the state summary
	key       []byte     // db key the summary was stored under
	valueSize uint64     // size of the stored value, in bytes
	rowCount  uint64     // index of this row within the printed result set
}
func main() {
flag.Parse()
@@ -54,7 +85,7 @@ func main() {
// show stats of all the buckets.
if *bucketStats {
showBucketStats(dbNameWithPath)
printBucketStats(dbNameWithPath)
return
}
@@ -69,7 +100,55 @@ func main() {
}
}
func showBucketStats(dbNameWithPath string) {
// printBucketStats wires up a producer/consumer pair: one goroutine reads
// the statistics of every bucket in the database and a second one prints
// them. The call blocks until the printer signals that it is done.
func printBucketStats(dbNameWithPath string) {
	// buffered channel so the stat readers are not throttled by printing.
	bufSize := uint64(128)
	done := make(chan bool)
	stats := make(chan *bucketStat, bufSize)
	go readBucketStats(context.Background(), dbNameWithPath, stats)
	go printBucketStates(stats, done)
	// wait until the printer has drained the stats channel.
	<-done
}
// printBucketContents prints up to rowLimit rows of the given bucket.
// Reading and printing are pipelined: a reader goroutine fetches rows from
// the db while a printer goroutine displays them; the function blocks
// until the printer signals completion on doneC.
func printBucketContents(dbNameWithPath string, rowLimit uint64, bucketName string) {
	// get the keys within the supplied limit for the given bucket.
	bucketNameInBytes := []byte(bucketName)
	keys, sizes := keysOfBucket(dbNameWithPath, bucketNameInBytes, rowLimit)

	// create a new KV Store.
	dbDirectory := filepath.Dir(dbNameWithPath)
	db, openErr := kv.NewKVStore(context.Background(), dbDirectory, &kv.Config{})
	if openErr != nil {
		log.Fatalf("could not open db, %v", openErr)
	}
	// don't forget to close it when ejecting out of this function.
	defer func() {
		closeErr := db.Close()
		if closeErr != nil {
			log.Fatalf("could not close db, %v", closeErr)
		}
	}()

	// retrieve every element for keys in the list and call the respective display function.
	ctx := context.Background()
	groupSize := uint64(128)
	doneC := make(chan bool)
	switch bucketName {
	case "state":
		stateC := make(chan *modifiedState, groupSize)
		go readStates(ctx, db, stateC, keys, sizes)
		go printStates(stateC, doneC)
	case "state-summary":
		stateSummaryC := make(chan *modifiedStateSummary, groupSize)
		go readStateSummary(ctx, db, stateSummaryC, keys, sizes)
		go printStateSummary(stateSummaryC, doneC)
	default:
		// Without this default the function would block forever on doneC
		// for bucket names that have no reader/printer pair.
		log.Fatalf("unsupported bucket name: %s", bucketName)
	}
	<-doneC
}
func readBucketStats(ctx context.Context, dbNameWithPath string, statsC chan<- *bucketStat) {
// open the raw database file. If the file is busy, then exit.
db, openErr := bolt.Open(dbNameWithPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
if openErr != nil {
@@ -95,143 +174,176 @@ func showBucketStats(dbNameWithPath string) {
log.Fatalf("could not read buckets from db while getting list of buckets: %v", viewErr1)
}
// for every bucket, calculate the stats and display them.
// TODO: parallelize the execution
// for every bucket, calculate the stats and send it for printing.
// calculate the state of all the buckets in parallel.
var wg sync.WaitGroup
for _, bName := range buckets {
count := uint64(0)
minValueSize := ^uint64(0)
maxValueSize := uint64(0)
totalValueSize := uint64(0)
minKeySize := ^uint64(0)
maxKeySize := uint64(0)
totalKeySize := uint64(0)
if viewErr2 := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bName))
if forEachErr := b.ForEach(func(k, v []byte) error {
count++
valueSize := uint64(len(v))
if valueSize < minValueSize {
minValueSize = valueSize
}
if valueSize > maxValueSize {
maxValueSize = valueSize
}
totalValueSize += valueSize
wg.Add(1)
go func(bukName string) {
defer wg.Done()
count := uint64(0)
minValueSize := ^uint64(0)
maxValueSize := uint64(0)
totalValueSize := uint64(0)
minKeySize := ^uint64(0)
maxKeySize := uint64(0)
totalKeySize := uint64(0)
if viewErr2 := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bukName))
if forEachErr := b.ForEach(func(k, v []byte) error {
count++
valueSize := uint64(len(v))
if valueSize < minValueSize {
minValueSize = valueSize
}
if valueSize > maxValueSize {
maxValueSize = valueSize
}
totalValueSize += valueSize
keyize := uint64(len(k))
if keyize < minKeySize {
minKeySize = keyize
keyize := uint64(len(k))
if keyize < minKeySize {
minKeySize = keyize
}
if keyize > maxKeySize {
maxKeySize = keyize
}
totalKeySize += uint64(len(k))
return nil
}); forEachErr != nil {
log.WithError(forEachErr).Errorf("could not process row %d for bucket: %s", count, bukName)
return forEachErr
}
if keyize > maxKeySize {
maxKeySize = keyize
}
totalKeySize += uint64(len(k))
return nil
}); forEachErr != nil {
log.Errorf("could not process row %d for bucket: %s, %v", count, bName, forEachErr)
return forEachErr
}); viewErr2 != nil {
log.WithError(viewErr2).Errorf("could not get stats for bucket: %s", bukName)
return
}
return nil
}); viewErr2 != nil {
log.Errorf("could not get stats for bucket: %s, %v", bName, viewErr2)
stat := &bucketStat{
bucketName: bukName,
noOfRows: count,
totalKeySize: totalKeySize,
totalValueSize: totalValueSize,
minKeySize: minKeySize,
maxKeySize: maxKeySize,
minValueSize: minValueSize,
maxValueSize: maxValueSize,
}
statsC <- stat
}(bName)
}
wg.Wait()
close(statsC)
}
func readStates(ctx context.Context, db *kv.Store, stateC chan<- *modifiedState, keys [][]byte, sizes []uint64) {
for rowCount, key := range keys {
st, stateErr := db.State(ctx, bytesutil.ToBytes32(key))
if stateErr != nil {
log.WithError(stateErr).Errorf("could not get state for key : %s", hexutils.BytesToHex(key))
continue
}
if count != 0 {
averageValueSize := totalValueSize / count
averageKeySize := totalKeySize / count
fmt.Println("------ ", bName, " --------")
fmt.Println("NumberOfRows = ", count)
fmt.Println("TotalBucketSize = ", humanize.Bytes(totalValueSize+totalKeySize))
fmt.Println("KeySize = ", humanize.Bytes(totalKeySize), "(min = "+humanize.Bytes(minKeySize)+", avg = "+humanize.Bytes(averageKeySize)+", max = "+humanize.Bytes(maxKeySize)+")")
fmt.Println("ValueSize = ", humanize.Bytes(totalValueSize), "(min = "+humanize.Bytes(minValueSize)+", avg = "+humanize.Bytes(averageValueSize)+", max = "+humanize.Bytes(maxValueSize)+")")
mst := &modifiedState{
state: st,
key: key,
valueSize: sizes[rowCount],
rowCount: uint64(rowCount),
}
stateC <- mst
}
close(stateC)
}
func printBucketContents(dbNameWithPath string, rowLimit uint64, bucketName string) {
// get the keys within the supplied limit for the given bucket.
bucketNameInBytes := []byte(bucketName)
keys, sizes := keysOfBucket(dbNameWithPath, bucketNameInBytes, rowLimit)
// create a new KV Store.
dbDirectory := filepath.Dir(dbNameWithPath)
db, openErr := kv.NewKVStore(context.Background(), dbDirectory, &kv.Config{})
if openErr != nil {
log.Fatalf("could not open db, %v", openErr)
}
// dont forget to close it when ejecting out of this function.
defer func() {
closeErr := db.Close()
if closeErr != nil {
log.Fatalf("could not close db, %v", closeErr)
func readStateSummary(ctx context.Context, db *kv.Store, stateSummaryC chan<- *modifiedStateSummary, keys [][]byte, sizes []uint64) {
for rowCount, key := range keys {
ss, ssErr := db.StateSummary(ctx, bytesutil.ToBytes32(key))
if ssErr != nil {
log.WithError(ssErr).Errorf("could not get state summary for key : %s", hexutils.BytesToHex(key))
continue
}
}()
// retrieve every element for keys in the list and call the respective display function.
ctx := context.Background()
rowCount := uint64(0)
for index, key := range keys {
switch bucketName {
case "state":
printState(ctx, db, key, rowCount, sizes[index])
case "state-summary":
printStateSummary(ctx, db, key, rowCount)
mst := &modifiedStateSummary{
slot: ss.Slot,
root: ss.Root,
key: key,
valueSize: sizes[rowCount],
rowCount: uint64(rowCount),
}
rowCount++
stateSummaryC <- mst
}
close(stateSummaryC)
}
func printState(ctx context.Context, db *kv.Store, key []byte, rowCount, valueSize uint64) {
st, stateErr := db.State(ctx, bytesutil.ToBytes32(key))
if stateErr != nil {
log.Errorf("could not get state for key : , %v", stateErr)
func printBucketStates(statsC <-chan *bucketStat, doneC chan<- bool) {
for stat := range statsC {
if stat.noOfRows != 0 {
averageValueSize := stat.totalValueSize / stat.noOfRows
averageKeySize := stat.totalKeySize / stat.noOfRows
log.Infof("------ %s ---------", stat.bucketName)
log.Infof("NumberOfRows = %d", stat.noOfRows)
log.Infof("TotalBucketSize = %s", humanize.Bytes(stat.totalValueSize+stat.totalKeySize))
log.Infof("KeySize = %s, (min = %s, avg = %s, max = %s)",
humanize.Bytes(stat.totalKeySize),
humanize.Bytes(stat.minKeySize),
humanize.Bytes(averageKeySize),
humanize.Bytes(stat.maxKeySize))
log.Infof("ValueSize = %s, (min = %s, avg = %s, max = %s)",
humanize.Bytes(stat.totalValueSize),
humanize.Bytes(stat.minValueSize),
humanize.Bytes(averageValueSize),
humanize.Bytes(stat.maxValueSize))
}
}
rowStr := fmt.Sprintf("---- row = %04d ----", rowCount)
fmt.Println(rowStr)
fmt.Println("key :", key)
fmt.Println("value : compressed size = ", humanize.Bytes(valueSize))
fmt.Println("genesis_time :", st.GenesisTime())
fmt.Println("genesis_validators_root :", hexutils.BytesToHex(st.GenesisValidatorRoot()))
fmt.Println("slot :", st.Slot())
fmt.Println("fork : previous_version: ", st.Fork().PreviousVersion, ", current_version: ", st.Fork().CurrentVersion)
fmt.Println("latest_block_header : sizeSSZ = ", humanize.Bytes(uint64(st.LatestBlockHeader().SizeSSZ())))
size, count := sizeAndCountOfByteList(st.BlockRoots())
fmt.Println("block_roots : size = ", humanize.Bytes(size), ", count = ", count)
size, count = sizeAndCountOfByteList(st.StateRoots())
fmt.Println("state_roots : size = ", humanize.Bytes(size), ", count = ", count)
size, count = sizeAndCountOfByteList(st.HistoricalRoots())
fmt.Println("historical_roots : size = ", humanize.Bytes(size), ", count = ", count)
fmt.Println("eth1_data : sizeSSZ = ", humanize.Bytes(uint64(st.Eth1Data().SizeSSZ())))
size, count = sizeAndCountGeneric(st.Eth1DataVotes(), nil)
fmt.Println("eth1_data_votes : sizeSSZ = ", humanize.Bytes(size), ", count = ", count)
fmt.Println("eth1_deposit_index :", st.Eth1DepositIndex())
size, count = sizeAndCountGeneric(st.Validators(), nil)
fmt.Println("validators : sizeSSZ = ", humanize.Bytes(size), ", count = ", count)
size, count = sizeAndCountOfUin64List(st.Balances())
fmt.Println("balances : size = ", humanize.Bytes(size), ", count = ", count)
size, count = sizeAndCountOfByteList(st.RandaoMixes())
fmt.Println("randao_mixes : size = ", humanize.Bytes(size), ", count = ", count)
size, count = sizeAndCountOfUin64List(st.Slashings())
fmt.Println("slashings : size = ", humanize.Bytes(size), ", count = ", count)
size, count = sizeAndCountGeneric(st.PreviousEpochAttestations())
fmt.Println("previous_epoch_attestations : sizeSSZ ", humanize.Bytes(size), ", count = ", count)
size, count = sizeAndCountGeneric(st.CurrentEpochAttestations())
fmt.Println("current_epoch_attestations : sizeSSZ = ", humanize.Bytes(size), ", count = ", count)
fmt.Println("justification_bits : size = ", humanize.Bytes(st.JustificationBits().Len()), ", count = ", st.JustificationBits().Count())
fmt.Println("previous_justified_checkpoint : sizeSSZ = ", humanize.Bytes(uint64(st.PreviousJustifiedCheckpoint().SizeSSZ())))
fmt.Println("current_justified_checkpoint : sizeSSZ = ", humanize.Bytes(uint64(st.CurrentJustifiedCheckpoint().SizeSSZ())))
fmt.Println("finalized_checkpoint : sizeSSZ = ", humanize.Bytes(uint64(st.FinalizedCheckpoint().SizeSSZ())))
doneC <- true
}
func printStateSummary(ctx context.Context, db *kv.Store, key []byte, rowCount uint64) {
ss, ssErr := db.StateSummary(ctx, bytesutil.ToBytes32(key))
if ssErr != nil {
log.Errorf("could not get state summary for key : , %v", ssErr)
func printStates(stateC <-chan *modifiedState, doneC chan<- bool) {
for mst := range stateC {
st := mst.state
log.Infof("---- row = %04d ----", mst.rowCount)
log.Infof("key : %s", hexutils.BytesToHex(mst.key))
log.Infof("value : compressed size = %s", humanize.Bytes(mst.valueSize))
t := time.Unix(int64(st.GenesisTime()), 0)
log.Infof("genesis_time : %s", t.Format(time.UnixDate))
log.Infof("genesis_validators_root : %s", hexutils.BytesToHex(st.GenesisValidatorRoot()))
log.Infof("slot : %d", st.Slot())
log.Infof("fork : previous_version = %b, current_version = %b", st.Fork().PreviousVersion, st.Fork().CurrentVersion)
log.Infof("latest_block_header : sizeSSZ = %s", humanize.Bytes(uint64(st.LatestBlockHeader().SizeSSZ())))
size, count := sizeAndCountOfByteList(st.BlockRoots())
log.Infof("block_roots : size = %s, count = %d", humanize.Bytes(size), count)
size, count = sizeAndCountOfByteList(st.StateRoots())
log.Infof("state_roots : size = %s, count = %d", humanize.Bytes(size), count)
size, count = sizeAndCountOfByteList(st.HistoricalRoots())
log.Infof("historical_roots : size = %s, count = %d", humanize.Bytes(size), count)
log.Infof("eth1_data : sizeSSZ = %s", humanize.Bytes(uint64(st.Eth1Data().SizeSSZ())))
size, count = sizeAndCountGeneric(st.Eth1DataVotes(), nil)
log.Infof("eth1_data_votes : sizeSSZ = %s, count = %d", humanize.Bytes(size), count)
log.Infof("eth1_deposit_index : %d", st.Eth1DepositIndex())
size, count = sizeAndCountGeneric(st.Validators(), nil)
log.Infof("validators : sizeSSZ = %s, count = %d", humanize.Bytes(size), count)
size, count = sizeAndCountOfUin64List(st.Balances())
log.Infof("balances : size = %s, count = %d", humanize.Bytes(size), count)
size, count = sizeAndCountOfByteList(st.RandaoMixes())
log.Infof("randao_mixes : size = %s, count = %d", humanize.Bytes(size), count)
size, count = sizeAndCountOfUin64List(st.Slashings())
log.Infof("slashings : size = %s, count = %d", humanize.Bytes(size), count)
size, count = sizeAndCountGeneric(st.PreviousEpochAttestations())
log.Infof("previous_epoch_attestations : sizeSSZ = %s, count = %d", humanize.Bytes(size), count)
size, count = sizeAndCountGeneric(st.CurrentEpochAttestations())
log.Infof("current_epoch_attestations : sizeSSZ = %s, count = %d", humanize.Bytes(size), count)
log.Infof("justification_bits : size = %s, count = %d", humanize.Bytes(st.JustificationBits().Len()), st.JustificationBits().Count())
log.Infof("previous_justified_checkpoint : sizeSSZ = %s", humanize.Bytes(uint64(st.PreviousJustifiedCheckpoint().SizeSSZ())))
log.Infof("current_justified_checkpoint : sizeSSZ = %s", humanize.Bytes(uint64(st.CurrentJustifiedCheckpoint().SizeSSZ())))
log.Infof("finalized_checkpoint : sizeSSZ = %s", humanize.Bytes(uint64(st.FinalizedCheckpoint().SizeSSZ())))
}
rowCountStr := fmt.Sprintf("row : %04d, ", rowCount)
fmt.Println(rowCountStr, "slot : ", ss.Slot, ", root : ", hexutils.BytesToHex(ss.Root))
doneC <- true
}
// printStateSummary consumes modifiedStateSummary values from stateSummaryC
// and logs one line per summary (row index, slot, hex-encoded root). Once
// the channel has been closed and drained it signals completion on doneC.
func printStateSummary(stateSummaryC <-chan *modifiedStateSummary, doneC chan<- bool) {
	for {
		summary, ok := <-stateSummaryC
		if !ok {
			// producer closed the channel: all rows have been printed.
			break
		}
		log.Infof("row : %04d, slot : %d, root = %s", summary.rowCount, summary.slot, hexutils.BytesToHex(summary.root))
	}
	doneC <- true
}
func keysOfBucket(dbNameWithPath string, bucketName []byte, rowLimit uint64) ([][]byte, []uint64) {