Compare commits

...

2 Commits

Author SHA1 Message Date
Kasey Kirkham
17384cd94c messy first version of db analysis tool 2022-06-21 14:09:00 -05:00
Kasey Kirkham
8e02f70830 start tool to help analyze db design tradeoffs 2022-06-15 14:50:10 -05:00
8 changed files with 1286 additions and 36 deletions

View File

@@ -91,41 +91,9 @@ func KVStoreDatafilePath(dirPath string) string {
return path.Join(dirPath, DatabaseFileName)
}
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, error) {
hasDir, err := file.HasDir(dirPath)
if err != nil {
return nil, err
}
if !hasDir {
if err := file.MkdirAll(dirPath); err != nil {
return nil, err
}
}
datafile := KVStoreDatafilePath(dirPath)
func NewKVStoreWithDB(ctx context.Context, bdb *bolt.DB) (*Store, error) {
bdb.AllocSize = boltAllocSize
start := time.Now()
log.Infof("Opening Bolt DB at %s", datafile)
boltDB, err := bolt.Open(
datafile,
params.BeaconIoConfig().ReadWritePermissions,
&bolt.Options{
Timeout: 1 * time.Second,
InitialMmapSize: config.InitialMMapSize,
},
)
if err != nil {
log.WithField("elapsed", time.Since(start)).Error("Failed to open Bolt DB")
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}
log.WithField("elapsed", time.Since(start)).Info("Opened Bolt DB")
boltDB.AllocSize = boltAllocSize
start = time.Now()
log.Infof("Creating block cache...")
blockCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1000, // number of keys to track frequency of (1000).
@@ -152,8 +120,8 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
log.WithField("elapsed", time.Since(start)).Info("Created validator cache")
kv := &Store{
db: boltDB,
databasePath: dirPath,
db: bdb,
databasePath: path.Dir(bdb.Path()),
blockCache: blockCache,
validatorEntryCache: validatorCache,
stateSummaryCache: newStateSummaryCache(),
@@ -205,6 +173,41 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
return kv, err
}
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, error) {
hasDir, err := file.HasDir(dirPath)
if err != nil {
return nil, err
}
if !hasDir {
if err := file.MkdirAll(dirPath); err != nil {
return nil, err
}
}
datafile := KVStoreDatafilePath(dirPath)
start := time.Now()
log.Infof("Opening Bolt DB at %s", datafile)
bdb, err := bolt.Open(
datafile,
params.BeaconIoConfig().ReadWritePermissions,
&bolt.Options{
Timeout: 1 * time.Second,
InitialMmapSize: config.InitialMMapSize,
},
)
if err != nil {
log.WithField("elapsed", time.Since(start)).Error("Failed to open Bolt DB")
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}
log.WithField("elapsed", time.Since(start)).Info("Opened Bolt DB")
return NewKVStoreWithDB(ctx, bdb)
}
// ClearDB removes the previously stored database in the data directory.
func (s *Store) ClearDB() error {
if _, err := os.Stat(s.databasePath); os.IsNotExist(err) {

View File

@@ -1085,6 +1085,12 @@ def prysm_deps():
sum = "h1:DBPx88FjZJH3FsICfDAfIfnb7XxKIYVGG6lOPlhENAg=",
version = "v5.0.0",
)
go_repository(
name = "com_github_go_echarts_go_echarts_v2",
importpath = "github.com/go-echarts/go-echarts/v2",
sum = "h1:Rl6PdkbIwUke48lHwPNIcmIHx8gjWEWW2uipcP7Lh/k=",
version = "v2.2.5-0.20220120114451-c13b73ddc21e",
)
go_repository(
name = "com_github_go_errors_errors",

1
go.mod
View File

@@ -124,6 +124,7 @@ require (
github.com/elastic/gosigar v0.14.2 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-echarts/go-echarts/v2 v2.2.5-0.20220120114451-c13b73ddc21e // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect

5
go.sum
View File

@@ -352,6 +352,10 @@ github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aev
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
github.com/go-echarts/go-echarts/v2 v2.2.4 h1:SKJpdyNIyD65XjbUZjzg6SwccTNXEgmh+PlaO23g2H0=
github.com/go-echarts/go-echarts/v2 v2.2.4/go.mod h1:6TOomEztzGDVDkOSCFBq3ed7xOYfbOqhaBzD0YV771A=
github.com/go-echarts/go-echarts/v2 v2.2.5-0.20220120114451-c13b73ddc21e h1:Rl6PdkbIwUke48lHwPNIcmIHx8gjWEWW2uipcP7Lh/k=
github.com/go-echarts/go-echarts/v2 v2.2.5-0.20220120114451-c13b73ddc21e/go.mod h1:6TOomEztzGDVDkOSCFBq3ed7xOYfbOqhaBzD0YV771A=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@@ -1321,6 +1325,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=

View File

@@ -0,0 +1,32 @@
# Build rules for the db-audit tool: a CLI for analyzing size/usage
# tradeoffs in prysm beacon-chain bolt databases.
load("@io_bazel_rules_go//go:def.bzl", "go_binary")
load("@prysm//tools/go:def.bzl", "go_library")

go_library(
    name = "go_default_library",
    srcs = [
        "main.go",
        "summarizer.go",
        "tsdb.go",
    ],
    importpath = "github.com/prysmaticlabs/prysm/tools/db-audit",
    visibility = ["//visibility:private"],
    deps = [
        "//beacon-chain/db/kv:go_default_library",
        "//beacon-chain/state:go_default_library",
        "//config/params:go_default_library",
        "//consensus-types/primitives:go_default_library",
        "//encoding/bytesutil:go_default_library",
        "@com_github_go_echarts_go_echarts_v2//charts:go_default_library",
        "@com_github_go_echarts_go_echarts_v2//components:go_default_library",
        "@com_github_go_echarts_go_echarts_v2//opts:go_default_library",
        "@com_github_pkg_errors//:go_default_library",
        "@com_github_urfave_cli_v2//:go_default_library",
        "@io_etcd_go_bbolt//:go_default_library",
    ],
)

# Command-line binary wrapping the library above.
go_binary(
    name = "db-audit",
    embed = [":go_default_library"],
    visibility = ["//visibility:public"],
)

847
tools/db-audit/main.go Normal file
View File

@@ -0,0 +1,847 @@
package main
import (
"bytes"
"context"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/go-echarts/go-echarts/v2/charts"
"github.com/go-echarts/go-echarts/v2/components"
"github.com/go-echarts/go-echarts/v2/opts"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/config/params"
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
"github.com/urfave/cli/v2"
bolt "go.etcd.io/bbolt"
)
// flags holds the parsed values of the CLI string flags shared by the
// subcommands below; each cli.StringFlag Destination points into this struct.
var flags = struct {
	dbFile   string // path to the beacon chain database (beaconchain.db)
	tsdbFile string // path to the bolt db where stats/summaries are stored
}{}

const (
	// initMMapSize is the initial mmap size (512 MiB) handed to bolt.Open,
	// sized for the large database files this tool reads.
	initMMapSize = 536870912
)

// buckets names the bolt buckets in the beacon chain db that this tool reads.
var buckets = struct {
	state          []byte
	blockSlotRoots []byte
	stateSlotRoots []byte
}{
	state:          []byte("state"),
	blockSlotRoots: []byte("block-slot-indices"),
	stateSlotRoots: []byte("state-slot-indices"),
}
// commands defines the CLI subcommands. All subcommands take --tsdb (the
// stats database, default "summarizer.db"); bucket-chart and summarize also
// take --db pointing at the beacon chain database to analyze.
var commands = []*cli.Command{
	{
		Name:   "bucket-chart",
		Usage:  "visualize relative size of buckets",
		Action: bucketSizes,
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:        "db",
				Usage:       "path to beaconchain.db",
				Destination: &flags.dbFile,
			},
			&cli.StringFlag{
				Name:        "tsdb",
				Usage:       "path to database to store stats",
				Destination: &flags.tsdbFile,
				Value:       "summarizer.db",
			},
		},
	},
	{
		Name:   "summarize",
		Usage:  "collect data about the given prysm bolt database file",
		Action: summarize,
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:        "db",
				Usage:       "path to beaconchain.db",
				Destination: &flags.dbFile,
			},
			&cli.StringFlag{
				Name:        "tsdb",
				Usage:       "path to database to store size summaries",
				Destination: &flags.tsdbFile,
				Value:       "summarizer.db",
			},
		},
	},
	{
		Name:   "dump-summaries",
		Usage:  "print out everything in the summaries db for debugging",
		Action: dumpSummaries,
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:        "tsdb",
				Usage:       "path to database to store size summaries",
				Destination: &flags.tsdbFile,
				Value:       "summarizer.db",
			},
		},
	},
	{
		Name:   "summary-chart",
		Usage:  "generate a visualization of the utilization summary",
		Action: summaryChart,
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:        "tsdb",
				Usage:       "path to database to store size summaries",
				Destination: &flags.tsdbFile,
				Value:       "summarizer.db",
			},
		},
	},
	{
		Name:   "averages",
		Usage:  "report table showing space utilized from highest to lowest",
		Action: printAvg,
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:        "tsdb",
				Usage:       "path to database to store size summaries",
				Destination: &flags.tsdbFile,
				Value:       "summarizer.db",
			},
		},
	},
}
// main wires the subcommands into a cli application and runs it,
// exiting with status 1 on any command error.
func main() {
	cliApp := &cli.App{Commands: commands}
	if err := cliApp.Run(os.Args); err != nil {
		fmt.Printf("Fatal error, %v", err)
		os.Exit(1)
	}
}
// bucketSizes reads previously recorded per-bucket byte counts from the
// stats db and renders them as a pie chart html file.
func bucketSizes(_ *cli.Context) error {
	tsdb, err := bolt.Open(flags.tsdbFile, 0600, &bolt.Options{
		Timeout:         1 * time.Second,
		InitialMmapSize: initMMapSize,
	})
	if err != nil {
		return err
	}
	defer tsdb.Close()
	stats, err := getBucketStats(tsdb)
	if err != nil {
		return err
	}
	return bucketSizeChart(stats)
}
// lookForSmallerVals is scratch analysis code that reports whether the
// state-validator migration has completed in the given beacon chain db.
// The commented-out code below sketches a follow-up comparison of per-state
// value sizes that is not wired up yet.
//
// Fixes: tx.Bucket returns nil for a missing bucket, so calling Get on it
// panicked for databases without a migrations bucket — that case now
// returns an error. The final Printf also lacked a trailing newline.
func lookForSmallerVals(_ *cli.Context) error {
	f := flags
	db, err := bolt.Open(
		f.dbFile,
		params.BeaconIoConfig().ReadWritePermissions,
		&bolt.Options{
			Timeout:         1 * time.Second,
			InitialMmapSize: initMMapSize,
		},
	)
	if err != nil {
		return errors.Wrapf(err, "error opening db=%s", f.dbFile)
	}
	defer db.Close()
	migrationsBucket := []byte("migrations")
	migrationStateValidatorsKey := []byte("migration_state_validator")
	migrationCompleted := []byte("done")
	returnFlag := false
	err = db.View(func(tx *bolt.Tx) error {
		mb := tx.Bucket(migrationsBucket)
		if mb == nil {
			// Bucket is nil when it does not exist; Get would panic.
			return fmt.Errorf("bucket %q not found in db=%s", string(migrationsBucket), f.dbFile)
		}
		b := mb.Get(migrationStateValidatorsKey)
		returnFlag = bytes.Equal(b, migrationCompleted)
		return nil
	})
	if err != nil {
		return err
	}
	fmt.Printf("migration enabled = %t\n", returnFlag)
	return nil
	/*
		store, err := kv.NewKVStoreWithDB(ctx, db)
		if err != nil {
			return err
		}
	*/
	/*
		stateBucket := []byte("state")
		//stateValidatorsBucket := []byte("state-validators")
		blockRootValidatorHashesBucket := []byte("block-root-validator-hashes")
		for sr := range slotRootIter(db) {
			err := db.View(func(tx *bolt.Tx) error {
				bkt := tx.Bucket(stateBucket)
				sb := bkt.Get(sr.Root[:])
				idxBkt := tx.Bucket(blockRootValidatorHashesBucket)
				valKey := idxBkt.Get(sr.Root[:])
				fmt.Printf("state bytes = %d, val bytes = %d", len(sb), len(valKey))
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	*/
}
// dumpSummaries prints the raw contents of the stats db for debugging.
func dumpSummaries(_ *cli.Context) error {
	tsdb, err := bolt.Open(flags.tsdbFile, 0600, &bolt.Options{
		Timeout:         1 * time.Second,
		InitialMmapSize: initMMapSize,
	})
	if err != nil {
		return err
	}
	defer tsdb.Close()
	/*
		for sum := range summaryIter(tsdb) {
			fmt.Printf("%v\n", sum)
		}
	*/
	return summaryDump(tsdb)
}
// summaryChart loads every SizeSummary from the stats db and renders the
// per-field growth line chart.
func summaryChart(_ *cli.Context) error {
	tsdb, err := bolt.Open(flags.tsdbFile, 0600, &bolt.Options{
		Timeout:         1 * time.Second,
		InitialMmapSize: initMMapSize,
	})
	if err != nil {
		return err
	}
	defer tsdb.Close()
	var sums []SizeSummary
	for sum := range summaryIter(tsdb) {
		sums = append(sums, sum)
	}
	/*
		fs := forkSummaries(sums)
		for i := range fs {
			fmt.Println(fs[i].String())
		}
	*/
	return renderLineChart(sums)
}
// printAvg loads all summaries from the stats db, groups them by fork, and
// prints a min/max/average field-size table for each fork.
func printAvg(_ *cli.Context) error {
	tsdb, err := bolt.Open(flags.tsdbFile, 0600, &bolt.Options{
		Timeout:         1 * time.Second,
		InitialMmapSize: initMMapSize,
	})
	if err != nil {
		return err
	}
	defer tsdb.Close()
	var sums []SizeSummary
	for sum := range summaryIter(tsdb) {
		sums = append(sums, sum)
	}
	for _, fs := range forkSummaries(sums) {
		printRollup(rollupSizes(fs))
	}
	return nil
}
// printRollup prints the fork name followed by the average, max, and min
// field-size tables for that fork.
func printRollup(stats SizeStats) {
	fmt.Printf("Stats for fork: %s\n", stats.fork)
	sections := []struct {
		header string
		sum    SizeSummary
	}{
		{"Average:", stats.avg},
		{"Max:", stats.max},
		{"Min:", stats.min},
	}
	for _, sec := range sections {
		fmt.Println(sec.header)
		printSummary(sec.sum)
	}
}
// printSummary prints one labeled, indented line per SizeSummary field,
// in the same fixed order the original report used.
func printSummary(sum SizeSummary) {
	rows := []struct {
		label string
		size  int
	}{
		{"total", sum.Total},
		{"validators", sum.Validators},
		{"inactivity_scores", sum.InactivityScores},
		{"balances", sum.Balances},
		{"randao_mixes", sum.RandaoMixes},
		{"previous_epoch_participation", sum.PreviousEpochParticipation},
		{"current_epoch_participation", sum.CurrentEpochParticipation},
		{"block_roots", sum.BlockRoots},
		{"state_roots", sum.StateRoots},
		{"slashings", sum.Slashings},
		{"current_sync_committee", sum.CurrentSyncCommittee},
		{"next_sync_committee", sum.NextSyncCommittee},
		{"eth1_data_votes", sum.Eth1DataVotes},
		{"historical_roots", sum.HistoricalRoots},
		{"latest_block_header", sum.LatestBlockHeader},
		{"eth1_data", sum.Eth1Data},
		{"previously_justified_checkpoint", sum.PreviouslyJustifiedCheckpoint},
		{"current_justified_checkpoint", sum.CurrentJustifiedCheckpoint},
		{"finalized_checkpoint", sum.FinalizedCheckpoint},
		{"fork", sum.Fork},
		{"genesis_validators_root", sum.GenesisValidatorsRoot},
		{"genesis_time", sum.GenesisTime},
		{"slot", sum.Slot},
		{"eth1_deposit_index", sum.Eth1DepositIndex},
		{"justification_bits", sum.JustificationBits},
		{"previous_epoch_attestations", sum.PreviousEpochAttestations},
		{"current_epoch_attestations", sum.CurrentEpochAttestations},
		{"latest_execution_payload_header", sum.LatestExecutionPayloadHeader},
	}
	for _, r := range rows {
		fmt.Printf("\t- %s: %d\n", r.label, r.size)
	}
}
// min returns the smaller of two ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// max returns the larger of two ints.
func max(a, b int) int {
	if b > a {
		return b
	}
	return a
}
// rollupSizes computes per-field min/max/average statistics over all
// summaries in the first bucket of the given fork summary.
//
// Fixes relative to the original:
//   - minimums were accumulated against the zero-value SizeSummary, so
//     every min field was always 0; min/max are now seeded from the first
//     summary before the scan.
//   - an empty bucket caused a divide-by-zero panic when averaging; the
//     zero-value SizeStats is now returned instead.
//   - the ~100 copy-pasted min/max/avg triples are collapsed into one loop
//     driven by a table of field accessors.
func rollupSizes(fs ForkSummary) SizeStats {
	szs := SizeStats{fork: fs.name}
	ss := fs.sb[0].summaries
	if len(ss) == 0 {
		return szs
	}
	// fields enumerates every tracked SizeSummary field via a pointer
	// accessor so the min/max/avg logic is written once.
	fields := []func(*SizeSummary) *int{
		func(s *SizeSummary) *int { return &s.Total },
		func(s *SizeSummary) *int { return &s.GenesisTime },
		func(s *SizeSummary) *int { return &s.GenesisValidatorsRoot },
		func(s *SizeSummary) *int { return &s.Slot },
		func(s *SizeSummary) *int { return &s.Fork },
		func(s *SizeSummary) *int { return &s.LatestBlockHeader },
		func(s *SizeSummary) *int { return &s.BlockRoots },
		func(s *SizeSummary) *int { return &s.StateRoots },
		func(s *SizeSummary) *int { return &s.HistoricalRoots },
		func(s *SizeSummary) *int { return &s.Eth1Data },
		func(s *SizeSummary) *int { return &s.Eth1DataVotes },
		func(s *SizeSummary) *int { return &s.Eth1DepositIndex },
		func(s *SizeSummary) *int { return &s.Validators },
		func(s *SizeSummary) *int { return &s.Balances },
		func(s *SizeSummary) *int { return &s.RandaoMixes },
		func(s *SizeSummary) *int { return &s.Slashings },
		func(s *SizeSummary) *int { return &s.PreviousEpochAttestations },
		func(s *SizeSummary) *int { return &s.CurrentEpochAttestations },
		func(s *SizeSummary) *int { return &s.PreviousEpochParticipation },
		func(s *SizeSummary) *int { return &s.CurrentEpochParticipation },
		func(s *SizeSummary) *int { return &s.JustificationBits },
		func(s *SizeSummary) *int { return &s.PreviouslyJustifiedCheckpoint },
		func(s *SizeSummary) *int { return &s.CurrentJustifiedCheckpoint },
		func(s *SizeSummary) *int { return &s.FinalizedCheckpoint },
		func(s *SizeSummary) *int { return &s.InactivityScores },
		func(s *SizeSummary) *int { return &s.CurrentSyncCommittee },
		func(s *SizeSummary) *int { return &s.NextSyncCommittee },
		func(s *SizeSummary) *int { return &s.LatestExecutionPayloadHeader },
	}
	// Seed min/max from the first summary so minimums are meaningful.
	for _, f := range fields {
		v := *f(&ss[0])
		*f(&szs.min) = v
		*f(&szs.max) = v
	}
	for i := range ss {
		s := &ss[i]
		for _, f := range fields {
			v := *f(s)
			if v < *f(&szs.min) {
				*f(&szs.min) = v
			}
			if v > *f(&szs.max) {
				*f(&szs.max) = v
			}
			*f(&szs.avg) += v
		}
	}
	for _, f := range fields {
		*f(&szs.avg) /= len(ss)
	}
	return szs
}
// SizeStats aggregates min/max/average SizeSummary values for one fork,
// as computed by rollupSizes and printed by printRollup.
type SizeStats struct {
	fork string      // fork name, e.g. "phase0" or "altair"
	min  SizeSummary // per-field minimums
	max  SizeSummary // per-field maximums
	avg  SizeSummary // per-field averages
}
// fieldLabels lists the BeaconState field names used as chart series names
// and as map keys in lineChartData. The order here controls series order in
// the rendered charts.
var fieldLabels = []string{
	"validators",
	"inactivity_scores",
	"balances",
	"randao_mixes",
	"previous_epoch_attestations",
	"current_epoch_attestations",
	"previous_epoch_participation",
	"current_epoch_participation",
	"block_roots",
	"state_roots",
	"slashings",
	"current_sync_committee",
	"next_sync_committee",
	"eth1_data_votes",
	"historical_roots",
	"latest_block_header",
	"eth1_data",
	"previously_justified_checkpoint",
	"current_justified_checkpoint",
	"finalized_checkpoint",
	"genesis_validators_root",
	"fork",
	"genesis_time",
	"slot",
	"eth1_deposit_index",
	"justification_bits",
	"latest_execution_payload_header",
}
// render3dbarChart renders one 3D bar chart per fork summary into
// bar3d.html, with slot-range buckets on the x axis and field labels on
// the y axis.
//
// Fixes: an os.Create failure previously returned nil (silently producing
// no output), and the page.Render error was discarded; both errors are now
// propagated to the caller.
func render3dbarChart(fss []ForkSummary) error {
	page := components.NewPage()
	for _, fs := range fss {
		title := fmt.Sprintf("%s: avg field sizes of states", fs.name)
		// Label each x-axis bucket with its slot range.
		bucketNames := make([]string, 0, len(fs.sb))
		for _, b := range fs.sb {
			bucketNames = append(bucketNames, fmt.Sprintf("%d-%d", b.min, b.max))
		}
		bar3d := charts.NewBar3D()
		bar3d.SetGlobalOptions(
			charts.WithTitleOpts(opts.Title{Title: title}),
			charts.WithGrid3DOpts(opts.Grid3D{
				BoxWidth: 200,
				BoxDepth: 80,
			}),
		)
		bar3d.SetGlobalOptions(
			charts.WithXAxis3DOpts(opts.XAxis3D{Data: bucketNames}),
			charts.WithYAxis3DOpts(opts.YAxis3D{Data: fieldLabels}),
		)
		page.AddCharts(bar3d)
	}
	f, err := os.Create("bar3d.html")
	if err != nil {
		return err
	}
	defer f.Close()
	return page.Render(io.MultiWriter(f))
}
// lineChartData converts the summaries into per-field echarts line series,
// keyed by the same field labels used elsewhere in this tool. Each key's
// slice holds one data point per summary, in input order.
func lineChartData(ss []SizeSummary) map[string][]opts.LineData {
	series := []struct {
		name string
		get  func(SizeSummary) int
	}{
		{"genesis_time", func(s SizeSummary) int { return s.GenesisTime }},
		{"genesis_validators_root", func(s SizeSummary) int { return s.GenesisValidatorsRoot }},
		{"slot", func(s SizeSummary) int { return s.Slot }},
		{"fork", func(s SizeSummary) int { return s.Fork }},
		{"latest_block_header", func(s SizeSummary) int { return s.LatestBlockHeader }},
		{"block_roots", func(s SizeSummary) int { return s.BlockRoots }},
		{"state_roots", func(s SizeSummary) int { return s.StateRoots }},
		{"historical_roots", func(s SizeSummary) int { return s.HistoricalRoots }},
		{"eth1_data", func(s SizeSummary) int { return s.Eth1Data }},
		{"eth1_data_votes", func(s SizeSummary) int { return s.Eth1DataVotes }},
		{"eth1_deposit_index", func(s SizeSummary) int { return s.Eth1DepositIndex }},
		{"validators", func(s SizeSummary) int { return s.Validators }},
		{"balances", func(s SizeSummary) int { return s.Balances }},
		{"randao_mixes", func(s SizeSummary) int { return s.RandaoMixes }},
		{"slashings", func(s SizeSummary) int { return s.Slashings }},
		{"previous_epoch_attestations", func(s SizeSummary) int { return s.PreviousEpochAttestations }},
		{"current_epoch_attestations", func(s SizeSummary) int { return s.CurrentEpochAttestations }},
		{"previous_epoch_participation", func(s SizeSummary) int { return s.PreviousEpochParticipation }},
		{"current_epoch_participation", func(s SizeSummary) int { return s.CurrentEpochParticipation }},
		{"justification_bits", func(s SizeSummary) int { return s.JustificationBits }},
		{"previously_justified_checkpoint", func(s SizeSummary) int { return s.PreviouslyJustifiedCheckpoint }},
		{"current_justified_checkpoint", func(s SizeSummary) int { return s.CurrentJustifiedCheckpoint }},
		{"finalized_checkpoint", func(s SizeSummary) int { return s.FinalizedCheckpoint }},
		{"inactivity_scores", func(s SizeSummary) int { return s.InactivityScores }},
		{"current_sync_committee", func(s SizeSummary) int { return s.CurrentSyncCommittee }},
		{"next_sync_committee", func(s SizeSummary) int { return s.NextSyncCommittee }},
		{"latest_execution_payload_header", func(s SizeSummary) int { return s.LatestExecutionPayloadHeader }},
	}
	vectors := make(map[string][]opts.LineData, len(series))
	for _, sum := range ss {
		for _, sr := range series {
			vectors[sr.name] = append(vectors[sr.name], opts.LineData{Value: sr.get(sum)})
		}
	}
	return vectors
}
// renderLineChart renders one line series per BeaconState field into
// line.html, with slot numbers on the x axis and byte sizes on the y axis.
//
// Fixes: an os.Create failure previously returned nil (silently producing
// no output), and the page.Render error was discarded; both errors are now
// propagated to the caller.
func renderLineChart(ss []SizeSummary) error {
	page := components.NewPage()
	page.SetLayout(components.PageFlexLayout)
	// One x-axis tick per summary, labeled by its slot.
	xaxis := make([]string, len(ss))
	for i, s := range ss {
		xaxis[i] = fmt.Sprintf("%d", s.SlotRoot.Slot)
	}
	line := charts.NewLine()
	line.SetXAxis(xaxis)
	lcd := lineChartData(ss)
	for _, name := range fieldLabels {
		points := lcd[name]
		line.AddSeries(name, points)
	}
	line.SetGlobalOptions(
		charts.WithTitleOpts(opts.Title{
			Title: "growth of BeaconState components",
		}),
		charts.WithYAxisOpts(opts.YAxis{
			Name: "bytes",
			SplitLine: &opts.SplitLine{
				Show: false,
			},
		}),
		charts.WithXAxisOpts(opts.XAxis{
			Name: "slot",
		}),
		charts.WithLegendOpts(opts.Legend{
			Show:   true,
			Data:   fieldLabels,
			Left:   "0",
			Bottom: "0",
			Orient: "horizontal",
			Type:   "scroll",
		}),
		charts.WithTooltipOpts(opts.Tooltip{
			Show: true,
		}),
		charts.WithInitializationOpts(opts.Initialization{
			Width:  "1000px",
			Height: "600px",
		}),
	)
	line.SetSeriesOptions(
		charts.WithLineChartOpts(opts.LineChart{
			Smooth: true,
		}),
		/*
			charts.WithMarkLineNameTypeItemOpts(opts.MarkLineNameTypeItem{
				Name: "Average",
				Type: "average",
			}),
			charts.WithMarkPointStyleOpts(opts.MarkPointStyle{
				Label: &opts.Label{
					Show:      true,
					Formatter: "{a}: {b}",
				},
			}),
		*/
	)
	page.AddCharts(line)
	f, err := os.Create("line.html")
	if err != nil {
		return err
	}
	defer f.Close()
	return page.Render(io.MultiWriter(f))
}
// bucketSizeChart renders the bucket-name -> byte-size map as a pie chart
// in db-bucket-sizes.html.
//
// Fix: an os.Create failure previously returned nil, silently skipping
// chart output; the error is now propagated to the caller.
func bucketSizeChart(bs map[string]int) error {
	page := components.NewPage()
	pie := charts.NewPie()
	pie.SetGlobalOptions(
		charts.WithTitleOpts(opts.Title{Title: "DB Bucket Sizes"}),
	)
	items := make([]opts.PieData, 0, len(bs))
	for k, v := range bs {
		items = append(items, opts.PieData{Name: k, Value: v})
	}
	pie.AddSeries("pie", items).
		SetSeriesOptions(
			charts.WithLabelOpts(opts.Label{
				Show:      true,
				Formatter: "{b}: {c}",
			}),
			charts.WithPieChartOpts(opts.PieChart{
				Radius: []string{"40%", "75%"},
			}),
		)
	page.AddCharts(pie)
	f, err := os.Create("db-bucket-sizes.html")
	if err != nil {
		return err
	}
	defer f.Close()
	return page.Render(io.MultiWriter(f))
}
// summarize collects size data from the beacon chain db into the stats db:
// total byte counts per bolt bucket, plus one SizeSummary per stored state
// measuring the SSZ-encoded size of each BeaconState field.
func summarize(_ *cli.Context) error {
	ctx := context.Background()
	f := flags
	tsdb, err := bolt.Open(f.tsdbFile, 0600, &bolt.Options{
		Timeout:         1 * time.Second,
		InitialMmapSize: initMMapSize,
	})
	if err != nil {
		return err
	}
	defer tsdb.Close()
	// dbinit (defined in a sibling file) prepares the stats db buckets
	// that writeBucketStat/writeSummary below depend on.
	if err := dbinit(tsdb); err != nil {
		return errors.Wrapf(err, "error opening tsdb=%s", f.tsdbFile)
	}
	db, err := bolt.Open(
		f.dbFile,
		params.BeaconIoConfig().ReadWritePermissions,
		&bolt.Options{
			Timeout:         1 * time.Second,
			InitialMmapSize: initMMapSize,
		},
	)
	if err != nil {
		return errors.Wrapf(err, "error opening db=%s", f.dbFile)
	}
	defer db.Close()
	// Wrap the raw bolt handle in prysm's kv store so states can be
	// fetched by root below.
	store, err := kv.NewKVStoreWithDB(ctx, db)
	if err != nil {
		return err
	}
	// do this first since it reads everything sequentially, which warms up the page cache
	stats := make(map[string]int)
	err = db.View(func(tx *bolt.Tx) error {
		// The root cursor yields bucket names (value is nil for buckets);
		// sum key+value byte lengths within each bucket.
		c := tx.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			if k != nil && v == nil {
				bkt := tx.Bucket(k)
				bc := bkt.Cursor()
				l := 0
				for bk, bv := bc.First(); bk != nil; bk, bv = bc.Next() {
					l += len(bk) + len(bv)
				}
				fmt.Printf("%s: %d bytes\n", string(k), l)
				stats[string(k)] = l
			}
		}
		return nil
	})
	if err != nil {
		return errors.Wrap(err, "error while recording db bucket sizes")
	}
	for k, v := range stats {
		if err := writeBucketStat(tsdb, k, v); err != nil {
			return err
		}
	}
	// Walk every (slot, root) pair, load and re-marshal each state, and
	// persist a per-field size summary to the stats db.
	for sr := range slotRootIter(db) {
		st, err := store.State(ctx, sr.Root)
		if err != nil {
			return errors.Wrapf(err, "unable to fetch state for root=%#x", sr.Root)
		}
		sb, err := st.MarshalSSZ()
		if err != nil {
			return errors.Wrapf(err, "unable to marshal state w/ root=%#x", sr.Root)
		}
		// summarizer (sibling file) computes per-field sizes from the
		// state object and its serialized bytes.
		sz := &summarizer{s: st, sb: sb, sr: sr}
		sum := sz.Summary()
		if err := writeSummary(tsdb, sum); err != nil {
			return err
		}
		fmt.Printf("wrote summary for slot=%d, root=%#x\n", sr.Slot, sr.Root)
	}
	return nil
}
// SlotRoot pairs a slot with the 32-byte state root recorded for it in the
// state-slot-indices bucket.
type SlotRoot struct {
	Slot types.Slot `json:"slot"`
	Root [32]byte   `json:"root"`
}
// slotRootIter streams (slot, root) pairs from the state-slot-indices
// bucket over an unbuffered channel, in cursor (byte) key order; keys are
// decoded as big-endian slots, so byte order should match slot order.
//
// NOTE(review): the producer goroutine has no cancellation path — if the
// consumer stops ranging before the bucket is exhausted, the goroutine
// blocks forever on the channel send. Errors from db.View surface as a
// panic inside the goroutine. Acceptable for a one-shot analysis tool;
// a ctx/done channel would be needed for library use.
func slotRootIter(db *bolt.DB) chan SlotRoot {
	ch := make(chan SlotRoot)
	go func() {
		err := db.View(func(tx *bolt.Tx) error {
			b := tx.Bucket(buckets.stateSlotRoots)
			c := b.Cursor()
			for k, v := c.First(); k != nil; k, v = c.Next() {
				ch <- SlotRoot{Slot: bytesutil.BytesToSlotBigEndian(k), Root: bytesutil.ToBytes32(v)}
			}
			close(ch)
			return nil
		})
		if err != nil {
			panic(err)
		}
	}()
	return ch
}
// ForkSummary groups the per-slot summaries collected for a single fork
// into one or more slot-range buckets.
type ForkSummary struct {
	name string
	sb   []SummaryBucket
}

// String renders the fork name and a compact description of each bucket.
func (fs ForkSummary) String() string {
	parts := make([]string, len(fs.sb))
	for i := range fs.sb {
		parts[i] = fs.sb[i].String()
	}
	return fmt.Sprintf("name=%s, buckets=[%s]", fs.name, strings.Join(parts, ","))
}
// SummaryBucket holds the summaries for a contiguous range of slots.
type SummaryBucket struct {
	summaries []SizeSummary
	min       types.Slot // lowest slot in the bucket
	max       types.Slot // highest slot in the bucket
}

// String renders the bucket's element count and slot bounds.
func (sb SummaryBucket) String() string {
	return fmt.Sprintf("(len=%d, min=%d, max=%d)", len(sb.summaries), sb.min, sb.max)
}
// forkSummaries splits the (slot-ordered) summaries into phase0 and altair
// groups at the first summary that reached the altair fork slot.
func forkSummaries(sums []SizeSummary) []ForkSummary {
	boundary := findAltairOffset(sums)
	phase0 := ForkSummary{name: "phase0", sb: splitBuckets(sums[:boundary], 1)}
	altair := ForkSummary{name: "altair", sb: splitBuckets(sums[boundary:], 1)}
	return []ForkSummary{phase0, altair}
}
// findAltairOffset returns the index of the first summary whose slot is at or
// past the altair activation slot, or len(sums) when none has reached it.
func findAltairOffset(sums []SizeSummary) int {
	// hardcoded prater value: altair fork epoch 36660, 32 slots per epoch
	const altairStartSlot = 36660 * 32
	for i := range sums {
		if sums[i].SlotRoot.Slot >= altairStartSlot {
			return i
		}
	}
	return len(sums)
}
// splitBuckets divides the (slot-ordered) summaries into numBuckets roughly
// equal-sized buckets, recording the min and max slot seen in each. Any
// remainder after integer division accumulates in the final bucket.
// An empty input yields numBuckets empty buckets; the original implementation
// panicked on sums[0] in that case.
func splitBuckets(sums []SizeSummary, numBuckets int) []SummaryBucket {
	bkts := make([]SummaryBucket, numBuckets)
	if len(sums) == 0 {
		return bkts
	}
	per := len(sums) / numBuckets
	cb := 0
	bkts[0].min = sums[0].SlotRoot.Slot
	for _, sum := range sums {
		// Advance to the next bucket once the current one is full, but never
		// past the last bucket, which absorbs the remainder.
		if len(bkts[cb].summaries) == per && cb < numBuckets-1 {
			cb++
			bkts[cb].min = sum.SlotRoot.Slot
		}
		bkts[cb].max = sum.SlotRoot.Slot
		bkts[cb].summaries = append(bkts[cb].summaries, sum)
	}
	return bkts
}
// SizeSummary records the serialized size in bytes of each top-level field of
// a single BeaconState, identified by its slot and root. Total is the length
// of the full SSZ encoding; fork-specific fields that do not exist in a given
// state's fork are reported as 0.
type SizeSummary struct {
	SlotRoot SlotRoot `json:"slot_root"`
	Total int `json:"total"`
	GenesisTime int `json:"genesis_time"`
	GenesisValidatorsRoot int `json:"genesis_validators_root"`
	Slot int `json:"slot"`
	Fork int `json:"fork"`
	LatestBlockHeader int `json:"latest_block_header"`
	BlockRoots int `json:"block_roots"`
	StateRoots int `json:"state_roots"`
	HistoricalRoots int `json:"historical_roots"`
	Eth1Data int `json:"eth1_data"`
	Eth1DataVotes int `json:"eth1_data_votes"`
	Eth1DepositIndex int `json:"eth1_deposit_index"`
	Validators int `json:"validators"`
	Balances int `json:"balances"`
	RandaoMixes int `json:"randao_mixes"`
	Slashings int `json:"slashings"`
	PreviousEpochAttestations int `json:"previous_epoch_attestations"`
	CurrentEpochAttestations int `json:"current_epoch_attestations"`
	PreviousEpochParticipation int `json:"previous_epoch_participation"`
	CurrentEpochParticipation int `json:"current_epoch_participation"`
	JustificationBits int `json:"justification_bits"`
	PreviouslyJustifiedCheckpoint int `json:"previously_justified_checkpoint"`
	CurrentJustifiedCheckpoint int `json:"current_justified_checkpoint"`
	FinalizedCheckpoint int `json:"finalized_checkpoint"`
	InactivityScores int `json:"inactivity_scores"`
	CurrentSyncCommittee int `json:"current_sync_committee"`
	NextSyncCommittee int `json:"next_sync_committee"`
	LatestExecutionPayloadHeader int `json:"latest_execution_payload_header"`
}

View File

@@ -0,0 +1,232 @@
package main
import "github.com/prysmaticlabs/prysm/beacon-chain/state"
// summarizer computes per-field serialized sizes for a single beacon state.
type summarizer struct {
	s state.BeaconState // the state being measured
	sb []byte // full SSZ encoding of s; its length is the Total size
	sr SlotRoot // slot and root identifying s
}
// Summary builds the per-field size breakdown for the wrapped state, one
// summarizer method per SizeSummary field. Fork-specific fields (epoch
// attestations, participation, sync committees, execution payload header)
// come back as 0 when the state's fork does not include them.
func (s *summarizer) Summary() SizeSummary {
	return SizeSummary{
		Total: len(s.sb),
		SlotRoot: s.sr,
		GenesisTime: s.GenesisTime(),
		GenesisValidatorsRoot: s.GenesisValidatorsRoot(),
		Slot: s.Slot(),
		Fork: s.Fork(),
		LatestBlockHeader: s.LatestBlockHeader(),
		BlockRoots: s.BlockRoots(),
		StateRoots: s.StateRoots(),
		HistoricalRoots: s.HistoricalRoots(),
		Eth1Data: s.Eth1Data(),
		Eth1DataVotes: s.Eth1DataVotes(),
		Eth1DepositIndex: s.Eth1DepositIndex(),
		Validators: s.Validators(),
		Balances: s.Balances(),
		RandaoMixes: s.RandaoMixes(),
		Slashings: s.Slashings(),
		PreviousEpochAttestations: s.PreviousEpochAttestations(),
		CurrentEpochAttestations: s.CurrentEpochAttestations(),
		PreviousEpochParticipation: s.PreviousEpochParticipation(),
		CurrentEpochParticipation: s.CurrentEpochParticipation(),
		JustificationBits: s.JustificationBits(),
		PreviouslyJustifiedCheckpoint: s.PreviouslyJustifiedCheckpoint(),
		CurrentJustifiedCheckpoint: s.CurrentJustifiedCheckpoint(),
		FinalizedCheckpoint: s.FinalizedCheckpoint(),
		InactivityScores: s.InactivityScores(),
		CurrentSyncCommittee: s.CurrentSyncCommittee(),
		NextSyncCommittee: s.NextSyncCommittee(),
		LatestExecutionPayloadHeader: s.LatestExecutionPayloadHeader(),
	}
}
// GenesisTime returns the serialized size of the genesis_time field: a fixed 8 bytes.
func (z *summarizer) GenesisTime() int {
	return 8
}
// GenesisValidatorsRoot returns the serialized size of the genesis validators
// root: a fixed 32 bytes.
func (z *summarizer) GenesisValidatorsRoot() int {
	return 32
}
// Slot returns the serialized size of the slot field: a fixed 8 bytes.
func (z *summarizer) Slot() int {
	return 8
}
// Fork returns the serialized size of the Fork container, as reported by its
// generated SizeSSZ method.
func (z *summarizer) Fork() int {
	return z.s.Fork().SizeSSZ()
}
// LatestBlockHeader returns the serialized size of the latest block header
// container, as reported by its generated SizeSSZ method.
func (z *summarizer) LatestBlockHeader() int {
	return z.s.LatestBlockHeader().SizeSSZ()
}
// BlockRoots returns the total size of the block roots vector: 32 bytes per root.
func (z *summarizer) BlockRoots() int {
	return 32 * len(z.s.BlockRoots())
}
// StateRoots returns the total size of the state roots vector: 32 bytes per root.
func (z *summarizer) StateRoots() int {
	return 32 * len(z.s.StateRoots())
}
// HistoricalRoots returns the total size of the historical roots list: 32 bytes per root.
func (z *summarizer) HistoricalRoots() int {
	return 32 * len(z.s.HistoricalRoots())
}
// Eth1Data returns the serialized size of the eth1_data container, as
// reported by its generated SizeSSZ method.
func (z *summarizer) Eth1Data() int {
	return z.s.Eth1Data().SizeSSZ()
}
// Eth1DataVotes returns the total serialized size of all pending eth1 data votes.
func (z *summarizer) Eth1DataVotes() int {
	sz := 0
	e1dv := z.s.Eth1DataVotes()
	for _, v := range e1dv {
		sz += v.SizeSSZ()
	}
	return sz
}
// Eth1DepositIndex returns the serialized size of the eth1_deposit_index
// field: a fixed 8 bytes.
func (z *summarizer) Eth1DepositIndex() int {
	return 8
}
// Validators returns the total serialized size of the validator registry,
// summing each record's SizeSSZ. Validator records are stored in full here —
// an earlier comment claiming they were compressed to integer ids was wrong.
func (z *summarizer) Validators() int {
	sz := 0
	for _, v := range z.s.Validators() {
		sz += v.SizeSSZ()
	}
	return sz
}
// Balances returns the total size of the balances list: 8 bytes (uint64) per validator.
func (z *summarizer) Balances() int {
	return 8 * len(z.s.Balances())
}
// RandaoMixes returns the total size of the randao mixes vector: 32 bytes per
// mix. The original hardcoded the mainnet vector length (65536 * 32); reading
// the actual length from the state keeps the result correct for configs with
// a different EpochsPerHistoricalVector, and matches how the sibling
// BlockRoots/StateRoots methods measure their vectors.
func (z *summarizer) RandaoMixes() int {
	return 32 * len(z.s.RandaoMixes())
}
// Slashings returns the total size of the slashings vector: 8 bytes (uint64) per entry.
func (z *summarizer) Slashings() int {
	return 8 * len(z.s.Slashings())
}
// PreviousEpochAttestations returns the total serialized size of the previous
// epoch's pending attestations. Returns 0 when the state's fork does not
// carry this field (it is phase0-only).
func (z *summarizer) PreviousEpochAttestations() int {
	atts, err := z.s.PreviousEpochAttestations()
	// just means the state doesn't have this value
	if err != nil {
		return 0
	}
	sz := 0
	for _, v := range atts {
		sz += v.SizeSSZ()
	}
	return sz
}
// CurrentEpochAttestations returns the total serialized size of the current
// epoch's pending attestations. Returns 0 when the state's fork does not
// carry this field (it is phase0-only).
func (z *summarizer) CurrentEpochAttestations() int {
	atts, err := z.s.CurrentEpochAttestations()
	// just means the state doesn't have this value
	if err != nil {
		return 0
	}
	sz := 0
	for _, v := range atts {
		sz += v.SizeSSZ()
	}
	return sz
}
// PreviousEpochParticipation returns the size of the previous epoch
// participation list: one byte per entry. Returns 0 when the state's fork
// does not carry this field. The original silently discarded the error with
// `_`, relying on len(nil) == 0; making the check explicit matches the
// sibling accessors (e.g. InactivityScores) without changing behavior.
func (z *summarizer) PreviousEpochParticipation() int {
	p, err := z.s.PreviousEpochParticipation()
	if err != nil {
		// Field does not exist in this state's fork.
		return 0
	}
	return len(p)
}
// CurrentEpochParticipation returns the size of the current epoch
// participation list: one byte per entry. Returns 0 when the state's fork
// does not carry this field. The original silently discarded the error with
// `_`, relying on len(nil) == 0; making the check explicit matches the
// sibling accessors without changing behavior.
func (z *summarizer) CurrentEpochParticipation() int {
	p, err := z.s.CurrentEpochParticipation()
	if err != nil {
		// Field does not exist in this state's fork.
		return 0
	}
	return len(p)
}
// JustificationBits returns the byte length of the justification bitvector.
func (z *summarizer) JustificationBits() int {
	return len(z.s.JustificationBits())
}
// PreviouslyJustifiedCheckpoint returns the serialized size of the previous
// justified checkpoint container.
func (z *summarizer) PreviouslyJustifiedCheckpoint() int {
	return z.s.PreviousJustifiedCheckpoint().SizeSSZ()
}
// CurrentJustifiedCheckpoint returns the serialized size of the current
// justified checkpoint container.
func (z *summarizer) CurrentJustifiedCheckpoint() int {
	return z.s.CurrentJustifiedCheckpoint().SizeSSZ()
}
// FinalizedCheckpoint returns the serialized size of the finalized checkpoint container.
func (z *summarizer) FinalizedCheckpoint() int {
	return z.s.FinalizedCheckpoint().SizeSSZ()
}
// InactivityScores returns the total size of the inactivity scores list:
// 8 bytes (uint64) per validator. Returns 0 when the state's fork does not
// carry this field (it is altair+).
func (z *summarizer) InactivityScores() int {
	scores, err := z.s.InactivityScores()
	// not supported in the fork for the given state
	if err != nil {
		return 0
	}
	return 8 * len(scores)
}
// CurrentSyncCommittee returns the serialized size of the current sync
// committee container, or 0 when the state's fork does not carry it (altair+ only).
func (z *summarizer) CurrentSyncCommittee() int {
	c, err := z.s.CurrentSyncCommittee()
	if err != nil {
		return 0
	}
	return c.SizeSSZ()
}
// NextSyncCommittee returns the serialized size of the next sync committee
// container, or 0 when the state's fork does not carry it (altair+ only).
func (z *summarizer) NextSyncCommittee() int {
	c, err := z.s.NextSyncCommittee()
	if err != nil {
		return 0
	}
	return c.SizeSSZ()
}
// LatestExecutionPayloadHeader returns the serialized size of the latest
// execution payload header, or 0 when the state's fork does not carry it
// (bellatrix+ only).
func (z *summarizer) LatestExecutionPayloadHeader() int {
	h, err := z.s.LatestExecutionPayloadHeader()
	if err != nil {
		return 0
	}
	return h.SizeSSZ()
}
/*
func computeSizes(bs state.BeaconState, sb []byte) (*summarizer, error) {
vu, err := detect.FromState(sb)
if err != nil {
return nil, err
}
forkName := version.String(vu.Fork)
switch vu.Fork {
case version.Phase0:
st := &ethpb.BeaconState{}
if err := st.UnmarshalSSZ(sb); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
return &summarizer{s: bs}, nil
case version.Altair:
st := &ethpb.BeaconStateAltair{}
if err := st.UnmarshalSSZ(sb); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
return &summarizer{s: bs}, nil
case version.Bellatrix:
st := &ethpb.BeaconStateBellatrix{}
if err := st.UnmarshalSSZ(sb); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal state, detected fork=%s", forkName)
}
return &summarizer{s: bs}, nil
default:
return nil, fmt.Errorf("unable to initialize BeaconState for fork version=%s", forkName)
}
}
*/

124
tools/db-audit/tsdb.go Normal file
View File

@@ -0,0 +1,124 @@
package main
import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
bolt "go.etcd.io/bbolt"
)
// stateSizeBucket stores one JSON-encoded SizeSummary per state, keyed by big-endian slot.
var stateSizeBucket = []byte("state-sizes")
// bucketStatsBucket stores the total byte size of each source db bucket, keyed by bucket name.
var bucketStatsBucket = []byte("bucket-stats")
// dbinit ensures both analysis buckets exist in the results database,
// creating any that are missing. It must be called before any reads or writes.
func dbinit(db *bolt.DB) error {
	return db.Update(func(tx *bolt.Tx) error {
		names := [][]byte{stateSizeBucket, bucketStatsBucket}
		for _, name := range names {
			if _, err := tx.CreateBucketIfNotExists(name); err != nil {
				return errors.Wrapf(err, "failed to create bucket %s", string(name))
			}
		}
		return nil
	})
}
// writeBucketStat records the total byte size of a source db bucket under the
// given name in the bucket-stats bucket. The size is stored as a 4-byte
// little-endian uint32 (sizes above ~4GiB would be truncated).
// Fixes: the nil-bucket error message was an unprofessional placeholder
// ("wtf %s is nil"); the Put error check now uses the early-return idiom.
func writeBucketStat(db *bolt.DB, name string, size int) error {
	return db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucketStatsBucket)
		if b == nil {
			// dbinit must be called before any writes.
			return fmt.Errorf("bucket %s does not exist", string(bucketStatsBucket))
		}
		k := []byte(name)
		v := make([]byte, 4)
		binary.LittleEndian.PutUint32(v, uint32(size))
		if err := b.Put(k, v); err != nil {
			return errors.Wrapf(err, "error Put w/ key=%#x, val=%v", k, v)
		}
		return nil
	})
}
// getBucketStats reads back all bucket size stats recorded by
// writeBucketStat, returning a map from bucket name to size in bytes.
// Fixes: the original dereferenced the bucket without a nil check and would
// panic if dbinit had not been run; it now returns a descriptive error,
// matching writeBucketStat.
func getBucketStats(db *bolt.DB) (map[string]int, error) {
	s := make(map[string]int)
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucketStatsBucket)
		if b == nil {
			// dbinit must be called before any reads.
			return fmt.Errorf("bucket %s does not exist", string(bucketStatsBucket))
		}
		c := b.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			s[string(k)] = int(binary.LittleEndian.Uint32(v))
		}
		return nil
	})
	return s, err
}
// writeSummary JSON-encodes a SizeSummary and stores it in the state-sizes
// bucket, keyed by the big-endian encoding of the summary's slot (so cursor
// iteration yields summaries in slot order).
func writeSummary(db *bolt.DB, sum SizeSummary) error {
	encoded, err := json.Marshal(sum)
	if err != nil {
		return errors.Wrap(err, "unable to marshal SizeSummary json struct")
	}
	updateErr := db.Update(func(tx *bolt.Tx) error {
		bkt := tx.Bucket(stateSizeBucket)
		key := bytesutil.SlotToBytesBigEndian(sum.SlotRoot.Slot)
		fmt.Printf("writing key=%#x for slot=%d\n", key, sum.SlotRoot.Slot)
		if putErr := bkt.Put(key, encoded); putErr != nil {
			return errors.Wrapf(putErr, "error Put w/ key=%#x, val=%v", key, sum)
		}
		return nil
	})
	if updateErr != nil {
		return errors.Wrapf(updateErr, "error writing summary to db - %v", sum)
	}
	return nil
}
// summaryIter streams every stored SizeSummary over the returned channel, in
// ascending slot order (keys are big-endian slots). The deferred close runs
// when the View callback returns on any path, so the channel always closes
// once iteration ends; callers must still drain it fully, as there is no
// cancellation and an abandoned receiver leaks the goroutine.
// NOTE(review): a read or decode error panics instead of being surfaced —
// tolerable in this one-shot analysis tool, not in library code.
func summaryIter(db *bolt.DB) chan SizeSummary {
	ch := make(chan SizeSummary)
	go func() {
		err := db.View(func(tx *bolt.Tx) error {
			defer close(ch)
			b := tx.Bucket(stateSizeBucket)
			c := b.Cursor()
			for k, v := c.First(); k != nil; k, v = c.Next() {
				sum := SizeSummary{}
				err := json.Unmarshal(v, &sum)
				if err != nil {
					return err
				}
				ch <- sum
			}
			return nil
		})
		if err != nil {
			panic(err)
		}
	}()
	return ch
}
// summaryDump prints every stored SizeSummary to stdout in ascending slot
// order (keys are big-endian slots). Fixes the redundant trailing
// `if err != nil { return err }; return nil` by returning the View result
// directly, and uses the early-return idiom for the decode error.
func summaryDump(db *bolt.DB) error {
	return db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(stateSizeBucket)
		c := b.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			fmt.Printf("summary for slot=%d\n", bytesutil.BytesToSlotBigEndian(k))
			sum := SizeSummary{}
			if err := json.Unmarshal(v, &sum); err != nil {
				return err
			}
			fmt.Printf("%v\n", sum)
		}
		return nil
	})
}