Split validator databases (#6048)

* initial working implementation with a basic test
* merge validator enhancements
* added test dependency to db's build file
* Merge branch 'master' into merge-validators-enhancements
* changed formatting of public key
* Merge branch 'master' into merge-validators-enhancements
* Merge branch 'master' into merge-validators-enhancements
* removed unused import
* Merge branch 'merge-validators-enhancements' into split-validator-databases

# Conflicts:
#	validator/db/BUILD.bazel
#	validator/db/manage.go
#	validator/db/manage_test.go
* tests and small fixes
* extracted common functionality
* Merge branch 'master' into split-validator-databases

# Conflicts:
#	validator/accounts/account_test.go
#	validator/db/manage.go
#	validator/db/manage_test.go
* added missing test dependency to build file
* added missing flags to main.go
* applied code review suggestions
* renamed flags to avoid duplication
* Merge branch 'master' into split-validator-databases
* Merge branch 'master' into split-validator-databases
* removed redundant parenthesis
* Merge branch 'master' into split-validator-databases
* Merge branch 'master' into split-validator-databases
* Merge branch 'master' into split-validator-databases
* Merge branch 'master' into split-validator-databases
* Merge branch 'master' into split-validator-databases
* extracted defer errors to package-level variables
* Merge branch 'master' into split-validator-databases
* comply with error naming convention
* removed incorrect import
* Merge branch 'master' into split-validator-databases
This commit is contained in:
rkapka
2020-06-03 00:04:16 +02:00
committed by GitHub
parent 469499873b
commit ac138eae7b
8 changed files with 602 additions and 218 deletions

View File

@@ -26,6 +26,9 @@ import (
var log = logrus.WithField("prefix", "accounts")
var errFailedToCloseDb = errors.New("failed to close the database")
var errFailedToCloseManyDb = errors.New("failed to close one or more databases")
// DecryptKeysFromKeystore extracts a set of validator private keys from
// an encrypted keystore directory and a password string.
func DecryptKeysFromKeystore(directory string, filePrefix string, password string) (map[string]*keystore.Key, error) {
@@ -225,9 +228,17 @@ func HandleEmptyKeystoreFlags(cliCtx *cli.Context, confirmPassword bool) (string
func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) (err error) {
var sourceStores []*db.Store
defer func() {
failedToClose := false
for _, store := range sourceStores {
if deferErr := store.Close(); deferErr != nil {
err = errors.Wrapf(deferErr, "Failed to close the database in %s", store.DatabasePath())
failedToClose = true
}
}
if failedToClose {
if err != nil {
err = errors.Wrapf(err, errFailedToCloseManyDb.Error())
} else {
err = errFailedToCloseManyDb
}
}
}()
@@ -235,7 +246,7 @@ func Merge(ctx context.Context, sourceDirectories []string, targetDirectory stri
for _, dir := range sourceDirectories {
store, err := db.GetKVStore(dir)
if err != nil {
return errors.Wrapf(err, "Failed to prepare the database in %s for merging", dir)
return errors.Wrapf(err, "failed to prepare the database in %s for merging", dir)
}
if store == nil {
continue
@@ -247,11 +258,33 @@ func Merge(ctx context.Context, sourceDirectories []string, targetDirectory stri
return errors.New("no validator databases found in source directories")
}
if err := db.Merge(ctx, sourceStores, targetDirectory); err != nil {
return errors.Wrapf(err, "Failed to merge validator databases into %s", targetDirectory)
}
return db.Merge(ctx, sourceStores, targetDirectory)
}
return nil
// Split splits data from one validator database in sourceDirectory into several validator databases.
// Each validator database is created in its own subdirectory inside targetDirectory.
func Split(ctx context.Context, sourceDirectory string, targetDirectory string) (err error) {
var sourceStore *db.Store
sourceStore, err = db.GetKVStore(sourceDirectory)
if err != nil {
return errors.Wrap(err, "failed to prepare the source database for splitting")
}
if sourceStore == nil {
return errors.New("no database found in source directory")
}
defer func() {
if sourceStore != nil {
if deferErr := sourceStore.Close(); deferErr != nil {
if err != nil {
err = errors.Wrap(err, errFailedToCloseDb.Error())
} else {
err = errors.Wrap(deferErr, errFailedToCloseDb.Error())
}
}
}
}()
return db.Split(ctx, sourceStore, targetDirectory)
}
// ChangePassword changes the password for all keys located in a keystore.
@@ -276,14 +309,14 @@ func ChangePassword(keystorePath string, oldPassword string, newPassword string)
func changePasswordForKeyType(keystorePath string, filePrefix string, oldPassword string, newPassword string) error {
keys, err := DecryptKeysFromKeystore(keystorePath, filePrefix, oldPassword)
if err != nil {
return errors.Wrap(err, "Failed to decrypt keys")
return errors.Wrap(err, "failed to decrypt keys")
}
keyStore := keystore.NewKeystore(keystorePath)
for _, key := range keys {
keyFileName := keystorePath + filePrefix + hex.EncodeToString(key.PublicKey.Marshal())[:12]
if err := keyStore.StoreKey(keyFileName, key, newPassword); err != nil {
return errors.Wrapf(err, "Failed to encrypt key %s with the new password", keyFileName)
return errors.Wrapf(err, "failed to encrypt key %s with the new password", keyFileName)
}
}
@@ -305,7 +338,7 @@ func homeDir() string {
func ExtractPublicKeysFromKeyStore(keystorePath string, passphrase string) ([][]byte, error) {
decryptedKeys, err := DecryptKeysFromKeystore(keystorePath, params.BeaconConfig().ValidatorPrivkeyFileName, passphrase)
if err != nil {
return nil, errors.Wrapf(err, "Could not decrypt keys from keystore in path %s", keystorePath)
return nil, errors.Wrapf(err, "could not decrypt keys from keystore in path %s", keystorePath)
}
i := 0

View File

@@ -243,6 +243,55 @@ func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) {
}
}
func TestSplit(t *testing.T) {
pubKeys := [][48]byte{{1}, {2}}
sourceStore := db.SetupDB(t, pubKeys)
proposalEpoch := uint64(0)
proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01}
if err := sourceStore.SaveProposalHistoryForEpoch(context.Background(), pubKeys[0][:], proposalEpoch, proposalHistory1); err != nil {
t.Fatal("Saving proposal history failed")
}
proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01}
if err := sourceStore.SaveProposalHistoryForEpoch(context.Background(), pubKeys[1][:], proposalEpoch, proposalHistory2); err != nil {
t.Fatal("Saving proposal history failed")
}
attestationHistoryMap1 := make(map[uint64]uint64)
attestationHistoryMap1[0] = 0
pubKeyAttestationHistory1 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap1,
LatestEpochWritten: 0,
}
attestationHistoryMap2 := make(map[uint64]uint64)
attestationHistoryMap2[0] = 1
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap2,
LatestEpochWritten: 0,
}
dbAttestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory[pubKeys[0]] = pubKeyAttestationHistory1
dbAttestationHistory[pubKeys[1]] = pubKeyAttestationHistory2
if err := sourceStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory); err != nil {
t.Fatalf("Saving attestation history failed %v", err)
}
if err := sourceStore.Close(); err != nil {
t.Fatalf("Closing source store failed: %v", err)
}
targetDirectory := testutil.TempDir() + "/target"
t.Cleanup(func() {
if err := os.RemoveAll(targetDirectory); err != nil {
t.Errorf("Could not remove target directory : %v", err)
}
})
if err := Split(context.Background(), sourceStore.DatabasePath(), targetDirectory); err != nil {
t.Fatalf("Splitting failed: %v", err)
}
}
func prepareSourcesForMerging(firstStorePubKey [48]byte, firstStore *db.Store, secondStorePubKey [48]byte, secondStore *db.Store) (*sourceStoresHistory, error) {
proposalEpoch := uint64(0)
proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01}

View File

@@ -42,5 +42,6 @@ go_test(
"//shared/testutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",
],
)

View File

@@ -1,13 +1,19 @@
package db
import (
"bytes"
"context"
"encoding/hex"
"path/filepath"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
var errFailedToCloseSource = errors.New("failed to close the source")
var errFailedToCloseManySources = errors.New("failed to close one or more stores")
type epochProposals struct {
Epoch []byte
Proposals []byte
@@ -28,48 +34,24 @@ func Merge(ctx context.Context, sourceStores []*Store, targetDirectory string) e
ctx, span := trace.StartSpan(ctx, "Validator.Db.Manage")
defer span.End()
var allProposals []pubKeyProposals
var allAttestations []pubKeyAttestations
for _, store := range sourceStores {
if err := store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(historicProposalsBucket)
if err := proposalsBucket.ForEach(func(pubKey, _ []byte) error {
pubKeyProposals, err := getPubKeyProposals(pubKey, proposalsBucket)
if err != nil {
return errors.Wrapf(err, "Could not retrieve proposals for database in %s", store.databasePath)
}
allProposals = append(allProposals, *pubKeyProposals)
return nil
}); err != nil {
return errors.Wrapf(err, "Could not retrieve proposals for database in %s", store.databasePath)
}
attestationsBucket := tx.Bucket(historicAttestationsBucket)
if err := attestationsBucket.ForEach(func(pubKey, v []byte) error {
attestations := pubKeyAttestations{
PubKey: make([]byte, len(pubKey)),
Attestations: make([]byte, len(v)),
}
copy(attestations.PubKey, pubKey)
copy(attestations.Attestations, v)
allAttestations = append(allAttestations, attestations)
return nil
}); err != nil {
return errors.Wrapf(err, "Could not retrieve attestations for database in %s", store.databasePath)
}
return nil
}); err != nil {
return err
}
allProposals, allAttestations, err := getAllProposalsAndAllAttestations(sourceStores)
if err != nil {
return err
}
return createMergeTargetStore(targetDirectory, allProposals, allAttestations)
}
if err := createTargetStore(targetDirectory, allProposals, allAttestations); err != nil {
return errors.Wrapf(err, "Could not create target store")
// Split splits data from sourceStore into several stores, one for each public key in sourceStore.
// Each new store is created in its own subdirectory inside targetDirectory.
func Split(ctx context.Context, sourceStore *Store, targetDirectory string) error {
ctx, span := trace.StartSpan(ctx, "Validator.Db.Manage")
defer span.End()
allProposals, allAttestations, err := getAllProposalsAndAllAttestations([]*Store{sourceStore})
if err != nil {
return err
}
return nil
return createSplitTargetStores(targetDirectory, allProposals, allAttestations)
}
func getPubKeyProposals(pubKey []byte, proposalsBucket *bolt.Bucket) (*pubKeyProposals, error) {
@@ -79,6 +61,10 @@ func getPubKeyProposals(pubKey []byte, proposalsBucket *bolt.Bucket) (*pubKeyPro
}
pubKeyBucket := proposalsBucket.Bucket(pubKey)
if pubKeyBucket == nil {
return &pubKeyProposals, nil
}
if err := pubKeyBucket.ForEach(func(epoch, v []byte) error {
epochProposals := epochProposals{
Epoch: make([]byte, len(epoch)),
@@ -89,13 +75,13 @@ func getPubKeyProposals(pubKey []byte, proposalsBucket *bolt.Bucket) (*pubKeyPro
pubKeyProposals.Proposals = append(pubKeyProposals.Proposals, epochProposals)
return nil
}); err != nil {
return nil, err
return nil, errors.Wrapf(err, "could not retrieve proposals for public key %x", pubKey[:12])
}
return &pubKeyProposals, nil
}
func createTargetStore(
func createMergeTargetStore(
targetDirectory string,
allProposals []pubKeyProposals,
allAttestations []pubKeyAttestations) (err error) {
@@ -103,35 +89,33 @@ func createTargetStore(
newStore, err := NewKVStore(targetDirectory, [][48]byte{})
defer func() {
if deferErr := newStore.Close(); deferErr != nil {
err = errors.Wrap(err, "Could not close the merged database")
if err != nil {
err = errors.Wrap(err, errFailedToCloseSource.Error())
} else {
err = errors.Wrap(deferErr, errFailedToCloseSource.Error())
}
}
}()
if err != nil {
return errors.Wrapf(err, "Could not initialize a new database in %s", targetDirectory)
return errors.Wrapf(err, "could not initialize a new database in %s", targetDirectory)
}
err = newStore.update(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(historicProposalsBucket)
allProposalsBucket := tx.Bucket(historicProposalsBucket)
for _, pubKeyProposals := range allProposals {
pubKeyBucket, err := proposalsBucket.CreateBucket(pubKeyProposals.PubKey)
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey)
if err != nil {
return errors.Wrapf(err,
"Could not create proposals bucket for public key %x",
pubKeyProposals.PubKey[:12])
return err
}
for _, epochProposals := range pubKeyProposals.Proposals {
if err := pubKeyBucket.Put(epochProposals.Epoch, epochProposals.Proposals); err != nil {
return errors.Wrapf(err, "Could not add epoch proposals for epoch %v", epochProposals.Epoch)
}
if err := addEpochProposals(proposalsBucket, pubKeyProposals.Proposals); err != nil {
return err
}
}
attestationsBucket := tx.Bucket(historicAttestationsBucket)
for _, attestations := range allAttestations {
if err := attestationsBucket.Put(attestations.PubKey, attestations.Attestations); err != nil {
return errors.Wrapf(
err,
"Could not add public key attestations for public key %x",
attestations.PubKey[:12])
if err := addAttestations(attestationsBucket, attestations); err != nil {
return err
}
}
return nil
@@ -139,3 +123,205 @@ func createTargetStore(
return err
}
func createSplitTargetStores(
targetDirectory string,
allProposals []pubKeyProposals,
allAttestations []pubKeyAttestations) (err error) {
var storesToClose []*Store
defer func() {
failedToClose := false
for _, store := range storesToClose {
if deferErr := store.Close(); deferErr != nil {
failedToClose = true
}
}
if failedToClose {
if err != nil {
err = errors.Wrapf(err, errFailedToCloseManySources.Error())
} else {
err = errFailedToCloseManySources
}
}
}()
for _, pubKeyProposals := range allProposals {
dirName := hex.EncodeToString(pubKeyProposals.PubKey)[:12]
path := filepath.Join(targetDirectory, dirName)
newStore, err := NewKVStore(path, [][48]byte{})
if err != nil {
return errors.Wrapf(err, "could not create a validator database in %s", path)
}
storesToClose = append(storesToClose, newStore)
if err := newStore.update(func(tx *bolt.Tx) error {
allProposalsBucket := tx.Bucket(historicProposalsBucket)
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey)
if err != nil {
return err
}
if err := addEpochProposals(proposalsBucket, pubKeyProposals.Proposals); err != nil {
return err
}
attestationsBucket := tx.Bucket(historicAttestationsBucket)
for _, pubKeyAttestations := range allAttestations {
if string(pubKeyAttestations.PubKey) == string(pubKeyProposals.PubKey) {
if err := addAttestations(attestationsBucket, pubKeyAttestations); err != nil {
return err
}
break
}
}
return nil
}); err != nil {
return err
}
}
// Create stores for attestations belonging to public keys that do not have proposals.
for _, pubKeyAttestations := range allAttestations {
var hasMatchingProposals = false
for _, pubKeyProposals := range allProposals {
if string(pubKeyAttestations.PubKey) == string(pubKeyProposals.PubKey) {
hasMatchingProposals = true
break
}
}
if !hasMatchingProposals {
dirName := hex.EncodeToString(pubKeyAttestations.PubKey)[:12]
path := filepath.Join(targetDirectory, dirName)
newStore, err := NewKVStore(path, [][48]byte{})
if err != nil {
return errors.Wrapf(err, "could not create a validator database in %s", path)
}
storesToClose = append(storesToClose, newStore)
if err := newStore.update(func(tx *bolt.Tx) error {
attestationsBucket := tx.Bucket(historicAttestationsBucket)
if err := addAttestations(attestationsBucket, pubKeyAttestations); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
}
return nil
}
func getAllProposalsAndAllAttestations(stores []*Store) ([]pubKeyProposals, []pubKeyAttestations, error) {
var allProposals []pubKeyProposals
var allAttestations []pubKeyAttestations
for _, store := range stores {
// Storing keys upfront will allow using several short transactions (one for every key)
// instead of one long-running transaction for all keys.
var allKeys [][]byte
if err := store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(historicProposalsBucket)
if err := proposalsBucket.ForEach(func(pubKey, _ []byte) error {
pubKeyCopy := make([]byte, len(pubKey))
copy(pubKeyCopy, pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
return errors.Wrapf(err, "could not retrieve proposals for source in %s", store.databasePath)
}
attestationsBucket := tx.Bucket(historicAttestationsBucket)
if err := attestationsBucket.ForEach(func(pubKey, _ []byte) error {
pubKeyCopy := make([]byte, len(pubKey))
copy(pubKeyCopy, pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
return errors.Wrapf(err, "could not retrieve attestations for source in %s", store.databasePath)
}
return nil
}); err != nil {
return nil, nil, err
}
allKeys = removeDuplicateKeys(allKeys)
for _, pubKey := range allKeys {
if err := store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(historicProposalsBucket)
pubKeyProposals, err := getPubKeyProposals(pubKey, proposalsBucket)
if err != nil {
return err
}
allProposals = append(allProposals, *pubKeyProposals)
attestationsBucket := tx.Bucket(historicAttestationsBucket)
v := attestationsBucket.Get(pubKey)
if v != nil {
attestations := pubKeyAttestations{
PubKey: pubKey,
Attestations: make([]byte, len(v)),
}
copy(attestations.Attestations, v)
allAttestations = append(allAttestations, attestations)
}
return nil
}); err != nil {
return nil, nil, errors.Wrapf(err, "could not retrieve data for public key %x", pubKey[:12])
}
}
}
return allProposals, allAttestations, nil
}
func createProposalsBucket(topLevelBucket *bolt.Bucket, pubKey []byte) (*bolt.Bucket, error) {
var bucket, err = topLevelBucket.CreateBucket(pubKey)
if err != nil {
return nil, errors.Wrapf(err, "could not create proposals bucket for public key %x", pubKey[:12])
}
return bucket, nil
}
func addEpochProposals(bucket *bolt.Bucket, proposals []epochProposals) error {
for _, singleProposal := range proposals {
if err := bucket.Put(singleProposal.Epoch, singleProposal.Proposals); err != nil {
return errors.Wrapf(err, "could not add epoch proposals for epoch %v", singleProposal.Epoch)
}
}
return nil
}
func addAttestations(bucket *bolt.Bucket, attestations pubKeyAttestations) error {
if err := bucket.Put(attestations.PubKey, attestations.Attestations); err != nil {
return errors.Wrapf(
err,
"could not add public key attestations for public key %x",
attestations.PubKey[:12])
}
return nil
}
func removeDuplicateKeys(keys [][]byte) [][]byte {
last := 0
next:
for _, k1 := range keys {
for _, k2 := range keys[:last] {
if bytes.Equal(k1, k2) {
continue next
}
}
keys[last] = k1
last++
}
return keys[:last]
}

View File

@@ -3,25 +3,21 @@ package db
import (
"bytes"
"context"
"encoding/hex"
"os"
"path/filepath"
"testing"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/testutil"
bolt "go.etcd.io/bbolt"
)
type sourceStoresHistory struct {
ProposalEpoch uint64
FirstStoreFirstPubKeyProposals bitfield.Bitlist
FirstStoreSecondPubKeyProposals bitfield.Bitlist
SecondStoreFirstPubKeyProposals bitfield.Bitlist
SecondStoreSecondPubKeyProposals bitfield.Bitlist
FirstStoreFirstPubKeyAttestations map[uint64]uint64
FirstStoreSecondPubKeyAttestations map[uint64]uint64
SecondStoreFirstPubKeyAttestations map[uint64]uint64
SecondStoreSecondPubKeyAttestations map[uint64]uint64
type storeHistory struct {
Proposals map[[48]byte]bitfield.Bitlist
Attestations map[[48]byte]map[uint64]uint64
}
func TestMerge(t *testing.T) {
@@ -30,10 +26,32 @@ func TestMerge(t *testing.T) {
secondStorePubKeys := [][48]byte{{3}, {4}}
secondStore := SetupDB(t, secondStorePubKeys)
history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
storeHistory1, err := prepareStore(firstStore, firstStorePubKeys)
if err != nil {
t.Fatal(err)
}
storeHistory2, err := prepareStore(secondStore, secondStorePubKeys)
if err != nil {
t.Fatal(err)
}
mergedProposals := make(map[[48]byte]bitfield.Bitlist)
for k, v := range storeHistory1.Proposals {
mergedProposals[k] = v
}
for k, v := range storeHistory2.Proposals {
mergedProposals[k] = v
}
mergedAttestations := make(map[[48]byte]map[uint64]uint64)
for k, v := range storeHistory1.Attestations {
mergedAttestations[k] = v
}
for k, v := range storeHistory2.Attestations {
mergedAttestations[k] = v
}
mergedStoreHistory := storeHistory{
Proposals: mergedProposals,
Attestations: mergedAttestations,
}
targetDirectory := testutil.TempDir() + "/target"
t.Cleanup(func() {
@@ -51,164 +69,227 @@ func TestMerge(t *testing.T) {
t.Fatalf("Retrieving the merged store failed: %v", err)
}
assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)
assertStore(
t,
mergedStore,
append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1]),
&mergedStoreHistory)
}
func prepareSourcesForMerging(
firstStorePubKeys [][48]byte,
firstStore *Store,
secondStorePubKeys [][48]byte,
secondStore *Store) (*sourceStoresHistory, error) {
func TestSplit(t *testing.T) {
pubKey1 := [48]byte{1}
pubKey2 := [48]byte{2}
sourceStore := SetupDB(t, [][48]byte{pubKey1, pubKey2})
proposalEpoch := uint64(0)
proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01}
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[0][:], proposalEpoch, proposalHistory1); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
storeHistory1, err := prepareStore(sourceStore, [][48]byte{pubKey1})
if err != nil {
t.Fatal(err)
}
proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01}
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[1][:], proposalEpoch, proposalHistory2); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory3 := bitfield.Bitlist{0x03, 0x00, 0x00, 0x00, 0x01}
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[0][:], proposalEpoch, proposalHistory3); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposalHistory4 := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x01}
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[1][:], proposalEpoch, proposalHistory4); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
storeHistory2, err := prepareStore(sourceStore, [][48]byte{pubKey2})
if err != nil {
t.Fatal(err)
}
attestationHistoryMap1 := make(map[uint64]uint64)
attestationHistoryMap1[0] = 0
pubKeyAttestationHistory1 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap1,
LatestEpochWritten: 0,
targetDirectory := testutil.TempDir() + "/target"
t.Cleanup(func() {
if err := os.RemoveAll(targetDirectory); err != nil {
t.Errorf("Could not remove target directory: %v", err)
}
})
if err := Split(context.Background(), sourceStore, targetDirectory); err != nil {
t.Fatalf("Splitting failed: %v", err)
}
attestationHistoryMap2 := make(map[uint64]uint64)
attestationHistoryMap2[0] = 1
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap2,
LatestEpochWritten: 0,
encodedKey1 := hex.EncodeToString(pubKey1[:])[:12]
keyStore1, err := GetKVStore(filepath.Join(targetDirectory, encodedKey1))
if err != nil {
t.Fatalf("Retrieving the store for public key %v failed: %v", encodedKey1, err)
}
dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory1[firstStorePubKeys[0]] = pubKeyAttestationHistory1
dbAttestationHistory1[firstStorePubKeys[1]] = pubKeyAttestationHistory2
if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
if keyStore1 == nil {
t.Fatalf("No store created for public key %v", encodedKey1)
}
attestationHistoryMap3 := make(map[uint64]uint64)
attestationHistoryMap3[0] = 2
pubKeyAttestationHistory3 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap3,
LatestEpochWritten: 0,
encodedKey2 := hex.EncodeToString(pubKey2[:])[:12]
keyStore2, err := GetKVStore(filepath.Join(targetDirectory, encodedKey2))
if err != nil {
t.Fatalf("Retrieving the store for public key %v failed: %v", encodedKey2, err)
}
attestationHistoryMap4 := make(map[uint64]uint64)
attestationHistoryMap4[0] = 3
pubKeyAttestationHistory4 := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap4,
LatestEpochWritten: 0,
if keyStore2 == nil {
t.Fatalf("No store created for public key %v", encodedKey2)
}
dbAttestationHistory2 := make(map[[48]byte]*slashpb.AttestationHistory)
dbAttestationHistory2[secondStorePubKeys[0]] = pubKeyAttestationHistory3
dbAttestationHistory2[secondStorePubKeys[1]] = pubKeyAttestationHistory4
if err := secondStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory2); err != nil {
if err := keyStore1.view(func(tx *bolt.Tx) error {
otherKeyProposalsBucket := tx.Bucket(historicProposalsBucket).Bucket(pubKey2[:])
if otherKeyProposalsBucket != nil {
t.Fatalf("Store for public key %v contains proposals for another key", encodedKey2)
}
otherKeyAttestationsBucket := tx.Bucket(historicAttestationsBucket).Bucket(pubKey2[:])
if otherKeyAttestationsBucket != nil {
t.Fatalf("Store for public key %v contains attestations for another key", encodedKey2)
}
return nil
}); err != nil {
t.Fatalf("Failed to close store: %v", err)
}
if err := keyStore2.view(func(tx *bolt.Tx) error {
otherKeyProposalsBucket := tx.Bucket(historicProposalsBucket).Bucket(pubKey1[:])
if otherKeyProposalsBucket != nil {
t.Fatalf("Store for public key %v contains proposals for another key", encodedKey1)
}
otherKeyAttestationsBucket := tx.Bucket(historicAttestationsBucket).Bucket(pubKey1[:])
if otherKeyAttestationsBucket != nil {
t.Fatalf("Store for public key %v contains attestations for another key", encodedKey1)
}
return nil
}); err != nil {
t.Fatalf("Failed to close store: %v", err)
}
assertStore(t, keyStore1, [][48]byte{pubKey1}, storeHistory1)
assertStore(t, keyStore2, [][48]byte{pubKey2}, storeHistory2)
}
func TestSplit_AttestationsWithoutMatchingProposalsAreSplit(t *testing.T) {
pubKey1 := [48]byte{1}
pubKey2 := [48]byte{2}
sourceStore := SetupDB(t, [][48]byte{pubKey1, pubKey2})
_, err := prepareStoreProposals(sourceStore, [][48]byte{pubKey1})
if err != nil {
t.Fatal(err)
}
attestationHistory, err := prepareStoreAttestations(sourceStore, [][48]byte{pubKey1, pubKey2})
if err != nil {
t.Fatal(err)
}
targetDirectory := testutil.TempDir() + "/target"
t.Cleanup(func() {
if err := os.RemoveAll(targetDirectory); err != nil {
t.Errorf("Could not remove target directory : %v", err)
}
})
if err := Split(context.Background(), sourceStore, targetDirectory); err != nil {
t.Fatalf("Splitting failed: %v", err)
}
encodedKey1 := hex.EncodeToString(pubKey1[:])[:12]
encodedKey2 := hex.EncodeToString(pubKey2[:])[:12]
attestationsOnlyKeyStore, err := GetKVStore(filepath.Join(targetDirectory, encodedKey2))
if err != nil {
t.Fatalf("Retrieving the store failed: %v", err)
}
if attestationsOnlyKeyStore == nil {
t.Fatalf("No store created for public key %v", encodedKey2)
}
if err := attestationsOnlyKeyStore.view(func(tx *bolt.Tx) error {
otherKeyProposalsBucket := tx.Bucket(historicProposalsBucket).Bucket(pubKey1[:])
if otherKeyProposalsBucket != nil {
t.Fatalf("Store for public key %v contains proposals for another key", encodedKey1)
}
otherKeyAttestationsBucket := tx.Bucket(historicAttestationsBucket).Bucket(pubKey1[:])
if otherKeyAttestationsBucket != nil {
t.Fatalf("Store for public key %v contains attestations for another key", encodedKey1)
}
return nil
}); err != nil {
t.Fatalf("Failed to retrieve attestations: %v", err)
}
splitAttestationsHistory, err :=
attestationsOnlyKeyStore.AttestationHistoryForPubKeys(context.Background(), [][48]byte{pubKey2})
if err != nil {
t.Fatalf("Retrieving attestation history failed for public key %v", encodedKey2)
}
if splitAttestationsHistory[pubKey2].TargetToSource[0] != attestationHistory[pubKey2][0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
attestationHistory[pubKey2][0],
splitAttestationsHistory[pubKey2].TargetToSource[0])
}
}
func prepareStore(store *Store, pubKeys [][48]byte) (*storeHistory, error) {
proposals, err := prepareStoreProposals(store, pubKeys)
if err != nil {
return nil, err
}
attestations, err := prepareStoreAttestations(store, pubKeys)
if err != nil {
return nil, err
}
history := storeHistory{
Proposals: proposals,
Attestations: attestations,
}
return &history, nil
}
func prepareStoreProposals(store *Store, pubKeys [][48]byte) (map[[48]byte]bitfield.Bitlist, error) {
proposals := make(map[[48]byte]bitfield.Bitlist)
for i, key := range pubKeys {
proposalHistory := bitfield.Bitlist{byte(i), 0x00, 0x00, 0x00, 0x01}
if err := store.SaveProposalHistoryForEpoch(context.Background(), key[:], 0, proposalHistory); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposals[key] = proposalHistory
}
return proposals, nil
}
func prepareStoreAttestations(store *Store, pubKeys [][48]byte) (map[[48]byte]map[uint64]uint64, error) {
storeAttestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
attestations := make(map[[48]byte]map[uint64]uint64)
for i, key := range pubKeys {
attestationHistoryMap := make(map[uint64]uint64)
attestationHistoryMap[0] = uint64(i)
attestationHistory := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap,
LatestEpochWritten: 0,
}
storeAttestationHistory[key] = attestationHistory
attestations[key] = attestationHistoryMap
}
if err := store.SaveAttestationHistoryForPubKeys(context.Background(), storeAttestationHistory); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
}
mergeHistory := &sourceStoresHistory{
ProposalEpoch: proposalEpoch,
FirstStoreFirstPubKeyProposals: proposalHistory1,
FirstStoreSecondPubKeyProposals: proposalHistory2,
SecondStoreFirstPubKeyProposals: proposalHistory3,
SecondStoreSecondPubKeyProposals: proposalHistory4,
FirstStoreFirstPubKeyAttestations: attestationHistoryMap1,
FirstStoreSecondPubKeyAttestations: attestationHistoryMap2,
SecondStoreFirstPubKeyAttestations: attestationHistoryMap3,
SecondStoreSecondPubKeyAttestations: attestationHistoryMap4,
}
return mergeHistory, nil
return attestations, nil
}
func assertMergedStore(
t *testing.T,
mergedStore *Store,
firstStorePubKeys [][48]byte,
secondStorePubKeys [][48]byte,
history *sourceStoresHistory) {
mergedProposalHistory1, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), firstStorePubKeys[0][:], history.ProposalEpoch)
if err != nil {
t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[0])
}
if !bytes.Equal(mergedProposalHistory1, history.FirstStoreFirstPubKeyProposals) {
t.Fatalf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStoreFirstPubKeyProposals,
mergedProposalHistory1)
}
mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), firstStorePubKeys[1][:], history.ProposalEpoch)
if err != nil {
t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[1])
}
if !bytes.Equal(mergedProposalHistory2, history.FirstStoreSecondPubKeyProposals) {
t.Fatalf(
"Proposals not merged correctly: expected %v vs received %v",
history.FirstStoreSecondPubKeyProposals,
mergedProposalHistory2)
}
mergedProposalHistory3, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKeys[0][:], history.ProposalEpoch)
if err != nil {
t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[0])
}
if !bytes.Equal(mergedProposalHistory3, history.SecondStoreFirstPubKeyProposals) {
t.Fatalf(
"Proposals not merged correctly: expected %v vs received %v",
history.SecondStoreFirstPubKeyProposals,
mergedProposalHistory3)
}
mergedProposalHistory4, err := mergedStore.ProposalHistoryForEpoch(
context.Background(), secondStorePubKeys[1][:], history.ProposalEpoch)
if err != nil {
t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[1])
}
if !bytes.Equal(mergedProposalHistory4, history.SecondStoreSecondPubKeyProposals) {
t.Fatalf("Proposals not merged correctly: expected %v vs received %v",
history.SecondStoreSecondPubKeyProposals,
mergedProposalHistory4)
func assertStore(t *testing.T, store *Store, pubKeys [][48]byte, expectedHistory *storeHistory) {
for _, key := range pubKeys {
proposalHistory, err := store.ProposalHistoryForEpoch(
context.Background(), key[:], 0)
if err != nil {
t.Fatalf("Retrieving proposal history failed for public key %v", key)
}
expectedProposals := expectedHistory.Proposals[key]
if !bytes.Equal(proposalHistory, expectedProposals) {
t.Fatalf("Proposals are incorrect: expected %v vs received %v", expectedProposals, proposalHistory)
}
}
mergedAttestationHistory, err := mergedStore.AttestationHistoryForPubKeys(
context.Background(),
append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1]))
attestationHistory, err := store.AttestationHistoryForPubKeys(context.Background(), pubKeys)
if err != nil {
t.Fatalf("Retrieving merged attestation history failed")
t.Fatalf("Retrieving attestation history failed")
}
if mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0] != history.FirstStoreFirstPubKeyAttestations[0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStoreFirstPubKeyAttestations[0],
mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0])
}
if mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0] != history.FirstStoreSecondPubKeyAttestations[0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
history.FirstStoreSecondPubKeyAttestations,
mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0] != history.SecondStoreFirstPubKeyAttestations[0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStoreFirstPubKeyAttestations,
mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0])
}
if mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0] != history.SecondStoreSecondPubKeyAttestations[0] {
t.Fatalf(
"Attestations not merged correctly: expected %v vs received %v",
history.SecondStoreSecondPubKeyAttestations,
mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0])
for _, key := range pubKeys {
expectedAttestations := expectedHistory.Attestations[key]
if attestationHistory[key].TargetToSource[0] != expectedAttestations[0] {
t.Fatalf(
"Attestations are incorrect: expected %v vs received %v",
expectedAttestations[0],
attestationHistory[key].TargetToSource[0])
}
}
}

View File

@@ -77,12 +77,12 @@ var (
}
// MergeSourceDirectories defines the locations of source validator databases that will be merged.
MergeSourceDirectories = &cli.StringFlag{
Name: "source-dirs",
Name: "merge-source-dirs",
Usage: "A comma-separated list of directories containing validator databases that will be merged.",
}
// MergeTargetDirectory defines the locations where the merged database will be stored.
MergeTargetDirectory = &cli.StringFlag{
Name: "target-dir",
Name: "merge-target-dir",
Usage: "The directory where the merged database will be stored.",
}
// MonitoringPortFlag defines the http port used to serve prometheus metrics.
@@ -97,6 +97,16 @@ var (
Name: "password",
Usage: "String value of the password for your validator private keys",
}
// SplitSourceDirectory defines the location of the validator database that will be split.
SplitSourceDirectory = &cli.StringFlag{
Name: "split-source-dir",
Usage: "The directory containing the validator database that will be split.",
}
// SplitTargetDirectory defines the location where the split outcome will be stored.
SplitTargetDirectory = &cli.StringFlag{
Name: "split-target-dir",
Usage: "The directory where the split outcome will be stored.",
}
// UnencryptedKeysFlag specifies a file path of a JSON file of unencrypted validator keys as an
// alternative from launching the validator client from decrypting a keystore directory.
UnencryptedKeysFlag = &cli.StringFlag{

View File

@@ -51,6 +51,8 @@ var appFlags = []cli.Flag{
flags.KeystorePathFlag,
flags.MergeSourceDirectories,
flags.MergeTargetDirectory,
flags.SplitSourceDirectory,
flags.SplitTargetDirectory,
flags.PasswordFlag,
flags.DisablePenaltyRewardLogFlag,
flags.UnencryptedKeysFlag,
@@ -245,6 +247,26 @@ contract in order to activate the validator client`,
log.Info("Merge completed successfully")
}
return nil
},
},
{
Name: "split",
Description: "splits one validator database into several databases - one for each public key",
Flags: []cli.Flag{
flags.SplitSourceDirectory,
flags.SplitTargetDirectory,
},
Action: func(cliCtx *cli.Context) error {
source := cliCtx.String(flags.SplitSourceDirectory.Name)
target := cliCtx.String(flags.SplitTargetDirectory.Name)
if err := accounts.Split(context.Background(), source, target); err != nil {
log.WithError(err).Error("Splitting validator data failed")
} else {
log.Info("Split completed successfully")
}
return nil
},
},

View File

@@ -89,6 +89,8 @@ var appHelpFlagGroups = []flagGroup{
flags.GrpcHeadersFlag,
flags.SlasherRPCProviderFlag,
flags.SlasherCertFlag,
flags.SplitSourceDirectory,
flags.SplitTargetDirectory,
flags.DisableAccountMetricsFlag,
},
},