Compare commits

...

1 Commits

Author SHA1 Message Date
Kasey Kirkham
03aba86b34 changes to support external db tools
- allow kv.Store to be created from a bolt database (so the same
  database handle can be shared by code not going through the Store
  interface)
- support for building an IsCanonical type directly from the db, which
  enables a script to get a stategen.Replayer instance up and running.
2022-07-19 19:56:15 -05:00
2 changed files with 116 additions and 36 deletions

View File

@@ -0,0 +1,57 @@
package db
import (
"context"
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/time/slots"
)
type CanonicalChecker interface {
IsCanonical(ctx context.Context, blockRoot [32]byte) (bool, error)
}
type FinalizedChecker interface {
IsFinalizedBlock(ctx context.Context, blockRoot [32]byte) bool
}
type canonicalChecker struct {
fc FinalizedChecker
}
func (cc *canonicalChecker) IsCanonical(ctx context.Context, root [32]byte) (bool, error) {
return cc.fc.IsFinalizedBlock(ctx, root), nil
}
func NewCanonicalChecker(fc FinalizedChecker) CanonicalChecker {
return &canonicalChecker{fc: fc}
}
type CurrentSlotter interface {
CurrentSlot() types.Slot
}
type FinalizedCheckpointer interface {
FinalizedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error)
}
type finalizedCurrentSlotter struct {
fc FinalizedCheckpointer
ctx context.Context
}
func (fc *finalizedCurrentSlotter) CurrentSlot() types.Slot {
cp, err := fc.fc.FinalizedCheckpoint(fc.ctx)
if err != nil {
return 0
}
s, err := slots.EpochStart(cp.Epoch)
if err != nil {
return 0
}
return s
}
func FinalizedCurrentSlotter(fc FinalizedCheckpointer, ctx context.Context) CurrentSlotter {
return &finalizedCurrentSlotter{fc: fc, ctx: ctx}
}

View File

@@ -93,36 +93,12 @@ func KVStoreDatafilePath(dirPath string) string {
return path.Join(dirPath, DatabaseFileName)
}
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, error) {
hasDir, err := file.HasDir(dirPath)
if err != nil {
return nil, err
}
if !hasDir {
if err := file.MkdirAll(dirPath); err != nil {
return nil, err
}
}
datafile := KVStoreDatafilePath(dirPath)
log.Infof("Opening Bolt DB at %s", datafile)
boltDB, err := bolt.Open(
datafile,
params.BeaconIoConfig().ReadWritePermissions,
&bolt.Options{
Timeout: 1 * time.Second,
InitialMmapSize: config.InitialMMapSize,
},
)
if err != nil {
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}
boltDB.AllocSize = boltAllocSize
type StoreOption func(*Store) error
func NewKVStoreWithDB(ctx context.Context, bdb *bolt.DB, opts ...StoreOption) (*Store, error) {
bdb.AllocSize = boltAllocSize
start := time.Now()
log.Infof("Creating block cache...")
blockCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1000, // number of keys to track frequency of (1000).
MaxCost: BlockCacheSize, // maximum cost of cache (1000 Blocks).
@@ -142,8 +118,8 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
}
kv := &Store{
db: boltDB,
databasePath: dirPath,
db: bdb,
databasePath: path.Dir(bdb.Path()),
blockCache: blockCache,
validatorEntryCache: validatorCache,
stateSummaryCache: newStateSummaryCache(),
@@ -174,6 +150,7 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
blockParentRootIndicesBucket,
finalizedBlockRootsIndexBucket,
blockRootValidatorHashesBucket,
finalizedBlockRootBySlot,
// State management service bucket.
newStateServiceCompatibleBucket,
// Migrations
@@ -185,13 +162,59 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
}); err != nil {
return nil, err
}
if err = prometheus.Register(createBoltCollector(kv.db)); err != nil {
return nil, err
}
if err = kv.checkNeedsResync(); err != nil {
return nil, err
}
return kv, nil
log.WithField("elapsed", time.Since(start)).Info("Updated db and created buckets")
for _, o := range opts {
err := o(kv)
if err != nil {
return nil, errors.Wrap(err, "error while applying StoreOptions")
}
}
return kv, err
}
func WithPrometheusCollection() StoreOption {
return func(kv *Store) error {
return prometheus.Register(createBoltCollector(kv.db))
}
}
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, error) {
hasDir, err := file.HasDir(dirPath)
if err != nil {
return nil, err
}
if !hasDir {
if err := file.MkdirAll(dirPath); err != nil {
return nil, err
}
}
datafile := KVStoreDatafilePath(dirPath)
start := time.Now()
log.Infof("Opening Bolt DB at %s", datafile)
bdb, err := bolt.Open(
datafile,
params.BeaconIoConfig().ReadWritePermissions,
&bolt.Options{
Timeout: 1 * time.Second,
InitialMmapSize: config.InitialMMapSize,
},
)
if err != nil {
log.WithField("elapsed", time.Since(start)).Error("Failed to open Bolt DB")
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}
log.WithField("elapsed", time.Since(start)).Info("Opened Bolt DB")
return NewKVStoreWithDB(ctx, bdb, WithPrometheusCollection())
}
// ClearDB removes the previously stored database in the data directory.