mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Merge validators enhancements (#6027)
* merge validator enhancements * added test dependency to db's build file * Merge branch 'master' into merge-validators-enhancements * changed formatting of public key * Merge branch 'master' into merge-validators-enhancements * Merge branch 'master' into merge-validators-enhancements * removed unused import
This commit is contained in:
@@ -222,12 +222,12 @@ func HandleEmptyKeystoreFlags(cliCtx *cli.Context, confirmPassword bool) (string
|
||||
}
|
||||
|
||||
// Merge merges data from validator databases in sourceDirectories into a new store, which is created in targetDirectory.
|
||||
func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) error {
|
||||
func Merge(ctx context.Context, sourceDirectories []string, targetDirectory string) (err error) {
|
||||
var sourceStores []*db.Store
|
||||
defer func() {
|
||||
for _, store := range sourceStores {
|
||||
if err := store.Close(); err != nil {
|
||||
err = errors.Wrapf(err, "Failed to close the database in %s", store.DatabasePath())
|
||||
if deferErr := store.Close(); deferErr != nil {
|
||||
err = errors.Wrapf(deferErr, "Failed to close the database in %s", store.DatabasePath())
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -247,8 +247,7 @@ func Merge(ctx context.Context, sourceDirectories []string, targetDirectory stri
|
||||
return errors.New("no validator databases found in source directories")
|
||||
}
|
||||
|
||||
err := db.Merge(ctx, sourceStores, targetDirectory)
|
||||
if err != nil {
|
||||
if err := db.Merge(ctx, sourceStores, targetDirectory); err != nil {
|
||||
return errors.Wrapf(err, "Failed to merge validator databases into %s", targetDirectory)
|
||||
}
|
||||
|
||||
|
||||
@@ -24,22 +24,18 @@ import (
|
||||
)
|
||||
|
||||
type sourceStoresHistory struct {
|
||||
ProposalEpoch uint64
|
||||
FirstStoreFirstPubKeyProposals bitfield.Bitlist
|
||||
FirstStoreSecondPubKeyProposals bitfield.Bitlist
|
||||
SecondStoreFirstPubKeyProposals bitfield.Bitlist
|
||||
SecondStoreSecondPubKeyProposals bitfield.Bitlist
|
||||
FirstStoreFirstPubKeyAttestations map[uint64]uint64
|
||||
FirstStoreSecondPubKeyAttestations map[uint64]uint64
|
||||
SecondStoreFirstPubKeyAttestations map[uint64]uint64
|
||||
SecondStoreSecondPubKeyAttestations map[uint64]uint64
|
||||
ProposalEpoch uint64
|
||||
FirstStorePubKeyProposals bitfield.Bitlist
|
||||
SecondStorePubKeyProposals bitfield.Bitlist
|
||||
FirstStorePubKeyAttestations map[uint64]uint64
|
||||
SecondStorePubKeyAttestations map[uint64]uint64
|
||||
}
|
||||
|
||||
func TestNewValidatorAccount_AccountExists(t *testing.T) {
|
||||
directory := testutil.TempDir() + "/testkeystore"
|
||||
defer func() {
|
||||
if err := os.RemoveAll(directory); err != nil {
|
||||
t.Logf("Could not remove directory: %v", err)
|
||||
t.Errorf("Could not remove directory: %v", err)
|
||||
}
|
||||
}()
|
||||
validatorKey, err := keystore.NewKey()
|
||||
@@ -176,46 +172,13 @@ func TestChangePassword_KeyNotMatchingOldPasswordNotEncryptedWithNewPassword(t *
|
||||
}
|
||||
}
|
||||
|
||||
func TestMerge(t *testing.T) {
|
||||
firstStorePubKeys := [][48]byte{{1}, {2}}
|
||||
firstStore := db.SetupDB(t, firstStorePubKeys)
|
||||
secondStorePubKeys := [][48]byte{{3}, {4}}
|
||||
secondStore := db.SetupDB(t, secondStorePubKeys)
|
||||
|
||||
history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
if err := firstStore.Close(); err != nil {
|
||||
t.Fatalf("Closing source store failed: %v", err)
|
||||
}
|
||||
if err := secondStore.Close(); err != nil {
|
||||
t.Fatalf("Closing source store failed: %v", err)
|
||||
}
|
||||
|
||||
targetDirectory := testutil.TempDir() + "/target"
|
||||
err = Merge(context.Background(), []string{firstStore.DatabasePath(), secondStore.DatabasePath()}, targetDirectory)
|
||||
if err != nil {
|
||||
t.Fatalf("Merging failed: %v", err)
|
||||
}
|
||||
mergedStore, err := db.GetKVStore(targetDirectory)
|
||||
if err != nil {
|
||||
t.Fatalf("Retrieving the merged store failed: %v", err)
|
||||
}
|
||||
|
||||
assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)
|
||||
|
||||
cleanupAfterMerge(t, []string{firstStore.DatabasePath(), secondStore.DatabasePath(), targetDirectory})
|
||||
}
|
||||
|
||||
func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
|
||||
firstStorePubKeys := [][48]byte{{1}, {2}}
|
||||
firstStore := db.SetupDB(t, firstStorePubKeys)
|
||||
secondStorePubKeys := [][48]byte{{3}, {4}}
|
||||
secondStore := db.SetupDB(t, secondStorePubKeys)
|
||||
firstStorePubKey := [48]byte{1}
|
||||
firstStore := db.SetupDB(t, [][48]byte{firstStorePubKey})
|
||||
secondStorePubKey := [48]byte{2}
|
||||
secondStore := db.SetupDB(t, [][48]byte{secondStorePubKey})
|
||||
|
||||
history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
|
||||
history, err := prepareSourcesForMerging(firstStorePubKey, firstStore, secondStorePubKey, secondStore)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
@@ -232,6 +195,12 @@ func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
|
||||
t.Fatalf("Could not create directory %s", sourceDirectoryWithoutStore)
|
||||
}
|
||||
targetDirectory := testutil.TempDir() + "/target"
|
||||
t.Cleanup(func() {
|
||||
if err := os.RemoveAll(targetDirectory); err != nil {
|
||||
t.Errorf("Could not remove target directory : %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
err = Merge(
|
||||
context.Background(),
|
||||
[]string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore}, targetDirectory)
|
||||
@@ -243,11 +212,7 @@ func TestMerge_SucceedsWhenNoDatabaseExistsInSomeSourceDirectory(t *testing.T) {
|
||||
t.Fatalf("Retrieving the merged store failed: %v", err)
|
||||
}
|
||||
|
||||
assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)
|
||||
|
||||
cleanupAfterMerge(
|
||||
t,
|
||||
[]string{firstStore.DatabasePath(), secondStore.DatabasePath(), sourceDirectoryWithoutStore, targetDirectory})
|
||||
assertMergedStore(t, mergedStore, firstStorePubKey, secondStorePubKey, history)
|
||||
}
|
||||
|
||||
func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) {
|
||||
@@ -263,32 +228,29 @@ func TestMerge_FailsWhenNoDatabaseExistsInAllSourceDirectories(t *testing.T) {
|
||||
if err := os.MkdirAll(targetDirectory, 0700); err != nil {
|
||||
t.Fatalf("Could not create directory %s", targetDirectory)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
for _, dir := range []string{sourceDirectory1, sourceDirectory2, targetDirectory} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Errorf("Could not remove directory : %v", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
err := Merge(context.Background(), []string{sourceDirectory1, sourceDirectory2}, targetDirectory)
|
||||
expected := "no validator databases found in source directories"
|
||||
if err == nil || !strings.Contains(err.Error(), expected) {
|
||||
t.Errorf("Expected: %s vs received %v", expected, err)
|
||||
}
|
||||
|
||||
cleanupAfterMerge(t, []string{sourceDirectory1, sourceDirectory2, targetDirectory})
|
||||
}
|
||||
|
||||
func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store, secondStorePubKeys [][48]byte, secondStore *db.Store) (*sourceStoresHistory, error) {
|
||||
func prepareSourcesForMerging(firstStorePubKey [48]byte, firstStore *db.Store, secondStorePubKey [48]byte, secondStore *db.Store) (*sourceStoresHistory, error) {
|
||||
proposalEpoch := uint64(0)
|
||||
proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01}
|
||||
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[0][:], proposalEpoch, proposalHistory1); err != nil {
|
||||
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKey[:], proposalEpoch, proposalHistory1); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving proposal history failed")
|
||||
}
|
||||
proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01}
|
||||
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[1][:], proposalEpoch, proposalHistory2); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving proposal history failed")
|
||||
}
|
||||
proposalHistory3 := bitfield.Bitlist{0x03, 0x00, 0x00, 0x00, 0x01}
|
||||
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[0][:], proposalEpoch, proposalHistory3); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving proposal history failed")
|
||||
}
|
||||
proposalHistory4 := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x01}
|
||||
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[1][:], proposalEpoch, proposalHistory4); err != nil {
|
||||
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKey[:], proposalEpoch, proposalHistory2); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving proposal history failed")
|
||||
}
|
||||
|
||||
@@ -298,47 +260,29 @@ func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store
|
||||
TargetToSource: attestationHistoryMap1,
|
||||
LatestEpochWritten: 0,
|
||||
}
|
||||
dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory)
|
||||
dbAttestationHistory1[firstStorePubKey] = pubKeyAttestationHistory1
|
||||
if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving attestation history failed")
|
||||
}
|
||||
attestationHistoryMap2 := make(map[uint64]uint64)
|
||||
attestationHistoryMap2[0] = 1
|
||||
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
|
||||
TargetToSource: attestationHistoryMap2,
|
||||
LatestEpochWritten: 0,
|
||||
}
|
||||
dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory)
|
||||
dbAttestationHistory1[firstStorePubKeys[0]] = pubKeyAttestationHistory1
|
||||
dbAttestationHistory1[firstStorePubKeys[1]] = pubKeyAttestationHistory2
|
||||
if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving attestation history failed")
|
||||
}
|
||||
attestationHistoryMap3 := make(map[uint64]uint64)
|
||||
attestationHistoryMap3[0] = 2
|
||||
pubKeyAttestationHistory3 := &slashpb.AttestationHistory{
|
||||
TargetToSource: attestationHistoryMap3,
|
||||
LatestEpochWritten: 0,
|
||||
}
|
||||
attestationHistoryMap4 := make(map[uint64]uint64)
|
||||
attestationHistoryMap4[0] = 3
|
||||
pubKeyAttestationHistory4 := &slashpb.AttestationHistory{
|
||||
TargetToSource: attestationHistoryMap4,
|
||||
LatestEpochWritten: 0,
|
||||
}
|
||||
dbAttestationHistory2 := make(map[[48]byte]*slashpb.AttestationHistory)
|
||||
dbAttestationHistory2[secondStorePubKeys[0]] = pubKeyAttestationHistory3
|
||||
dbAttestationHistory2[secondStorePubKeys[1]] = pubKeyAttestationHistory4
|
||||
dbAttestationHistory2[secondStorePubKey] = pubKeyAttestationHistory2
|
||||
if err := secondStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory2); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving attestation history failed")
|
||||
}
|
||||
|
||||
mergeHistory := &sourceStoresHistory{
|
||||
ProposalEpoch: proposalEpoch,
|
||||
FirstStoreFirstPubKeyProposals: proposalHistory1,
|
||||
FirstStoreSecondPubKeyProposals: proposalHistory2,
|
||||
SecondStoreFirstPubKeyProposals: proposalHistory3,
|
||||
SecondStoreSecondPubKeyProposals: proposalHistory4,
|
||||
FirstStoreFirstPubKeyAttestations: attestationHistoryMap1,
|
||||
FirstStoreSecondPubKeyAttestations: attestationHistoryMap2,
|
||||
SecondStoreFirstPubKeyAttestations: attestationHistoryMap3,
|
||||
SecondStoreSecondPubKeyAttestations: attestationHistoryMap4,
|
||||
ProposalEpoch: proposalEpoch,
|
||||
FirstStorePubKeyProposals: proposalHistory1,
|
||||
SecondStorePubKeyProposals: proposalHistory2,
|
||||
FirstStorePubKeyAttestations: attestationHistoryMap1,
|
||||
SecondStorePubKeyAttestations: attestationHistoryMap2,
|
||||
}
|
||||
|
||||
return mergeHistory, nil
|
||||
@@ -347,95 +291,49 @@ func prepareSourcesForMerging(firstStorePubKeys [][48]byte, firstStore *db.Store
|
||||
func assertMergedStore(
|
||||
t *testing.T,
|
||||
mergedStore *db.Store,
|
||||
firstStorePubKeys [][48]byte,
|
||||
secondStorePubKeys [][48]byte,
|
||||
firstStorePubKey [48]byte,
|
||||
secondStorePubKey [48]byte,
|
||||
history *sourceStoresHistory) {
|
||||
|
||||
mergedProposalHistory1, err := mergedStore.ProposalHistoryForEpoch(
|
||||
context.Background(), firstStorePubKeys[0][:], history.ProposalEpoch)
|
||||
context.Background(), firstStorePubKey[:], history.ProposalEpoch)
|
||||
if err != nil {
|
||||
t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[0])
|
||||
} else {
|
||||
if !bytes.Equal(mergedProposalHistory1, history.FirstStoreFirstPubKeyProposals) {
|
||||
t.Errorf(
|
||||
"Proposals not merged correctly: expected %v vs received %v",
|
||||
history.FirstStoreFirstPubKeyProposals,
|
||||
mergedProposalHistory1)
|
||||
}
|
||||
t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKey)
|
||||
}
|
||||
if !bytes.Equal(mergedProposalHistory1, history.FirstStorePubKeyProposals) {
|
||||
t.Fatalf(
|
||||
"Proposals not merged correctly: expected %v vs received %v",
|
||||
history.FirstStorePubKeyProposals,
|
||||
mergedProposalHistory1)
|
||||
}
|
||||
mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch(
|
||||
context.Background(), firstStorePubKeys[1][:], history.ProposalEpoch)
|
||||
context.Background(), secondStorePubKey[:], history.ProposalEpoch)
|
||||
if err != nil {
|
||||
t.Errorf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[1])
|
||||
} else {
|
||||
if !bytes.Equal(mergedProposalHistory2, history.FirstStoreSecondPubKeyProposals) {
|
||||
t.Errorf(
|
||||
"Proposals not merged correctly: expected %v vs received %v",
|
||||
history.FirstStoreSecondPubKeyProposals,
|
||||
mergedProposalHistory2)
|
||||
}
|
||||
t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKey)
|
||||
}
|
||||
mergedProposalHistory3, err := mergedStore.ProposalHistoryForEpoch(
|
||||
context.Background(), secondStorePubKeys[0][:], history.ProposalEpoch)
|
||||
if err != nil {
|
||||
t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[0])
|
||||
} else {
|
||||
if !bytes.Equal(mergedProposalHistory3, history.SecondStoreFirstPubKeyProposals) {
|
||||
t.Errorf(
|
||||
"Proposals not merged correctly: expected %v vs received %v",
|
||||
history.SecondStoreFirstPubKeyProposals,
|
||||
mergedProposalHistory3)
|
||||
}
|
||||
}
|
||||
mergedProposalHistory4, err := mergedStore.ProposalHistoryForEpoch(
|
||||
context.Background(), secondStorePubKeys[1][:], history.ProposalEpoch)
|
||||
if err != nil {
|
||||
t.Errorf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[1])
|
||||
} else {
|
||||
if !bytes.Equal(mergedProposalHistory4, history.SecondStoreSecondPubKeyProposals) {
|
||||
t.Errorf("Proposals not merged correctly: expected %v vs received %v",
|
||||
history.SecondStoreSecondPubKeyProposals,
|
||||
mergedProposalHistory4)
|
||||
}
|
||||
if !bytes.Equal(mergedProposalHistory2, history.SecondStorePubKeyProposals) {
|
||||
t.Fatalf(
|
||||
"Proposals not merged correctly: expected %v vs received %v",
|
||||
history.SecondStorePubKeyProposals,
|
||||
mergedProposalHistory2)
|
||||
}
|
||||
|
||||
mergedAttestationHistory, err := mergedStore.AttestationHistoryForPubKeys(
|
||||
context.Background(),
|
||||
append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1]))
|
||||
[][48]byte{firstStorePubKey, secondStorePubKey})
|
||||
if err != nil {
|
||||
t.Error("Retrieving merged attestation history failed")
|
||||
} else {
|
||||
if mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0] != history.FirstStoreFirstPubKeyAttestations[0] {
|
||||
t.Errorf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.FirstStoreFirstPubKeyAttestations[0],
|
||||
mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0])
|
||||
}
|
||||
if mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0] != history.FirstStoreSecondPubKeyAttestations[0] {
|
||||
t.Errorf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.FirstStoreSecondPubKeyAttestations,
|
||||
mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0])
|
||||
}
|
||||
if mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0] != history.SecondStoreFirstPubKeyAttestations[0] {
|
||||
t.Errorf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.SecondStoreFirstPubKeyAttestations,
|
||||
mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0])
|
||||
}
|
||||
if mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0] != history.SecondStoreSecondPubKeyAttestations[0] {
|
||||
t.Errorf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.SecondStoreSecondPubKeyAttestations,
|
||||
mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func cleanupAfterMerge(t *testing.T, directories []string) {
|
||||
for _, dir := range directories {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Logf("Could not remove directory %s: %v", dir, err)
|
||||
}
|
||||
t.Fatalf("Retrieving merged attestation history failed")
|
||||
}
|
||||
if mergedAttestationHistory[firstStorePubKey].TargetToSource[0] != history.FirstStorePubKeyAttestations[0] {
|
||||
t.Fatalf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.FirstStorePubKeyAttestations[0],
|
||||
mergedAttestationHistory[firstStorePubKey].TargetToSource[0])
|
||||
}
|
||||
if mergedAttestationHistory[secondStorePubKey].TargetToSource[0] != history.SecondStorePubKeyAttestations[0] {
|
||||
t.Fatalf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.SecondStorePubKeyAttestations,
|
||||
mergedAttestationHistory[secondStorePubKey].TargetToSource[0])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,7 +114,7 @@ func (v *ValidatorService) Start() {
|
||||
return
|
||||
}
|
||||
|
||||
valDB, err := db.NewKVStoreWithPublicKeyBuckets(v.dataDir, pubkeys)
|
||||
valDB, err := db.NewKVStore(v.dataDir, pubkeys)
|
||||
if err != nil {
|
||||
log.Errorf("Could not initialize db: %v", err)
|
||||
return
|
||||
|
||||
@@ -31,6 +31,7 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"attestation_history_test.go",
|
||||
"manage_test.go",
|
||||
"proposal_history_test.go",
|
||||
"setup_db_test.go",
|
||||
],
|
||||
@@ -39,6 +40,8 @@ go_test(
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//proto/slashing:go_default_library",
|
||||
"//shared/params:go_default_library",
|
||||
"//shared/testutil:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -62,24 +62,10 @@ func createBuckets(tx *bolt.Tx, buckets ...[]byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewKVStoreWithPublicKeyBuckets initializes a new boltDB key-value store at the directory
|
||||
// path specified, creates the kv-buckets based on the schema and provided public keys,
|
||||
// and stores an open connection db object as a property of the Store struct.
|
||||
func NewKVStoreWithPublicKeyBuckets(dirPath string, pubKeys [][48]byte) (*Store, error) {
|
||||
kv, err := NewKVStore(dirPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Initialize the required public keys into the DB to ensure they're not empty.
|
||||
if err := kv.initializeSubBuckets(pubKeys); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return kv, err
|
||||
}
|
||||
|
||||
// NewKVStore initializes a new boltDB key-value store at the directory path specified
|
||||
// and stores an open connection db object as a property of the Store struct.
|
||||
func NewKVStore(dirPath string) (*Store, error) {
|
||||
// NewKVStore initializes a new boltDB key-value store at the directory
|
||||
// path specified, creates the kv-buckets based on the schema, and stores
|
||||
// an open connection db object as a property of the Store struct.
|
||||
func NewKVStore(dirPath string, pubKeys [][48]byte) (*Store, error) {
|
||||
if err := os.MkdirAll(dirPath, 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -104,6 +90,11 @@ func NewKVStore(dirPath string) (*Store, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Initialize the required public keys into the DB to ensure they're not empty.
|
||||
if err := kv.initializeSubBuckets(pubKeys); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return kv, err
|
||||
}
|
||||
|
||||
|
||||
@@ -98,11 +98,11 @@ func getPubKeyProposals(pubKey []byte, proposalsBucket *bolt.Bucket) (*pubKeyPro
|
||||
func createTargetStore(
|
||||
targetDirectory string,
|
||||
allProposals []pubKeyProposals,
|
||||
allAttestations []pubKeyAttestations) error {
|
||||
allAttestations []pubKeyAttestations) (err error) {
|
||||
|
||||
newStore, err := NewKVStore(targetDirectory)
|
||||
newStore, err := NewKVStore(targetDirectory, [][48]byte{})
|
||||
defer func() {
|
||||
if e := newStore.Close(); e != nil {
|
||||
if deferErr := newStore.Close(); deferErr != nil {
|
||||
err = errors.Wrap(err, "Could not close the merged database")
|
||||
}
|
||||
}()
|
||||
@@ -110,12 +110,14 @@ func createTargetStore(
|
||||
return errors.Wrapf(err, "Could not initialize a new database in %s", targetDirectory)
|
||||
}
|
||||
|
||||
if err := newStore.update(func(tx *bolt.Tx) error {
|
||||
err = newStore.update(func(tx *bolt.Tx) error {
|
||||
proposalsBucket := tx.Bucket(historicProposalsBucket)
|
||||
for _, pubKeyProposals := range allProposals {
|
||||
pubKeyBucket, err := proposalsBucket.CreateBucket(pubKeyProposals.PubKey)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Could not create proposals bucket for public key %v", pubKeyProposals.PubKey)
|
||||
return errors.Wrapf(err,
|
||||
"Could not create proposals bucket for public key %x",
|
||||
pubKeyProposals.PubKey[:12])
|
||||
}
|
||||
for _, epochProposals := range pubKeyProposals.Proposals {
|
||||
if err := pubKeyBucket.Put(epochProposals.Epoch, epochProposals.Proposals); err != nil {
|
||||
@@ -126,12 +128,14 @@ func createTargetStore(
|
||||
attestationsBucket := tx.Bucket(historicAttestationsBucket)
|
||||
for _, attestations := range allAttestations {
|
||||
if err := attestationsBucket.Put(attestations.PubKey, attestations.Attestations); err != nil {
|
||||
return errors.Wrapf(err, "Could not add public key attestations for public key %v", attestations.PubKey)
|
||||
return errors.Wrapf(
|
||||
err,
|
||||
"Could not add public key attestations for public key %x",
|
||||
attestations.PubKey[:12])
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
214
validator/db/manage_test.go
Normal file
214
validator/db/manage_test.go
Normal file
@@ -0,0 +1,214 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
)
|
||||
|
||||
type sourceStoresHistory struct {
|
||||
ProposalEpoch uint64
|
||||
FirstStoreFirstPubKeyProposals bitfield.Bitlist
|
||||
FirstStoreSecondPubKeyProposals bitfield.Bitlist
|
||||
SecondStoreFirstPubKeyProposals bitfield.Bitlist
|
||||
SecondStoreSecondPubKeyProposals bitfield.Bitlist
|
||||
FirstStoreFirstPubKeyAttestations map[uint64]uint64
|
||||
FirstStoreSecondPubKeyAttestations map[uint64]uint64
|
||||
SecondStoreFirstPubKeyAttestations map[uint64]uint64
|
||||
SecondStoreSecondPubKeyAttestations map[uint64]uint64
|
||||
}
|
||||
|
||||
func TestMerge(t *testing.T) {
|
||||
firstStorePubKeys := [][48]byte{{1}, {2}}
|
||||
firstStore := SetupDB(t, firstStorePubKeys)
|
||||
secondStorePubKeys := [][48]byte{{3}, {4}}
|
||||
secondStore := SetupDB(t, secondStorePubKeys)
|
||||
|
||||
history, err := prepareSourcesForMerging(firstStorePubKeys, firstStore, secondStorePubKeys, secondStore)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
targetDirectory := testutil.TempDir() + "/target"
|
||||
t.Cleanup(func() {
|
||||
if err := os.RemoveAll(targetDirectory); err != nil {
|
||||
t.Errorf("Could not remove target directory : %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
err = Merge(context.Background(), []*Store{firstStore, secondStore}, targetDirectory)
|
||||
if err != nil {
|
||||
t.Fatalf("Merging failed: %v", err)
|
||||
}
|
||||
mergedStore, err := GetKVStore(targetDirectory)
|
||||
if err != nil {
|
||||
t.Fatalf("Retrieving the merged store failed: %v", err)
|
||||
}
|
||||
|
||||
assertMergedStore(t, mergedStore, firstStorePubKeys, secondStorePubKeys, history)
|
||||
}
|
||||
|
||||
func prepareSourcesForMerging(
|
||||
firstStorePubKeys [][48]byte,
|
||||
firstStore *Store,
|
||||
secondStorePubKeys [][48]byte,
|
||||
secondStore *Store) (*sourceStoresHistory, error) {
|
||||
|
||||
proposalEpoch := uint64(0)
|
||||
proposalHistory1 := bitfield.Bitlist{0x01, 0x00, 0x00, 0x00, 0x01}
|
||||
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[0][:], proposalEpoch, proposalHistory1); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving proposal history failed")
|
||||
}
|
||||
proposalHistory2 := bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x01}
|
||||
if err := firstStore.SaveProposalHistoryForEpoch(context.Background(), firstStorePubKeys[1][:], proposalEpoch, proposalHistory2); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving proposal history failed")
|
||||
}
|
||||
proposalHistory3 := bitfield.Bitlist{0x03, 0x00, 0x00, 0x00, 0x01}
|
||||
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[0][:], proposalEpoch, proposalHistory3); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving proposal history failed")
|
||||
}
|
||||
proposalHistory4 := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x01}
|
||||
if err := secondStore.SaveProposalHistoryForEpoch(context.Background(), secondStorePubKeys[1][:], proposalEpoch, proposalHistory4); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving proposal history failed")
|
||||
}
|
||||
|
||||
attestationHistoryMap1 := make(map[uint64]uint64)
|
||||
attestationHistoryMap1[0] = 0
|
||||
pubKeyAttestationHistory1 := &slashpb.AttestationHistory{
|
||||
TargetToSource: attestationHistoryMap1,
|
||||
LatestEpochWritten: 0,
|
||||
}
|
||||
attestationHistoryMap2 := make(map[uint64]uint64)
|
||||
attestationHistoryMap2[0] = 1
|
||||
pubKeyAttestationHistory2 := &slashpb.AttestationHistory{
|
||||
TargetToSource: attestationHistoryMap2,
|
||||
LatestEpochWritten: 0,
|
||||
}
|
||||
dbAttestationHistory1 := make(map[[48]byte]*slashpb.AttestationHistory)
|
||||
dbAttestationHistory1[firstStorePubKeys[0]] = pubKeyAttestationHistory1
|
||||
dbAttestationHistory1[firstStorePubKeys[1]] = pubKeyAttestationHistory2
|
||||
if err := firstStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory1); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving attestation history failed")
|
||||
}
|
||||
attestationHistoryMap3 := make(map[uint64]uint64)
|
||||
attestationHistoryMap3[0] = 2
|
||||
pubKeyAttestationHistory3 := &slashpb.AttestationHistory{
|
||||
TargetToSource: attestationHistoryMap3,
|
||||
LatestEpochWritten: 0,
|
||||
}
|
||||
attestationHistoryMap4 := make(map[uint64]uint64)
|
||||
attestationHistoryMap4[0] = 3
|
||||
pubKeyAttestationHistory4 := &slashpb.AttestationHistory{
|
||||
TargetToSource: attestationHistoryMap4,
|
||||
LatestEpochWritten: 0,
|
||||
}
|
||||
dbAttestationHistory2 := make(map[[48]byte]*slashpb.AttestationHistory)
|
||||
dbAttestationHistory2[secondStorePubKeys[0]] = pubKeyAttestationHistory3
|
||||
dbAttestationHistory2[secondStorePubKeys[1]] = pubKeyAttestationHistory4
|
||||
if err := secondStore.SaveAttestationHistoryForPubKeys(context.Background(), dbAttestationHistory2); err != nil {
|
||||
return nil, errors.Wrapf(err, "Saving attestation history failed")
|
||||
}
|
||||
|
||||
mergeHistory := &sourceStoresHistory{
|
||||
ProposalEpoch: proposalEpoch,
|
||||
FirstStoreFirstPubKeyProposals: proposalHistory1,
|
||||
FirstStoreSecondPubKeyProposals: proposalHistory2,
|
||||
SecondStoreFirstPubKeyProposals: proposalHistory3,
|
||||
SecondStoreSecondPubKeyProposals: proposalHistory4,
|
||||
FirstStoreFirstPubKeyAttestations: attestationHistoryMap1,
|
||||
FirstStoreSecondPubKeyAttestations: attestationHistoryMap2,
|
||||
SecondStoreFirstPubKeyAttestations: attestationHistoryMap3,
|
||||
SecondStoreSecondPubKeyAttestations: attestationHistoryMap4,
|
||||
}
|
||||
|
||||
return mergeHistory, nil
|
||||
}
|
||||
|
||||
func assertMergedStore(
|
||||
t *testing.T,
|
||||
mergedStore *Store,
|
||||
firstStorePubKeys [][48]byte,
|
||||
secondStorePubKeys [][48]byte,
|
||||
history *sourceStoresHistory) {
|
||||
|
||||
mergedProposalHistory1, err := mergedStore.ProposalHistoryForEpoch(
|
||||
context.Background(), firstStorePubKeys[0][:], history.ProposalEpoch)
|
||||
if err != nil {
|
||||
t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[0])
|
||||
}
|
||||
if !bytes.Equal(mergedProposalHistory1, history.FirstStoreFirstPubKeyProposals) {
|
||||
t.Fatalf(
|
||||
"Proposals not merged correctly: expected %v vs received %v",
|
||||
history.FirstStoreFirstPubKeyProposals,
|
||||
mergedProposalHistory1)
|
||||
}
|
||||
mergedProposalHistory2, err := mergedStore.ProposalHistoryForEpoch(
|
||||
context.Background(), firstStorePubKeys[1][:], history.ProposalEpoch)
|
||||
if err != nil {
|
||||
t.Fatalf("Retrieving merged proposal history failed for public key %v", firstStorePubKeys[1])
|
||||
}
|
||||
if !bytes.Equal(mergedProposalHistory2, history.FirstStoreSecondPubKeyProposals) {
|
||||
t.Fatalf(
|
||||
"Proposals not merged correctly: expected %v vs received %v",
|
||||
history.FirstStoreSecondPubKeyProposals,
|
||||
mergedProposalHistory2)
|
||||
}
|
||||
mergedProposalHistory3, err := mergedStore.ProposalHistoryForEpoch(
|
||||
context.Background(), secondStorePubKeys[0][:], history.ProposalEpoch)
|
||||
if err != nil {
|
||||
t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[0])
|
||||
}
|
||||
if !bytes.Equal(mergedProposalHistory3, history.SecondStoreFirstPubKeyProposals) {
|
||||
t.Fatalf(
|
||||
"Proposals not merged correctly: expected %v vs received %v",
|
||||
history.SecondStoreFirstPubKeyProposals,
|
||||
mergedProposalHistory3)
|
||||
}
|
||||
mergedProposalHistory4, err := mergedStore.ProposalHistoryForEpoch(
|
||||
context.Background(), secondStorePubKeys[1][:], history.ProposalEpoch)
|
||||
if err != nil {
|
||||
t.Fatalf("Retrieving merged proposal history failed for public key %v", secondStorePubKeys[1])
|
||||
}
|
||||
if !bytes.Equal(mergedProposalHistory4, history.SecondStoreSecondPubKeyProposals) {
|
||||
t.Fatalf("Proposals not merged correctly: expected %v vs received %v",
|
||||
history.SecondStoreSecondPubKeyProposals,
|
||||
mergedProposalHistory4)
|
||||
}
|
||||
|
||||
mergedAttestationHistory, err := mergedStore.AttestationHistoryForPubKeys(
|
||||
context.Background(),
|
||||
append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1]))
|
||||
if err != nil {
|
||||
t.Fatalf("Retrieving merged attestation history failed")
|
||||
}
|
||||
if mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0] != history.FirstStoreFirstPubKeyAttestations[0] {
|
||||
t.Fatalf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.FirstStoreFirstPubKeyAttestations[0],
|
||||
mergedAttestationHistory[firstStorePubKeys[0]].TargetToSource[0])
|
||||
}
|
||||
if mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0] != history.FirstStoreSecondPubKeyAttestations[0] {
|
||||
t.Fatalf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.FirstStoreSecondPubKeyAttestations,
|
||||
mergedAttestationHistory[firstStorePubKeys[1]].TargetToSource[0])
|
||||
}
|
||||
if mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0] != history.SecondStoreFirstPubKeyAttestations[0] {
|
||||
t.Fatalf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.SecondStoreFirstPubKeyAttestations,
|
||||
mergedAttestationHistory[secondStorePubKeys[0]].TargetToSource[0])
|
||||
}
|
||||
if mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0] != history.SecondStoreSecondPubKeyAttestations[0] {
|
||||
t.Fatalf(
|
||||
"Attestations not merged correctly: expected %v vs received %v",
|
||||
history.SecondStoreSecondPubKeyAttestations,
|
||||
mergedAttestationHistory[secondStorePubKeys[1]].TargetToSource[0])
|
||||
}
|
||||
}
|
||||
@@ -19,7 +19,7 @@ func SetupDB(t testing.TB, pubkeys [][48]byte) *Store {
|
||||
if err := os.RemoveAll(p); err != nil {
|
||||
t.Fatalf("Failed to remove directory: %v", err)
|
||||
}
|
||||
db, err := NewKVStoreWithPublicKeyBuckets(p, pubkeys)
|
||||
db, err := NewKVStore(p, pubkeys)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to instantiate DB: %v", err)
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ func TestClearDB(t *testing.T) {
|
||||
if err := os.RemoveAll(p); err != nil {
|
||||
t.Fatalf("Failed to remove directory: %v", err)
|
||||
}
|
||||
db, err := NewKVStoreWithPublicKeyBuckets(p, [][48]byte{})
|
||||
db, err := NewKVStore(p, [][48]byte{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to instantiate DB: %v", err)
|
||||
}
|
||||
|
||||
@@ -317,7 +317,7 @@ func clearDB(dataDir string, pubkeys [][48]byte, force bool) error {
|
||||
}
|
||||
|
||||
if clearDBConfirmed {
|
||||
valDB, err := db.NewKVStoreWithPublicKeyBuckets(dataDir, pubkeys)
|
||||
valDB, err := db.NewKVStore(dataDir, pubkeys)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Could not create DB in dir %s", dataDir)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user