Configurable DB Mmap Size for Beacon Node and Validator Client (#8448)

* add flag to beacon and validator

* gaz

* fuzz

* add dep viz

* add to tools
This commit is contained in:
Raul Jordan
2021-02-15 14:29:47 -06:00
committed by GitHub
parent 7c3827a9a4
commit 36b6a71af4
35 changed files with 101 additions and 40 deletions

View File

@@ -9,6 +9,6 @@ import (
)
// NewDB initializes a new DB.
func NewDB(ctx context.Context, dirPath string) (Database, error) {
return kv.NewKVStore(ctx, dirPath)
func NewDB(ctx context.Context, dirPath string, config *kv.Config) (Database, error) {
return kv.NewKVStore(ctx, dirPath, config)
}

View File

@@ -9,7 +9,7 @@ import (
// NewDB initializes a new DB with kafka wrapper.
func NewDB(dirPath string, stateSummaryCache *kv.stateSummaryCache) (Database, error) {
db, err := kv.NewKVStore(dirPath, stateSummaryCache)
db, err := kv.NewKVStore(dirPath, stateSummaryCache, &kv.Config{})
if err != nil {
return nil, err
}

View File

@@ -26,7 +26,11 @@ go_library(
"utils.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kv",
visibility = ["//beacon-chain:__subpackages__"],
visibility = [
"//beacon-chain:__subpackages__",
"//fuzz:__pkg__",
"//tools:__subpackages__",
],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db/filters:go_default_library",

View File

@@ -12,7 +12,7 @@ import (
)
func TestStore_Backup(t *testing.T) {
db, err := NewKVStore(context.Background(), t.TempDir())
db, err := NewKVStore(context.Background(), t.TempDir(), &Config{})
require.NoError(t, err, "Failed to instantiate DB")
ctx := context.Background()
@@ -41,7 +41,7 @@ func TestStore_Backup(t *testing.T) {
// our NewKVStore function expects when opening a database.
require.NoError(t, os.Rename(oldFilePath, newFilePath))
backedDB, err := NewKVStore(ctx, backupsPath)
backedDB, err := NewKVStore(ctx, backupsPath, &Config{})
require.NoError(t, err, "Failed to instantiate DB")
t.Cleanup(func() {
require.NoError(t, backedDB.Close(), "Failed to close database")

View File

@@ -48,6 +48,11 @@ var blockedBuckets = [][]byte{
finalizedBlockRootsIndexBucket,
}
// Config for the bolt db kv store.
type Config struct {
InitialMMapSize int
}
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for eth2.
type Store struct {
@@ -62,7 +67,7 @@ type Store struct {
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(ctx context.Context, dirPath string) (*Store, error) {
func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, error) {
hasDir, err := fileutil.HasDir(dirPath)
if err != nil {
return nil, err
@@ -73,7 +78,14 @@ func NewKVStore(ctx context.Context, dirPath string) (*Store, error) {
}
}
datafile := path.Join(dirPath, DatabaseFileName)
boltDB, err := bolt.Open(datafile, params.BeaconIoConfig().ReadWritePermissions, &bolt.Options{Timeout: 1 * time.Second, InitialMmapSize: 10e6})
boltDB, err := bolt.Open(
datafile,
params.BeaconIoConfig().ReadWritePermissions,
&bolt.Options{
Timeout: 1 * time.Second,
InitialMmapSize: config.InitialMMapSize,
},
)
if err != nil {
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")

View File

@@ -9,7 +9,7 @@ import (
// setupDB instantiates and returns a Store instance.
func setupDB(t testing.TB) *Store {
db, err := NewKVStore(context.Background(), t.TempDir())
db, err := NewKVStore(context.Background(), t.TempDir(), &Config{})
require.NoError(t, err, "Failed to instantiate DB")
t.Cleanup(func() {
require.NoError(t, db.Close(), "Failed to close database")

View File

@@ -21,7 +21,7 @@ func TestRestore(t *testing.T) {
logHook := logTest.NewGlobal()
ctx := context.Background()
backupDb, err := kv.NewKVStore(context.Background(), t.TempDir())
backupDb, err := kv.NewKVStore(context.Background(), t.TempDir(), &kv.Config{})
defer func() {
require.NoError(t, backupDb.Close())
}()
@@ -58,7 +58,7 @@ func TestRestore(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, len(files))
assert.Equal(t, kv.DatabaseFileName, files[0].Name())
restoredDb, err := kv.NewKVStore(context.Background(), path.Join(restoreDir, kv.BeaconNodeDbDirName))
restoredDb, err := kv.NewKVStore(context.Background(), path.Join(restoreDir, kv.BeaconNodeDbDirName), &kv.Config{})
defer func() {
require.NoError(t, restoredDb.Close())
}()

View File

@@ -12,7 +12,7 @@ import (
// SetupDB instantiates and returns database backed by key value store.
func SetupDB(t testing.TB) db.Database {
s, err := kv.NewKVStore(context.Background(), t.TempDir())
s, err := kv.NewKVStore(context.Background(), t.TempDir(), &kv.Config{})
if err != nil {
t.Fatal(err)
}

View File

@@ -108,6 +108,7 @@ var appFlags = []cli.Flag{
cmd.AcceptTosFlag,
cmd.RestoreSourceFileFlag,
cmd.RestoreTargetDirFlag,
cmd.BoltMMapInitialSizeFlag,
}
func init() {

View File

@@ -298,7 +298,9 @@ func (b *BeaconNode) startDB(cliCtx *cli.Context) error {
log.WithField("database-path", dbPath).Info("Checking DB")
d, err := db.NewDB(b.ctx, dbPath)
d, err := db.NewDB(b.ctx, dbPath, &kv.Config{
InitialMMapSize: cliCtx.Int(cmd.BoltMMapInitialSizeFlag.Name),
})
if err != nil {
return err
}
@@ -320,7 +322,9 @@ func (b *BeaconNode) startDB(cliCtx *cli.Context) error {
if err := d.ClearDB(); err != nil {
return errors.Wrap(err, "could not clear database")
}
d, err = db.NewDB(b.ctx, dbPath)
d, err = db.NewDB(b.ctx, dbPath, &kv.Config{
InitialMMapSize: cliCtx.Int(cmd.BoltMMapInitialSizeFlag.Name),
})
if err != nil {
return errors.Wrap(err, "could not create new database")
}

View File

@@ -528,7 +528,7 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) {
}
func TestStatusRPCRequest_FinalizedBlockSkippedSlots(t *testing.T) {
db, err := kv.NewKVStore(context.Background(), t.TempDir())
db, err := kv.NewKVStore(context.Background(), t.TempDir(), &kv.Config{})
require.NoError(t, err)
bState, err := state.GenesisBeaconState(nil, 0, &ethpb.Eth1Data{DepositRoot: make([]byte, 32), BlockHash: make([]byte, 32)})
require.NoError(t, err)

View File

@@ -72,6 +72,7 @@ var appHelpFlagGroups = []flagGroup{
cmd.AcceptTosFlag,
cmd.RestoreSourceFileFlag,
cmd.RestoreTargetDirFlag,
cmd.BoltMMapInitialSizeFlag,
},
},
{

View File

@@ -70,6 +70,7 @@ go_fuzz_test(
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/operations/slashings:go_default_library",
@@ -169,6 +170,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/forkchoice/protoarray:go_default_library",
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/operations/slashings:go_default_library",

View File

@@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
beaconkv "github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
@@ -51,7 +52,7 @@ func init() {
var err error
db1, err = db.NewDB(context.Background(), dbPath)
db1, err = db.NewDB(context.Background(), dbPath, &beaconkv.Config{})
if err != nil {
panic(err)
}

View File

@@ -231,6 +231,12 @@ var (
Usage: "Target directory of the restored database",
Value: DefaultDataDir(),
}
// BoltMMapInitialSizeFlag specifies the initial size in bytes of boltdb's mmap syscall.
BoltMMapInitialSizeFlag = &cli.IntFlag{
Name: "bolt-mmap-initial-size",
Usage: "Specifies the size in bytes of bolt db's mmap syscall allocation",
Value: 536870912, // 512 Mb as a default value.
}
)
// LoadFlagsFromConfig sets flags values from config file if ConfigFileFlag is set.

View File

@@ -9,6 +9,7 @@ go_library(
deps = [
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//shared/bytesutil:go_default_library",
"@com_github_emicklei_dot//:go_default_library",
],

View File

@@ -18,6 +18,7 @@ import (
"github.com/emicklei/dot"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)
@@ -37,7 +38,7 @@ type node struct {
func main() {
flag.Parse()
db, err := db.NewDB(context.Background(), *datadir)
db, err := db.NewDB(context.Background(), *datadir, &kv.Config{})
if err != nil {
panic(err)
}

View File

@@ -9,6 +9,7 @@ go_library(
deps = [
"//beacon-chain/core/state/interop:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//shared/featureconfig:go_default_library",
],
)

View File

@@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
)
@@ -22,7 +23,7 @@ func main() {
defer resetCfg()
flag.Parse()
fmt.Println("Starting process...")
d, err := db.NewDB(context.Background(), *datadir)
d, err := db.NewDB(context.Background(), *datadir, &kv.Config{})
if err != nil {
panic(err)
}

View File

@@ -8,6 +8,7 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//shared/fileutil:go_default_library",
],
)

View File

@@ -6,6 +6,7 @@ import (
"os"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/shared/fileutil"
)
@@ -20,7 +21,7 @@ func main() {
fmt.Printf("Reading db at %s and writing ssz output to %s.\n", os.Args[1], os.Args[2])
d, err := db.NewDB(context.Background(), os.Args[1])
d, err := db.NewDB(context.Background(), os.Args[1], &kv.Config{})
if err != nil {
panic(err)
}

View File

@@ -213,7 +213,7 @@ func TestStore_CheckSlashableAttestation_SurroundVote_54kEpochs(t *testing.T) {
func TestLowestSignedSourceEpoch_SaveRetrieve(t *testing.T) {
ctx := context.Background()
validatorDB, err := NewKVStore(ctx, t.TempDir(), nil)
validatorDB, err := NewKVStore(ctx, t.TempDir(), &Config{})
require.NoError(t, err, "Failed to instantiate DB")
t.Cleanup(func() {
require.NoError(t, validatorDB.Close(), "Failed to close database")
@@ -272,7 +272,7 @@ func TestLowestSignedSourceEpoch_SaveRetrieve(t *testing.T) {
func TestLowestSignedTargetEpoch_SaveRetrieveReplace(t *testing.T) {
ctx := context.Background()
validatorDB, err := NewKVStore(ctx, t.TempDir(), nil)
validatorDB, err := NewKVStore(ctx, t.TempDir(), &Config{})
require.NoError(t, err, "Failed to instantiate DB")
t.Cleanup(func() {
require.NoError(t, validatorDB.Close(), "Failed to close database")
@@ -500,7 +500,9 @@ func benchCheckSurroundVote(
shouldSurround bool,
) {
ctx := context.Background()
validatorDB, err := NewKVStore(ctx, filepath.Join(os.TempDir(), "benchsurroundvote"), pubKeys)
validatorDB, err := NewKVStore(ctx, filepath.Join(os.TempDir(), "benchsurroundvote"), &Config{
PubKeys: pubKeys,
})
require.NoError(b, err, "Failed to instantiate DB")
defer func() {
require.NoError(b, validatorDB.Close(), "Failed to close database")

View File

@@ -29,7 +29,7 @@ func TestStore_Backup(t *testing.T) {
// our NewKVStore function expects when opening a database.
require.NoError(t, os.Rename(oldFilePath, newFilePath))
backedDB, err := NewKVStore(ctx, backupsPath, nil)
backedDB, err := NewKVStore(ctx, backupsPath, &Config{})
require.NoError(t, err, "Failed to instantiate DB")
t.Cleanup(func() {
require.NoError(t, backedDB.Close(), "Failed to close database")

View File

@@ -48,6 +48,11 @@ var blockedBuckets = [][]byte{
attestationSourceEpochsBucket,
}
type Config struct {
PubKeys [][48]byte
InitialMMapSize int
}
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for eth2.
type Store struct {
@@ -98,7 +103,7 @@ func createBuckets(tx *bolt.Tx, buckets ...[]byte) error {
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store, error) {
func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, error) {
hasDir, err := fileutil.HasDir(dirPath)
if err != nil {
return nil, err
@@ -109,7 +114,10 @@ func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store
}
}
datafile := filepath.Join(dirPath, ProtectionDbFileName)
boltDB, err := bolt.Open(datafile, params.BeaconIoConfig().ReadWritePermissions, &bolt.Options{Timeout: params.BeaconIoConfig().BoltTimeout})
boltDB, err := bolt.Open(datafile, params.BeaconIoConfig().ReadWritePermissions, &bolt.Options{
Timeout: params.BeaconIoConfig().BoltTimeout,
InitialMmapSize: config.InitialMMapSize,
})
if err != nil {
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
@@ -144,8 +152,8 @@ func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store
}
// Initialize the required public keys into the DB to ensure they're not empty.
if pubKeys != nil {
if err := kv.UpdatePublicKeysBuckets(pubKeys); err != nil {
if config != nil {
if err := kv.UpdatePublicKeysBuckets(config.PubKeys); err != nil {
return nil, err
}
}

View File

@@ -18,7 +18,9 @@ func TestMain(m *testing.M) {
// setupDB instantiates and returns a DB instance for the validator client.
func setupDB(t testing.TB, pubkeys [][48]byte) *Store {
db, err := NewKVStore(context.Background(), t.TempDir(), pubkeys)
db, err := NewKVStore(context.Background(), t.TempDir(), &Config{
PubKeys: pubkeys,
})
require.NoError(t, err, "Failed to instantiate DB")
err = db.UpdatePublicKeysBuckets(pubkeys)
require.NoError(t, err, "Failed to create old buckets for public keys")

View File

@@ -178,7 +178,7 @@ func TestPruneProposalHistoryBySlot_OK(t *testing.T) {
func TestStore_ProposedPublicKeys(t *testing.T) {
ctx := context.Background()
validatorDB, err := NewKVStore(ctx, t.TempDir(), nil)
validatorDB, err := NewKVStore(ctx, t.TempDir(), &Config{})
require.NoError(t, err, "Failed to instantiate DB")
t.Cleanup(func() {
require.NoError(t, validatorDB.Close(), "Failed to close database")

View File

@@ -19,7 +19,7 @@ func migrateUp(cliCtx *cli.Context) error {
}
ctx := context.Background()
validatorDB, err := kv.NewKVStore(ctx, dataDir, nil)
validatorDB, err := kv.NewKVStore(ctx, dataDir, &kv.Config{})
if err != nil {
return err
}
@@ -34,7 +34,7 @@ func migrateDown(cliCtx *cli.Context) error {
}
ctx := context.Background()
validatorDB, err := kv.NewKVStore(ctx, dataDir, nil)
validatorDB, err := kv.NewKVStore(ctx, dataDir, &kv.Config{})
if err != nil {
return err
}

View File

@@ -21,7 +21,7 @@ func TestRestore(t *testing.T) {
logHook := logTest.NewGlobal()
ctx := context.Background()
backupDb, err := kv.NewKVStore(ctx, t.TempDir(), nil)
backupDb, err := kv.NewKVStore(ctx, t.TempDir(), &kv.Config{})
defer func() {
require.NoError(t, backupDb.Close())
}()
@@ -51,7 +51,7 @@ func TestRestore(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1, len(files))
assert.Equal(t, kv.ProtectionDbFileName, files[0].Name())
restoredDb, err := kv.NewKVStore(ctx, restoreDir, nil)
restoredDb, err := kv.NewKVStore(ctx, restoreDir, &kv.Config{})
defer func() {
require.NoError(t, restoredDb.Close())
}()

View File

@@ -10,7 +10,9 @@ import (
// SetupDB instantiates and returns a DB instance for the validator client.
func SetupDB(t testing.TB, pubkeys [][48]byte) iface.ValidatorDB {
db, err := kv.NewKVStore(context.Background(), t.TempDir(), pubkeys)
db, err := kv.NewKVStore(context.Background(), t.TempDir(), &kv.Config{
PubKeys: pubkeys,
})
if err != nil {
t.Fatalf("Failed to instantiate DB: %v", err)
}

View File

@@ -12,7 +12,9 @@ import (
func TestClearDB(t *testing.T) {
// Setting up manually is required, since SetupDB() will also register a teardown procedure.
testDB, err := kv.NewKVStore(context.Background(), t.TempDir(), [][48]byte{})
testDB, err := kv.NewKVStore(context.Background(), t.TempDir(), &kv.Config{
PubKeys: nil,
})
require.NoError(t, err, "Failed to instantiate DB")
require.NoError(t, testDB.ClearDB())

View File

@@ -89,6 +89,7 @@ var appFlags = []cli.Flag{
cmd.ConfigFileFlag,
cmd.ChainConfigFileFlag,
cmd.GrpcMaxCallRecvMsgSizeFlag,
cmd.BoltMMapInitialSizeFlag,
debug.PProfFlag,
debug.PProfAddrFlag,
debug.PProfPortFlag,

View File

@@ -224,7 +224,10 @@ func (c *ValidatorClient) initializeFromCLI(cliCtx *cli.Context) error {
}
log.WithField("databasePath", dataDir).Info("Checking DB")
valDB, err := kv.NewKVStore(cliCtx.Context, dataDir, nil)
valDB, err := kv.NewKVStore(cliCtx.Context, dataDir, &kv.Config{
PubKeys: nil,
InitialMMapSize: cliCtx.Int(cmd.BoltMMapInitialSizeFlag.Name),
})
if err != nil {
return errors.Wrap(err, "could not initialize db")
}
@@ -310,7 +313,10 @@ func (c *ValidatorClient) initializeForWeb(cliCtx *cli.Context) error {
}
}
log.WithField("databasePath", dataDir).Info("Checking DB")
valDB, err := kv.NewKVStore(cliCtx.Context, dataDir, make([][48]byte, 0))
valDB, err := kv.NewKVStore(cliCtx.Context, dataDir, &kv.Config{
PubKeys: nil,
InitialMMapSize: cliCtx.Int(cmd.BoltMMapInitialSizeFlag.Name),
})
if err != nil {
return errors.Wrap(err, "could not initialize db")
}
@@ -551,7 +557,7 @@ func clearDB(ctx context.Context, dataDir string, force bool) error {
}
if clearDBConfirmed {
valDB, err := kv.NewKVStore(ctx, dataDir, nil)
valDB, err := kv.NewKVStore(ctx, dataDir, &kv.Config{})
if err != nil {
return errors.Wrapf(err, "Could not create DB in dir %s", dataDir)
}

View File

@@ -37,7 +37,7 @@ func ExportSlashingProtectionJSONCli(cliCtx *cli.Context) error {
return err
}
}
validatorDB, err := kv.NewKVStore(cliCtx.Context, dataDir, nil)
validatorDB, err := kv.NewKVStore(cliCtx.Context, dataDir, &kv.Config{})
if err != nil {
return errors.Wrapf(err, "could not access validator database at path %s", dataDir)
}

View File

@@ -32,7 +32,7 @@ func ImportSlashingProtectionCLI(cliCtx *cli.Context) error {
return err
}
}
valDB, err := kv.NewKVStore(cliCtx.Context, dataDir, make([][48]byte, 0))
valDB, err := kv.NewKVStore(cliCtx.Context, dataDir, &kv.Config{})
if err != nil {
return errors.Wrapf(err, "could not access validator database at path: %s", dataDir)
}

View File

@@ -65,6 +65,7 @@ var appHelpFlagGroups = []flagGroup{
cmd.ChainConfigFileFlag,
cmd.GrpcMaxCallRecvMsgSizeFlag,
cmd.AcceptTosFlag,
cmd.BoltMMapInitialSizeFlag,
},
},
{