Delete Dead Code in the Validator Client's Database Package (#7970)

* delete deprecated

* mod

* deep source

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Raul Jordan
2020-11-26 10:50:55 -06:00
committed by GitHub
parent 4ef8a0f99e
commit 10857223d0
19 changed files with 324 additions and 1800 deletions

1
go.mod
View File

@@ -99,7 +99,6 @@ require (
github.com/trailofbits/go-mutexasserts v0.0.0-20200708152505-19999e7d3cef
github.com/tyler-smith/go-bip39 v1.0.2
github.com/urfave/cli/v2 v2.2.0
github.com/wealdtech/go-bytesutil v1.1.1
github.com/wealdtech/go-eth2-util v1.6.2
github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4 v1.1.1
github.com/wercker/journalhook v0.0.0-20180428041537-5d0a5ae867b3

View File

@@ -6,9 +6,5 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/validator/db/iface",
# Other packages must use github.com/prysmaticlabs/prysm/validator/db.Database alias.
visibility = ["//validator/db:__subpackages__"],
deps = [
"//proto/slashing:go_default_library",
"//validator/db/kv:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
deps = ["//validator/db/kv:go_default_library"],
)

View File

@@ -5,8 +5,6 @@ import (
"context"
"io"
"github.com/prysmaticlabs/go-bitfield"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/validator/db/kv"
)
@@ -22,10 +20,6 @@ type ValidatorDB interface {
SaveGenesisValidatorsRoot(ctx context.Context, genValRoot []byte) error
// Proposer protection related methods.
ProposalHistoryForEpoch(ctx context.Context, publicKey []byte, epoch uint64) (bitfield.Bitlist, error)
SaveProposalHistoryForEpoch(ctx context.Context, publicKey []byte, epoch uint64, history bitfield.Bitlist) error
// New data structure methods
HighestSignedProposal(ctx context.Context, publicKey [48]byte) (uint64, error)
LowestSignedProposal(ctx context.Context, publicKey [48]byte) (uint64, error)
ProposalHistoryForSlot(ctx context.Context, publicKey [48]byte, slot uint64) ([32]byte, bool, error)
@@ -33,14 +27,10 @@ type ValidatorDB interface {
ProposedPublicKeys(ctx context.Context) ([][48]byte, error)
// Attester protection related methods.
AttestationHistoryForPubKeys(ctx context.Context, publicKeys [][48]byte) (map[[48]byte]*slashpb.AttestationHistory, error)
SaveAttestationHistoryForPubKeys(ctx context.Context, historyByPubKey map[[48]byte]*slashpb.AttestationHistory) error
HighestSignedTargetEpoch(ctx context.Context, publicKey [48]byte) (uint64, error)
HighestSignedSourceEpoch(ctx context.Context, publicKey [48]byte) (uint64, error)
SaveHighestSignedTargetEpoch(ctx context.Context, publicKey [48]byte, epoch uint64) error
SaveHighestSignedSourceEpoch(ctx context.Context, publicKey [48]byte, epoch uint64) error
// New attestation store methods.
AttestationHistoryForPubKeysV2(ctx context.Context, publicKeys [][48]byte) (map[[48]byte]kv.EncHistoryData, error)
SaveAttestationHistoryForPubKeysV2(ctx context.Context, historyByPubKeys map[[48]byte]kv.EncHistoryData) error
SaveAttestationHistoryForPubKeyV2(ctx context.Context, pubKey [48]byte, history kv.EncHistoryData) error

View File

@@ -4,12 +4,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_library(
name = "go_default_library",
srcs = [
"attestation_history.go",
"attestation_history_v2.go",
"db.go",
"genesis.go",
"manage.go",
"proposal_history.go",
"historical_attestations.go",
"proposal_history_v2.go",
"schema.go",
],
@@ -17,15 +15,11 @@ go_library(
visibility = ["//validator:__subpackages__"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//proto/slashing:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/params:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_wealdtech_go_bytesutil//:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
@@ -34,24 +28,17 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"attestation_history_test.go",
"attestation_history_v2_test.go",
"db_test.go",
"genesis_test.go",
"manage_test.go",
"proposal_history_test.go",
"historical_attestations_test.go",
"proposal_history_v2_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//proto/slashing:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",
],
)

View File

@@ -1,84 +0,0 @@
package kv
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/params"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
func unmarshalAttestationHistory(ctx context.Context, enc []byte) (*slashpb.AttestationHistory, error) {
ctx, span := trace.StartSpan(ctx, "Validator.unmarshalAttestationHistory")
defer span.End()
history := &slashpb.AttestationHistory{}
if err := proto.Unmarshal(enc, history); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding")
}
return history, nil
}
// AttestationHistoryForPubKeys accepts an array of validator public keys and returns a mapping of corresponding attestation history.
func (store *Store) AttestationHistoryForPubKeys(ctx context.Context, publicKeys [][48]byte) (map[[48]byte]*slashpb.AttestationHistory, error) {
ctx, span := trace.StartSpan(ctx, "Validator.AttestationHistoryForPubKeys")
defer span.End()
if len(publicKeys) == 0 {
return make(map[[48]byte]*slashpb.AttestationHistory), nil
}
var err error
attestationHistoryForVals := make(map[[48]byte]*slashpb.AttestationHistory)
err = store.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
for _, key := range publicKeys {
enc := bucket.Get(key[:])
var attestationHistory *slashpb.AttestationHistory
if len(enc) == 0 {
newMap := make(map[uint64]uint64)
newMap[0] = params.BeaconConfig().FarFutureEpoch
attestationHistory = &slashpb.AttestationHistory{
TargetToSource: newMap,
}
} else {
attestationHistory, err = unmarshalAttestationHistory(ctx, enc)
if err != nil {
return err
}
}
attestationHistoryForVals[key] = attestationHistory
}
return nil
})
return attestationHistoryForVals, err
}
// SaveAttestationHistoryForPubKeys saves the attestation histories for the requested validator public keys.
func (store *Store) SaveAttestationHistoryForPubKeys(ctx context.Context, historyByPubKeys map[[48]byte]*slashpb.AttestationHistory) error {
ctx, span := trace.StartSpan(ctx, "Validator.SaveAttestationHistoryForPubKeys")
defer span.End()
encoded := make(map[[48]byte][]byte)
for pubKey, history := range historyByPubKeys {
enc, err := proto.Marshal(history)
if err != nil {
return errors.Wrap(err, "failed to encode attestation history")
}
encoded[pubKey] = enc
}
err := store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
for pubKey, encodedHistory := range encoded {
if err := bucket.Put(pubKey[:], encodedHistory); err != nil {
return err
}
}
return nil
})
return err
}

View File

@@ -1,141 +0,0 @@
package kv
import (
"context"
"testing"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestAttestationHistoryForPubKeys_EmptyVals(t *testing.T) {
pubkeys := [][48]byte{{30}, {25}, {20}}
db := setupDB(t, pubkeys)
historyForPubKeys, err := db.AttestationHistoryForPubKeys(context.Background(), pubkeys)
require.NoError(t, err)
newMap := make(map[uint64]uint64)
newMap[0] = params.BeaconConfig().FarFutureEpoch
clean := &slashpb.AttestationHistory{
TargetToSource: newMap,
}
cleanAttHistoryForPubKeys := make(map[[48]byte]*slashpb.AttestationHistory)
for _, pubKey := range pubkeys {
cleanAttHistoryForPubKeys[pubKey] = clean
}
require.DeepEqual(t, cleanAttHistoryForPubKeys, historyForPubKeys, "Expected attestation history epoch bits to be empty")
}
func TestSaveAttestationHistory_OK(t *testing.T) {
pubKeys := [][48]byte{{3}, {4}}
db := setupDB(t, pubKeys)
farFuture := params.BeaconConfig().FarFutureEpoch
newMap := make(map[uint64]uint64)
// The validator attested at target epoch 2 but had no attestations for target epochs 0 and 1.
newMap[0] = farFuture
newMap[1] = farFuture
newMap[2] = 1
history := &slashpb.AttestationHistory{
TargetToSource: newMap,
LatestEpochWritten: 2,
}
newMap2 := make(map[uint64]uint64)
// The validator attested at target epoch 1 and 3 but had no attestations for target epochs 0 and 2.
newMap2[0] = farFuture
newMap2[1] = 0
newMap2[2] = farFuture
newMap2[3] = 2
history2 := &slashpb.AttestationHistory{
TargetToSource: newMap2,
LatestEpochWritten: 3,
}
attestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
attestationHistory[pubKeys[0]] = history
attestationHistory[pubKeys[1]] = history2
require.NoError(t, db.SaveAttestationHistoryForPubKeys(context.Background(), attestationHistory), "Saving attestation history failed")
savedHistories, err := db.AttestationHistoryForPubKeys(context.Background(), pubKeys)
require.NoError(t, err, "Failed to get attestation history")
require.NotNil(t, savedHistories)
require.DeepEqual(t, attestationHistory, savedHistories, "Expected DB to keep object the same, received: %v", history)
savedHistory := savedHistories[pubKeys[0]]
require.Equal(t, newMap[2], savedHistory.TargetToSource[2], "Expected target epoch %d to have the same marked source epoch", 2)
require.Equal(t, newMap[1], savedHistory.TargetToSource[1], "Expected target epoch %d to have the same marked source epoch", 1)
require.Equal(t, newMap[0], savedHistory.TargetToSource[0], "Expected target epoch %d to have the same marked source epoch", 0)
savedHistory = savedHistories[pubKeys[1]]
require.Equal(t, newMap2[3], savedHistory.TargetToSource[3], "Expected target epoch %d to have the same marked source epoch", 3)
require.Equal(t, newMap2[2], savedHistory.TargetToSource[2], "Expected target epoch %d to have the same marked source epoch", 2)
require.Equal(t, newMap2[1], savedHistory.TargetToSource[1], "Expected target epoch %d to have the same marked source epoch", 1)
}
func TestSaveAttestationHistory_Overwrites(t *testing.T) {
db := setupDB(t, [][48]byte{})
farFuture := params.BeaconConfig().FarFutureEpoch
newMap1 := make(map[uint64]uint64)
newMap1[0] = farFuture
newMap1[1] = 0
newMap2 := make(map[uint64]uint64)
newMap2[0] = farFuture
newMap2[1] = farFuture
newMap2[2] = 1
newMap3 := make(map[uint64]uint64)
newMap3[0] = farFuture
newMap3[1] = farFuture
newMap3[2] = farFuture
newMap3[3] = 2
tests := []struct {
pubkey [48]byte
epoch uint64
history *slashpb.AttestationHistory
}{
{
pubkey: [48]byte{0},
epoch: uint64(1),
history: &slashpb.AttestationHistory{
TargetToSource: newMap1,
LatestEpochWritten: 1,
},
},
{
pubkey: [48]byte{0},
epoch: uint64(2),
history: &slashpb.AttestationHistory{
TargetToSource: newMap2,
LatestEpochWritten: 2,
},
},
{
pubkey: [48]byte{0},
epoch: uint64(3),
history: &slashpb.AttestationHistory{
TargetToSource: newMap3,
LatestEpochWritten: 3,
},
},
}
for _, tt := range tests {
attHistory := make(map[[48]byte]*slashpb.AttestationHistory)
attHistory[tt.pubkey] = tt.history
require.NoError(t, db.SaveAttestationHistoryForPubKeys(context.Background(), attHistory), "Saving attestation history failed")
histories, err := db.AttestationHistoryForPubKeys(context.Background(), [][48]byte{tt.pubkey})
require.NoError(t, err, "Failed to get attestation history")
history := histories[tt.pubkey]
require.NotNil(t, history)
require.DeepEqual(t, tt.history, history, "Expected DB to keep object the same")
require.Equal(t, tt.epoch-1, history.TargetToSource[tt.epoch],
"Expected target epoch %d to be marked with correct source epoch %d", tt.epoch, history.TargetToSource[tt.epoch])
require.Equal(t, farFuture, history.TargetToSource[tt.epoch-1],
"Expected target epoch %d to not be marked as attested for, received %d", tt.epoch-1, history.TargetToSource[tt.epoch-1])
}
}

View File

@@ -2,175 +2,12 @@ package kv
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
log "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
const (
// The size of each data entry in bytes for the source epoch (8 bytes) and signing root (32 bytes).
uint64Size = 8
latestEpochWrittenSize = uint64Size
targetSize = uint64Size
sourceSize = uint64Size
signingRootSize = 32
historySize = targetSize + sourceSize + signingRootSize
minimalSize = latestEpochWrittenSize
)
// HistoryData stores the needed data to confirm if an attestation is slashable
// or repeated.
type HistoryData struct {
Source uint64
SigningRoot []byte
}
// EncHistoryData encapsulated history data.
type EncHistoryData []byte
func (hd EncHistoryData) assertSize() error {
if hd == nil || len(hd) < minimalSize {
return fmt.Errorf("encapsulated data size: %d is smaller then minimal size: %d", len(hd), minimalSize)
}
if (len(hd)-minimalSize)%historySize != 0 {
return fmt.Errorf("encapsulated data size: %d is not a multiple of entry size: %d", len(hd), historySize)
}
return nil
}
func (h *HistoryData) IsEmpty() bool {
if h == (*HistoryData)(nil) {
return true
}
if h.Source == params.BeaconConfig().FarFutureEpoch {
return true
}
return false
}
func emptyHistoryData() *HistoryData {
h := &HistoryData{Source: params.BeaconConfig().FarFutureEpoch, SigningRoot: bytesutil.PadTo([]byte{}, 32)}
return h
}
// NewAttestationHistoryArray creates a new encapsulated attestation history byte array
// sized by the latest epoch written.
func NewAttestationHistoryArray(target uint64) EncHistoryData {
relativeTarget := target % params.BeaconConfig().WeakSubjectivityPeriod
historyDataSize := (relativeTarget + 1) * historySize
arraySize := latestEpochWrittenSize + historyDataSize
en := make(EncHistoryData, arraySize)
enc := en
ctx := context.Background()
var err error
for i := uint64(0); i <= target%params.BeaconConfig().WeakSubjectivityPeriod; i++ {
enc, err = enc.SetTargetData(ctx, i, emptyHistoryData())
if err != nil {
log.WithError(err).Error("Failed to set empty target data")
}
}
return enc
}
func (hd EncHistoryData) GetLatestEpochWritten(ctx context.Context) (uint64, error) {
if err := hd.assertSize(); err != nil {
return 0, err
}
return bytesutil.FromBytes8(hd[:latestEpochWrittenSize]), nil
}
func (hd EncHistoryData) SetLatestEpochWritten(ctx context.Context, latestEpochWritten uint64) (EncHistoryData, error) {
if err := hd.assertSize(); err != nil {
return nil, err
}
copy(hd[:latestEpochWrittenSize], bytesutil.Uint64ToBytesLittleEndian(latestEpochWritten))
return hd, nil
}
func (hd EncHistoryData) GetTargetData(ctx context.Context, target uint64) (*HistoryData, error) {
if err := hd.assertSize(); err != nil {
return nil, err
}
// Cursor for the location to read target epoch from.
// Modulus of target epoch X weak subjectivity period in order to have maximum size to the encapsulated data array.
cursor := (target%params.BeaconConfig().WeakSubjectivityPeriod)*historySize + latestEpochWrittenSize
if uint64(len(hd)) < cursor+historySize {
return nil, nil
}
history := &HistoryData{}
history.Source = bytesutil.FromBytes8(hd[cursor : cursor+sourceSize])
sr := make([]byte, 32)
copy(sr, hd[cursor+sourceSize:cursor+historySize])
history.SigningRoot = sr
return history, nil
}
func (hd EncHistoryData) SetTargetData(ctx context.Context, target uint64, historyData *HistoryData) (EncHistoryData, error) {
if err := hd.assertSize(); err != nil {
return nil, err
}
// Cursor for the location to write target epoch to.
// Modulus of target epoch X weak subjectivity period in order to have maximum size to the encapsulated data array.
cursor := latestEpochWrittenSize + (target%params.BeaconConfig().WeakSubjectivityPeriod)*historySize
if uint64(len(hd)) < cursor+historySize {
ext := make([]byte, cursor+historySize-uint64(len(hd)))
hd = append(hd, ext...)
}
copy(hd[cursor:cursor+sourceSize], bytesutil.Uint64ToBytesLittleEndian(historyData.Source))
copy(hd[cursor+sourceSize:cursor+sourceSize+signingRootSize], historyData.SigningRoot)
return hd, nil
}
// MarkAllAsAttestedSinceLatestWrittenEpoch returns an attesting history with specified target+epoch pairs
// since the latest written epoch up to the incoming attestation's target epoch as attested for.
func MarkAllAsAttestedSinceLatestWrittenEpoch(
ctx context.Context,
hist EncHistoryData,
incomingTarget uint64,
incomingAtt *HistoryData,
) (EncHistoryData, error) {
wsPeriod := params.BeaconConfig().WeakSubjectivityPeriod
latestEpochWritten, err := hist.GetLatestEpochWritten(ctx)
if err != nil {
return EncHistoryData{}, errors.Wrap(err, "could not get latest epoch written from history")
}
currentHD := hist
if incomingTarget > latestEpochWritten {
// If the target epoch to mark is ahead of latest written epoch, override the old targets and mark the requested epoch.
// Limit the overwriting to one weak subjectivity period as further is not needed.
maxToWrite := latestEpochWritten + wsPeriod
for i := latestEpochWritten + 1; i < incomingTarget && i <= maxToWrite; i++ {
newHD, err := hist.SetTargetData(ctx, i%wsPeriod, &HistoryData{
Source: params.BeaconConfig().FarFutureEpoch,
})
if err != nil {
return EncHistoryData{}, errors.Wrap(err, "could not set target data")
}
currentHD = newHD
}
newHD, err := currentHD.SetLatestEpochWritten(ctx, incomingTarget)
if err != nil {
return EncHistoryData{}, errors.Wrap(err, "could not set latest epoch written")
}
currentHD = newHD
}
newHD, err := currentHD.SetTargetData(ctx, incomingTarget%wsPeriod, &HistoryData{
Source: incomingAtt.Source,
SigningRoot: incomingAtt.SigningRoot,
})
if err != nil {
return EncHistoryData{}, errors.Wrap(err, "could not set target data")
}
return newHD, nil
}
// AttestedPublicKeys retrieves all public keys in our attestation history bucket.
func (store *Store) AttestedPublicKeys(ctx context.Context) ([][48]byte, error) {
ctx, span := trace.StartSpan(ctx, "Validator.AttestedPublicKeys")
@@ -252,98 +89,6 @@ func (store *Store) SaveAttestationHistoryForPubKeyV2(ctx context.Context, pubKe
return err
}
// MigrateV2AttestationProtection import old attestation format data into the new attestation format
func (store *Store) MigrateV2AttestationProtection(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Validator.MigrateV2AttestationProtection")
defer span.End()
var allKeys [][48]byte
if err := store.db.View(func(tx *bolt.Tx) error {
attestationsBucket := tx.Bucket(historicAttestationsBucket)
if err := attestationsBucket.ForEach(func(pubKey, _ []byte) error {
var pubKeyCopy [48]byte
copy(pubKeyCopy[:], pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
return errors.Wrapf(err, "could not retrieve attestations for source in %s", store.databasePath)
}
return nil
}); err != nil {
return err
}
allKeys = removeDuplicateKeys(allKeys)
attMap, err := store.AttestationHistoryForPubKeys(ctx, allKeys)
if err != nil {
return errors.Wrapf(err, "could not retrieve data for public keys %v", allKeys)
}
dataMap := make(map[[48]byte]EncHistoryData)
for key, atts := range attMap {
dataMap[key] = NewAttestationHistoryArray(atts.LatestEpochWritten)
dataMap[key], err = dataMap[key].SetLatestEpochWritten(ctx, atts.LatestEpochWritten)
if err != nil {
return errors.Wrapf(err, "failed to set latest epoch while migrating attestations to v2")
}
for target, source := range atts.TargetToSource {
dataMap[key], err = dataMap[key].SetTargetData(ctx, target, &HistoryData{
Source: source,
SigningRoot: []byte{1},
})
if err != nil {
return errors.Wrapf(err, "failed to set target data while migrating attestations to v2")
}
}
}
err = store.SaveAttestationHistoryForPubKeysV2(ctx, dataMap)
return err
}
// MigrateV2AttestationProtectionDb exports old attestation protection data
// format to the new format and save the exported flag to database.
func (store *Store) MigrateV2AttestationProtectionDb(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Validator.MigrateV2AttestationProtectionDb")
defer span.End()
importAttestations, err := store.shouldMigrateAttestations()
if err != nil {
return errors.Wrap(err, "failed to analyze whether attestations should be imported")
}
if !importAttestations {
return nil
}
log.Info("Starting proposals protection db migration to v2...")
err = store.MigrateV2AttestationProtection(ctx)
if err != nil {
return errors.Wrap(err, "filed to import attestations")
}
err = store.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
if bucket != nil {
if err := bucket.Put([]byte(attestationExported), []byte{1}); err != nil {
return errors.Wrap(err, "failed to set migrated attestations flag in db")
}
}
return nil
})
log.Info("Finished proposals protection db migration to v2")
return err
}
func (store *Store) shouldMigrateAttestations() (bool, error) {
var importAttestations bool
err := store.db.View(func(tx *bolt.Tx) error {
attestationBucket := tx.Bucket(historicAttestationsBucket)
if attestationBucket != nil && attestationBucket.Stats().KeyN != 0 {
if exported := attestationBucket.Get([]byte(attestationExported)); exported == nil {
importAttestations = true
}
}
return nil
})
return importAttestations, err
}
// HighestSignedSourceEpoch returns the highest signed source epoch for a validator public key.
// If no data exists, returning 0 is a sensible default.
func (store *Store) HighestSignedSourceEpoch(ctx context.Context, publicKey [48]byte) (uint64, error) {

View File

@@ -4,131 +4,11 @@ import (
"context"
"testing"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
bolt "go.etcd.io/bbolt"
)
func TestNewAttestationHistoryArray(t *testing.T) {
ba := NewAttestationHistoryArray(0)
assert.Equal(t, latestEpochWrittenSize+historySize, len(ba))
ba = NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod - 1)
assert.Equal(t, latestEpochWrittenSize+historySize*params.BeaconConfig().WeakSubjectivityPeriod, uint64(len(ba)))
ba = NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod)
assert.Equal(t, latestEpochWrittenSize+historySize, len(ba))
ba = NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod + 1)
assert.Equal(t, latestEpochWrittenSize+historySize+historySize, len(ba))
}
func TestSizeChecks(t *testing.T) {
require.ErrorContains(t, "is smaller then minimal size", EncHistoryData{}.assertSize())
require.NoError(t, EncHistoryData{0, 1, 2, 3, 4, 5, 6, 7}.assertSize())
require.ErrorContains(t, "is not a multiple of entry size", EncHistoryData{0, 1, 2, 3, 4, 5, 6, 7, 8}.assertSize())
require.NoError(t, NewAttestationHistoryArray(0).assertSize())
require.NoError(t, NewAttestationHistoryArray(1).assertSize())
require.NoError(t, NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod).assertSize())
require.NoError(t, NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod-1).assertSize())
}
func TestGetLatestEpochWritten(t *testing.T) {
ctx := context.Background()
ha := NewAttestationHistoryArray(0)
ha[0] = 28
lew, err := ha.GetLatestEpochWritten(ctx)
require.NoError(t, err)
assert.Equal(t, uint64(28), lew)
}
func TestSetLatestEpochWritten(t *testing.T) {
ctx := context.Background()
ha := NewAttestationHistoryArray(0)
lew, err := ha.SetLatestEpochWritten(ctx, 2828282828)
require.NoError(t, err)
bytes := lew[:latestEpochWrittenSize]
assert.Equal(t, uint64(2828282828), bytesutil.FromBytes8(bytes))
}
func TestGetTargetData(t *testing.T) {
ctx := context.Background()
ha := NewAttestationHistoryArray(0)
td, err := ha.GetTargetData(ctx, 0)
require.NoError(t, err)
assert.DeepEqual(t, emptyHistoryData(), td)
td, err = ha.GetTargetData(ctx, 1)
require.NoError(t, err)
var nilHist *HistoryData
require.Equal(t, nilHist, td)
}
func TestSetTargetData(t *testing.T) {
ctx := context.Background()
type testStruct struct {
name string
enc EncHistoryData
target uint64
source uint64
signingRoot []byte
expected EncHistoryData
error string
}
tests := []testStruct{
{
name: "empty enc",
enc: EncHistoryData{},
target: 0,
source: 100,
signingRoot: []byte{1, 2, 3},
expected: (EncHistoryData)(nil),
error: "encapsulated data size",
},
{
name: "new enc",
enc: NewAttestationHistoryArray(0),
target: 0,
source: 100,
signingRoot: []byte{1, 2, 3},
expected: EncHistoryData{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
error: "",
},
{
name: "higher target",
enc: NewAttestationHistoryArray(0),
target: 2,
source: 100,
signingRoot: []byte{1, 2, 3},
expected: EncHistoryData{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
error: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
enc, err := tt.enc.SetTargetData(ctx,
tt.target,
&HistoryData{
Source: tt.source,
SigningRoot: tt.signingRoot,
})
if tt.error == "" {
require.NoError(t, err)
td, err := enc.GetTargetData(ctx, tt.target)
require.NoError(t, err)
require.DeepEqual(t, bytesutil.PadTo(tt.signingRoot, 32), td.SigningRoot)
require.Equal(t, tt.source, td.Source)
return
}
assert.ErrorContains(t, tt.error, err)
require.DeepEqual(t, tt.expected, enc)
})
}
}
func TestAttestationHistoryForPubKeysNew_EmptyVals(t *testing.T) {
func TestAttestationHistoryForPubKeys_EmptyVals(t *testing.T) {
pubkeys := [][48]byte{{30}, {25}, {20}}
db := setupDB(t, pubkeys)
historyForPubKeys, err := db.AttestationHistoryForPubKeysV2(context.Background(), pubkeys)
@@ -141,7 +21,7 @@ func TestAttestationHistoryForPubKeysNew_EmptyVals(t *testing.T) {
require.DeepEqual(t, cleanAttHistoryForPubKeys, historyForPubKeys, "Expected attestation history epoch bits to be empty")
}
func TestAttestationHistoryForPubKeysNew_OK(t *testing.T) {
func TestAttestationHistoryForPubKeys_OK(t *testing.T) {
ctx := context.Background()
pubkeys := [][48]byte{{30}, {25}, {20}}
db := setupDB(t, pubkeys)
@@ -192,122 +72,6 @@ func TestAttestationHistoryForPubKey_OK(t *testing.T) {
require.NoError(t, err)
require.DeepEqual(t, history, historyForPubKeys[pubkeys[0]], "Expected attestation history epoch bits to be empty")
}
func TestStore_ImportOldAttestationFormatBadSourceFormat(t *testing.T) {
ctx := context.Background()
pubKeys := [][48]byte{{3}, {4}}
db := setupDB(t, pubKeys)
err := db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
for _, pubKey := range pubKeys {
if err := bucket.Put(pubKey[:], []byte{1}); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
require.ErrorContains(t, "could not retrieve data for public keys", db.MigrateV2AttestationProtection(ctx))
}
func TestStore_ImportOldAttestationFormat(t *testing.T) {
ctx := context.Background()
pubKeys := [][48]byte{{3}, {4}}
db := setupDB(t, pubKeys)
farFuture := params.BeaconConfig().FarFutureEpoch
newMap := make(map[uint64]uint64)
// The validator attested at target epoch 2 but had no attestations for target epochs 0 and 1.
newMap[0] = farFuture
newMap[1] = farFuture
newMap[2] = 1
history := &slashpb.AttestationHistory{
TargetToSource: newMap,
LatestEpochWritten: 2,
}
newMap2 := make(map[uint64]uint64)
// The validator attested at target epoch 1 and 3 but had no attestations for target epochs 0 and 2.
newMap2[0] = farFuture
newMap2[1] = 0
newMap2[2] = farFuture
newMap2[3] = 2
history2 := &slashpb.AttestationHistory{
TargetToSource: newMap2,
LatestEpochWritten: 3,
}
attestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
attestationHistory[pubKeys[0]] = history
attestationHistory[pubKeys[1]] = history2
require.NoError(t, db.SaveAttestationHistoryForPubKeys(context.Background(), attestationHistory), "Saving attestation history failed")
require.NoError(t, db.MigrateV2AttestationProtection(ctx), "Import attestation history failed")
attHis, err := db.AttestationHistoryForPubKeysV2(ctx, pubKeys)
require.NoError(t, err)
for pk, encHis := range attHis {
his, ok := attestationHistory[pk]
require.Equal(t, true, ok, "Missing public key in the original data")
lew, err := encHis.GetLatestEpochWritten(ctx)
require.NoError(t, err, "Failed to get latest epoch written")
require.Equal(t, his.LatestEpochWritten, lew, "LatestEpochWritten is not equal to the source data value")
for target, source := range his.TargetToSource {
hd, err := encHis.GetTargetData(ctx, target)
require.NoError(t, err, "Failed to get target data for epoch: %d", target)
require.Equal(t, source, hd.Source, "Source epoch is different")
require.DeepEqual(t, bytesutil.PadTo([]byte{1}, 32), hd.SigningRoot, "Signing root differs in imported data")
}
}
}
func TestShouldImportAttestations(t *testing.T) {
pubkey := [48]byte{3}
db := setupDB(t, [][48]byte{pubkey})
ctx := context.Background()
shouldImport, err := db.shouldMigrateAttestations()
require.NoError(t, err)
require.Equal(t, false, shouldImport, "Empty bucket should not be imported")
newMap := make(map[uint64]uint64)
newMap[2] = 1
history := &slashpb.AttestationHistory{
TargetToSource: newMap,
LatestEpochWritten: 2,
}
attestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
attestationHistory[pubkey] = history
err = db.SaveAttestationHistoryForPubKeys(ctx, attestationHistory)
require.NoError(t, err)
shouldImport, err = db.shouldMigrateAttestations()
require.NoError(t, err)
require.Equal(t, true, shouldImport, "Bucket with content should be imported")
}
func TestStore_UpdateAttestationProtectionDb(t *testing.T) {
pubkey := [48]byte{3}
db := setupDB(t, [][48]byte{pubkey})
ctx := context.Background()
newMap := make(map[uint64]uint64)
newMap[2] = 1
history := &slashpb.AttestationHistory{
TargetToSource: newMap,
LatestEpochWritten: 2,
}
attestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
attestationHistory[pubkey] = history
err := db.SaveAttestationHistoryForPubKeys(ctx, attestationHistory)
require.NoError(t, err)
shouldImport, err := db.shouldMigrateAttestations()
require.NoError(t, err)
require.Equal(t, true, shouldImport, "Bucket with content should be imported")
err = db.MigrateV2AttestationProtectionDb(ctx)
require.NoError(t, err)
shouldImport, err = db.shouldMigrateAttestations()
require.NoError(t, err)
require.Equal(t, false, shouldImport, "Proposals should not be re-imported")
}
func TestStore_AttestedPublicKeys(t *testing.T) {
ctx := context.Background()
validatorDB, err := NewKVStore(t.TempDir(), nil)

View File

@@ -14,9 +14,6 @@ import (
// ProtectionDbFileName Validator slashing protection db file name.
var ProtectionDbFileName = "validator.db"
const proposalExported = "PROPOSALS_IMPORTED"
const attestationExported = "ATTESTATIONS_IMPORTED"
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for eth2.
type Store struct {
@@ -109,21 +106,17 @@ func NewKVStore(dirPath string, pubKeys [][48]byte) (*Store, error) {
return kv, err
}
// GetKVStore returns the validator boltDB key-value store from directory. Returns nil if no such store exists.
func GetKVStore(directory string) (*Store, error) {
fileName := filepath.Join(directory, ProtectionDbFileName)
if _, err := os.Stat(fileName); os.IsNotExist(err) {
return nil, nil
}
boltDb, err := bolt.Open(fileName, params.BeaconIoConfig().ReadWritePermissions, &bolt.Options{Timeout: params.BeaconIoConfig().BoltTimeout})
if err != nil {
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
// UpdatePublicKeysBuckets for a specified list of keys.
func (store *Store) UpdatePublicKeysBuckets(pubKeys [][48]byte) error {
return store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(newHistoricProposalsBucket)
for _, pubKey := range pubKeys {
if _, err := bucket.CreateBucketIfNotExists(pubKey[:]); err != nil {
return errors.Wrap(err, "failed to create proposal history bucket")
}
}
return nil, err
}
return &Store{db: boltDb, databasePath: directory}, nil
return nil
})
}
// Size returns the db size in bytes.

View File

@@ -10,7 +10,7 @@ import (
func setupDB(t testing.TB, pubkeys [][48]byte) *Store {
db, err := NewKVStore(t.TempDir(), pubkeys)
require.NoError(t, err, "Failed to instantiate DB")
err = db.OldUpdatePublicKeysBuckets(pubkeys)
err = db.UpdatePublicKeysBuckets(pubkeys)
require.NoError(t, err, "Failed to create old buckets for public keys")
t.Cleanup(func() {
require.NoError(t, db.Close(), "Failed to close database")

View File

@@ -0,0 +1,170 @@
package kv
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
log "github.com/sirupsen/logrus"
)
const (
// The size of each data entry in bytes for the source epoch (8 bytes) and signing root (32 bytes).
uint64Size = 8
latestEpochWrittenSize = uint64Size
targetSize = uint64Size
sourceSize = uint64Size
signingRootSize = 32
historySize = targetSize + sourceSize + signingRootSize
minimalSize = latestEpochWrittenSize
)
// HistoryData stores the needed data to confirm if an attestation is slashable
// or repeated.
type HistoryData struct {
Source uint64
SigningRoot []byte
}
// EncHistoryData encapsulated history data.
type EncHistoryData []byte
func (hd EncHistoryData) assertSize() error {
if hd == nil || len(hd) < minimalSize {
return fmt.Errorf("encapsulated data size: %d is smaller then minimal size: %d", len(hd), minimalSize)
}
if (len(hd)-minimalSize)%historySize != 0 {
return fmt.Errorf("encapsulated data size: %d is not a multiple of entry size: %d", len(hd), historySize)
}
return nil
}
func (h *HistoryData) IsEmpty() bool {
if h == (*HistoryData)(nil) {
return true
}
if h.Source == params.BeaconConfig().FarFutureEpoch {
return true
}
return false
}
func emptyHistoryData() *HistoryData {
h := &HistoryData{Source: params.BeaconConfig().FarFutureEpoch, SigningRoot: bytesutil.PadTo([]byte{}, 32)}
return h
}
// NewAttestationHistoryArray creates a new encapsulated attestation history byte array
// sized by the latest epoch written.
func NewAttestationHistoryArray(target uint64) EncHistoryData {
relativeTarget := target % params.BeaconConfig().WeakSubjectivityPeriod
historyDataSize := (relativeTarget + 1) * historySize
arraySize := latestEpochWrittenSize + historyDataSize
en := make(EncHistoryData, arraySize)
enc := en
ctx := context.Background()
var err error
for i := uint64(0); i <= target%params.BeaconConfig().WeakSubjectivityPeriod; i++ {
enc, err = enc.SetTargetData(ctx, i, emptyHistoryData())
if err != nil {
log.WithError(err).Error("Failed to set empty target data")
}
}
return enc
}
func (hd EncHistoryData) GetLatestEpochWritten(ctx context.Context) (uint64, error) {
if err := hd.assertSize(); err != nil {
return 0, err
}
return bytesutil.FromBytes8(hd[:latestEpochWrittenSize]), nil
}
func (hd EncHistoryData) SetLatestEpochWritten(ctx context.Context, latestEpochWritten uint64) (EncHistoryData, error) {
if err := hd.assertSize(); err != nil {
return nil, err
}
copy(hd[:latestEpochWrittenSize], bytesutil.Uint64ToBytesLittleEndian(latestEpochWritten))
return hd, nil
}
func (hd EncHistoryData) GetTargetData(ctx context.Context, target uint64) (*HistoryData, error) {
if err := hd.assertSize(); err != nil {
return nil, err
}
// Cursor for the location to read target epoch from.
// Modulus of target epoch X weak subjectivity period in order to have maximum size to the encapsulated data array.
cursor := (target%params.BeaconConfig().WeakSubjectivityPeriod)*historySize + latestEpochWrittenSize
if uint64(len(hd)) < cursor+historySize {
return nil, nil
}
history := &HistoryData{}
history.Source = bytesutil.FromBytes8(hd[cursor : cursor+sourceSize])
sr := make([]byte, 32)
copy(sr, hd[cursor+sourceSize:cursor+historySize])
history.SigningRoot = sr
return history, nil
}
func (hd EncHistoryData) SetTargetData(ctx context.Context, target uint64, historyData *HistoryData) (EncHistoryData, error) {
if err := hd.assertSize(); err != nil {
return nil, err
}
// Cursor for the location to write target epoch to.
// Modulus of target epoch X weak subjectivity period in order to have maximum size to the encapsulated data array.
cursor := latestEpochWrittenSize + (target%params.BeaconConfig().WeakSubjectivityPeriod)*historySize
if uint64(len(hd)) < cursor+historySize {
ext := make([]byte, cursor+historySize-uint64(len(hd)))
hd = append(hd, ext...)
}
copy(hd[cursor:cursor+sourceSize], bytesutil.Uint64ToBytesLittleEndian(historyData.Source))
copy(hd[cursor+sourceSize:cursor+sourceSize+signingRootSize], historyData.SigningRoot)
return hd, nil
}
// MarkAllAsAttestedSinceLatestWrittenEpoch returns an attesting history with specified target+epoch pairs
// since the latest written epoch up to the incoming attestation's target epoch as attested for.
func MarkAllAsAttestedSinceLatestWrittenEpoch(
ctx context.Context,
hist EncHistoryData,
incomingTarget uint64,
incomingAtt *HistoryData,
) (EncHistoryData, error) {
wsPeriod := params.BeaconConfig().WeakSubjectivityPeriod
latestEpochWritten, err := hist.GetLatestEpochWritten(ctx)
if err != nil {
return EncHistoryData{}, errors.Wrap(err, "could not get latest epoch written from history")
}
currentHD := hist
if incomingTarget > latestEpochWritten {
// If the target epoch to mark is ahead of latest written epoch, override the old targets and mark the requested epoch.
// Limit the overwriting to one weak subjectivity period as further is not needed.
maxToWrite := latestEpochWritten + wsPeriod
for i := latestEpochWritten + 1; i < incomingTarget && i <= maxToWrite; i++ {
newHD, err := hist.SetTargetData(ctx, i%wsPeriod, &HistoryData{
Source: params.BeaconConfig().FarFutureEpoch,
})
if err != nil {
return EncHistoryData{}, errors.Wrap(err, "could not set target data")
}
currentHD = newHD
}
newHD, err := currentHD.SetLatestEpochWritten(ctx, incomingTarget)
if err != nil {
return EncHistoryData{}, errors.Wrap(err, "could not set latest epoch written")
}
currentHD = newHD
}
newHD, err := currentHD.SetTargetData(ctx, incomingTarget%wsPeriod, &HistoryData{
Source: incomingAtt.Source,
SigningRoot: incomingAtt.SigningRoot,
})
if err != nil {
return EncHistoryData{}, errors.Wrap(err, "could not set target data")
}
return newHD, nil
}

View File

@@ -0,0 +1,127 @@
package kv
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestNewAttestationHistoryArray(t *testing.T) {
ba := NewAttestationHistoryArray(0)
assert.Equal(t, latestEpochWrittenSize+historySize, len(ba))
ba = NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod - 1)
assert.Equal(t, latestEpochWrittenSize+historySize*params.BeaconConfig().WeakSubjectivityPeriod, uint64(len(ba)))
ba = NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod)
assert.Equal(t, latestEpochWrittenSize+historySize, len(ba))
ba = NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod + 1)
assert.Equal(t, latestEpochWrittenSize+historySize+historySize, len(ba))
}
func TestSizeChecks(t *testing.T) {
require.ErrorContains(t, "is smaller then minimal size", EncHistoryData{}.assertSize())
require.NoError(t, EncHistoryData{0, 1, 2, 3, 4, 5, 6, 7}.assertSize())
require.ErrorContains(t, "is not a multiple of entry size", EncHistoryData{0, 1, 2, 3, 4, 5, 6, 7, 8}.assertSize())
require.NoError(t, NewAttestationHistoryArray(0).assertSize())
require.NoError(t, NewAttestationHistoryArray(1).assertSize())
require.NoError(t, NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod).assertSize())
require.NoError(t, NewAttestationHistoryArray(params.BeaconConfig().WeakSubjectivityPeriod-1).assertSize())
}
func TestGetLatestEpochWritten(t *testing.T) {
ctx := context.Background()
ha := NewAttestationHistoryArray(0)
ha[0] = 28
lew, err := ha.GetLatestEpochWritten(ctx)
require.NoError(t, err)
assert.Equal(t, uint64(28), lew)
}
func TestSetLatestEpochWritten(t *testing.T) {
ctx := context.Background()
ha := NewAttestationHistoryArray(0)
lew, err := ha.SetLatestEpochWritten(ctx, 2828282828)
require.NoError(t, err)
bytes := lew[:latestEpochWrittenSize]
assert.Equal(t, uint64(2828282828), bytesutil.FromBytes8(bytes))
}
func TestGetTargetData(t *testing.T) {
ctx := context.Background()
ha := NewAttestationHistoryArray(0)
td, err := ha.GetTargetData(ctx, 0)
require.NoError(t, err)
assert.DeepEqual(t, emptyHistoryData(), td)
td, err = ha.GetTargetData(ctx, 1)
require.NoError(t, err)
var nilHist *HistoryData
require.Equal(t, nilHist, td)
}
func TestSetTargetData(t *testing.T) {
ctx := context.Background()
type testStruct struct {
name string
enc EncHistoryData
target uint64
source uint64
signingRoot []byte
expected EncHistoryData
error string
}
tests := []testStruct{
{
name: "empty enc",
enc: EncHistoryData{},
target: 0,
source: 100,
signingRoot: []byte{1, 2, 3},
expected: (EncHistoryData)(nil),
error: "encapsulated data size",
},
{
name: "new enc",
enc: NewAttestationHistoryArray(0),
target: 0,
source: 100,
signingRoot: []byte{1, 2, 3},
expected: EncHistoryData{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
error: "",
},
{
name: "higher target",
enc: NewAttestationHistoryArray(0),
target: 2,
source: 100,
signingRoot: []byte{1, 2, 3},
expected: EncHistoryData{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
error: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
enc, err := tt.enc.SetTargetData(ctx,
tt.target,
&HistoryData{
Source: tt.source,
SigningRoot: tt.signingRoot,
})
if tt.error == "" {
require.NoError(t, err)
td, err := enc.GetTargetData(ctx, tt.target)
require.NoError(t, err)
require.DeepEqual(t, bytesutil.PadTo(tt.signingRoot, 32), td.SigningRoot)
require.Equal(t, tt.source, td.Source)
return
}
assert.ErrorContains(t, tt.error, err)
require.DeepEqual(t, tt.expected, enc)
})
}
}

View File

@@ -1,322 +0,0 @@
package kv
import (
"context"
"encoding/hex"
"path/filepath"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
var errFailedToCloseSource = errors.New("failed to close the source")
var errFailedToCloseManySources = errors.New("failed to close one or more stores")
type epochProposals struct {
Epoch []byte
Proposals []byte
}
type pubKeyProposals struct {
PubKey [48]byte
Proposals []epochProposals
}
type pubKeyAttestations struct {
PubKey [48]byte
Attestations []byte
}
// Merge merges data from sourceStores into a new store, which is created in targetDirectory.
func Merge(ctx context.Context, sourceStores []*Store, targetDirectory string) error {
ctx, span := trace.StartSpan(ctx, "Validator.Db.Merge")
defer span.End()
allProposals, allAttestations, err := getAllProposalsAndAllAttestations(sourceStores)
if err != nil {
return err
}
return createMergeTargetStore(targetDirectory, allProposals, allAttestations)
}
// Split splits data from sourceStore into several stores, one for each public key in sourceStore.
// Each new store is created in its own subdirectory inside targetDirectory.
func Split(ctx context.Context, sourceStore *Store, targetDirectory string) error {
ctx, span := trace.StartSpan(ctx, "Validator.Db.Split")
defer span.End()
allProposals, allAttestations, err := getAllProposalsAndAllAttestations([]*Store{sourceStore})
if err != nil {
return err
}
return createSplitTargetStores(targetDirectory, allProposals, allAttestations)
}
func getPubKeyProposals(pubKey [48]byte, proposalsBucket *bolt.Bucket) (*pubKeyProposals, error) {
pubKeyProposals := pubKeyProposals{
PubKey: pubKey,
Proposals: []epochProposals{},
}
pubKeyBucket := proposalsBucket.Bucket(pubKey[:])
if pubKeyBucket == nil {
return &pubKeyProposals, nil
}
if err := pubKeyBucket.ForEach(func(epoch, v []byte) error {
epochProposals := epochProposals{
Epoch: make([]byte, len(epoch)),
Proposals: make([]byte, len(v)),
}
copy(epochProposals.Epoch, epoch)
copy(epochProposals.Proposals, v)
pubKeyProposals.Proposals = append(pubKeyProposals.Proposals, epochProposals)
return nil
}); err != nil {
return nil, errors.Wrapf(err, "could not retrieve proposals for public key %x", pubKey[:12])
}
return &pubKeyProposals, nil
}
func createMergeTargetStore(
targetDirectory string,
allProposals []pubKeyProposals,
allAttestations []pubKeyAttestations) (err error) {
newStore, err := NewKVStore(targetDirectory, [][48]byte{})
defer func() {
if deferErr := newStore.Close(); deferErr != nil {
if err != nil {
err = errors.Wrap(err, errFailedToCloseSource.Error())
} else {
err = errors.Wrap(deferErr, errFailedToCloseSource.Error())
}
}
}()
if err != nil {
return errors.Wrapf(err, "could not initialize a new database in %s", targetDirectory)
}
err = newStore.update(func(tx *bolt.Tx) error {
allProposalsBucket := tx.Bucket(newHistoricProposalsBucket)
for _, pubKeyProposals := range allProposals {
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey[:])
if err != nil {
return err
}
if err := addEpochProposals(proposalsBucket, pubKeyProposals.Proposals); err != nil {
return err
}
}
attestationsBucket := tx.Bucket(historicAttestationsBucket)
for _, attestations := range allAttestations {
if err := addAttestations(attestationsBucket, attestations); err != nil {
return err
}
}
return nil
})
return err
}
func createSplitTargetStores(
targetDirectory string,
allProposals []pubKeyProposals,
allAttestations []pubKeyAttestations) (err error) {
var storesToClose []*Store
defer func() {
failedToClose := false
for _, store := range storesToClose {
if deferErr := store.Close(); deferErr != nil {
failedToClose = true
}
}
if failedToClose {
if err != nil {
err = errors.Wrapf(err, errFailedToCloseManySources.Error())
} else {
err = errFailedToCloseManySources
}
}
}()
for _, pubKeyProposals := range allProposals {
dirName := hex.EncodeToString(pubKeyProposals.PubKey[:])[:12]
path := filepath.Join(targetDirectory, dirName)
newStore, err := NewKVStore(path, [][48]byte{})
if err != nil {
return errors.Wrapf(err, "could not create a validator database in %s", path)
}
storesToClose = append(storesToClose, newStore)
if err := newStore.update(func(tx *bolt.Tx) error {
allProposalsBucket := tx.Bucket(newHistoricProposalsBucket)
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey[:])
if err != nil {
return err
}
if err := addEpochProposals(proposalsBucket, pubKeyProposals.Proposals); err != nil {
return err
}
attestationsBucket := tx.Bucket(historicAttestationsBucket)
for _, pubKeyAttestations := range allAttestations {
if string(pubKeyAttestations.PubKey[:]) == string(pubKeyProposals.PubKey[:]) {
if err := addAttestations(attestationsBucket, pubKeyAttestations); err != nil {
return err
}
break
}
}
return nil
}); err != nil {
return err
}
}
// Create stores for attestations belonging to public keys that do not have proposals.
for _, pubKeyAttestations := range allAttestations {
var hasMatchingProposals = false
for _, pubKeyProposals := range allProposals {
if string(pubKeyAttestations.PubKey[:]) == string(pubKeyProposals.PubKey[:]) {
hasMatchingProposals = true
break
}
}
if !hasMatchingProposals {
dirName := hex.EncodeToString(pubKeyAttestations.PubKey[:])[:12]
path := filepath.Join(targetDirectory, dirName)
newStore, err := NewKVStore(path, [][48]byte{})
if err != nil {
return errors.Wrapf(err, "could not create a validator database in %s", path)
}
storesToClose = append(storesToClose, newStore)
if err := newStore.update(func(tx *bolt.Tx) error {
attestationsBucket := tx.Bucket(historicAttestationsBucket)
return addAttestations(attestationsBucket, pubKeyAttestations)
}); err != nil {
return err
}
}
}
return nil
}
func getAllProposalsAndAllAttestations(stores []*Store) ([]pubKeyProposals, []pubKeyAttestations, error) {
var allProposals []pubKeyProposals
var allAttestations []pubKeyAttestations
for _, store := range stores {
// Storing keys upfront will allow using several short transactions (one for every key)
// instead of one long-running transaction for all keys.
var allKeys [][48]byte
if err := store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(newHistoricProposalsBucket)
if err := proposalsBucket.ForEach(func(pubKey, _ []byte) error {
var pubKeyCopy [48]byte
copy(pubKeyCopy[:], pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
return errors.Wrapf(err, "could not retrieve proposals for source in %s", store.databasePath)
}
attestationsBucket := tx.Bucket(historicAttestationsBucket)
if err := attestationsBucket.ForEach(func(pubKey, _ []byte) error {
var pubKeyCopy [48]byte
copy(pubKeyCopy[:], pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
return errors.Wrapf(err, "could not retrieve attestations for source in %s", store.databasePath)
}
return nil
}); err != nil {
return nil, nil, err
}
allKeys = removeDuplicateKeys(allKeys)
for _, pubKey := range allKeys {
if err := store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(newHistoricProposalsBucket)
pubKeyProposals, err := getPubKeyProposals(pubKey, proposalsBucket)
if err != nil {
return err
}
allProposals = append(allProposals, *pubKeyProposals)
attestationsBucket := tx.Bucket(historicAttestationsBucket)
v := attestationsBucket.Get(pubKey[:])
if v != nil {
attestations := pubKeyAttestations{
PubKey: pubKey,
Attestations: make([]byte, len(v)),
}
copy(attestations.Attestations, v)
allAttestations = append(allAttestations, attestations)
}
return nil
}); err != nil {
return nil, nil, errors.Wrapf(err, "could not retrieve data for public key %x", pubKey[:12])
}
}
}
return allProposals, allAttestations, nil
}
func createProposalsBucket(topLevelBucket *bolt.Bucket, pubKey []byte) (*bolt.Bucket, error) {
var bucket, err = topLevelBucket.CreateBucket(pubKey)
if err != nil {
return nil, errors.Wrapf(err, "could not create proposals bucket for public key %x", pubKey[:12])
}
return bucket, nil
}
func addEpochProposals(bucket *bolt.Bucket, proposals []epochProposals) error {
for _, singleProposal := range proposals {
if err := bucket.Put(singleProposal.Epoch, singleProposal.Proposals); err != nil {
return errors.Wrapf(err, "could not add epoch proposals for epoch %v", singleProposal.Epoch)
}
}
return nil
}
func addAttestations(bucket *bolt.Bucket, attestations pubKeyAttestations) error {
if err := bucket.Put(attestations.PubKey[:], attestations.Attestations); err != nil {
return errors.Wrapf(
err,
"could not add public key attestations for public key %x",
attestations.PubKey[:12])
}
return nil
}
func removeDuplicateKeys(keys [][48]byte) [][48]byte {
last := 0
next:
for _, k1 := range keys {
for _, k2 := range keys[:last] {
if k1 == k2 {
continue next
}
}
keys[last] = k1
last++
}
return keys[:last]
}

View File

@@ -1,211 +0,0 @@
package kv
import (
"context"
"encoding/hex"
"path/filepath"
"testing"
"github.com/pkg/errors"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
bolt "go.etcd.io/bbolt"
)
type storeHistory struct {
Proposals map[[48]byte][]byte
Attestations map[[48]byte]map[uint64]uint64
}
func TestMerge(t *testing.T) {
firstStorePubKeys := [][48]byte{{1}, {2}}
firstStore := setupDB(t, firstStorePubKeys)
secondStorePubKeys := [][48]byte{{3}, {4}}
secondStore := setupDB(t, secondStorePubKeys)
storeHistory1, err := prepareStore(firstStore, firstStorePubKeys)
require.NoError(t, err)
storeHistory2, err := prepareStore(secondStore, secondStorePubKeys)
require.NoError(t, err)
mergedProposals := make(map[[48]byte][]byte)
for k, v := range storeHistory1.Proposals {
mergedProposals[k] = v
}
for k, v := range storeHistory2.Proposals {
mergedProposals[k] = v
}
mergedAttestations := make(map[[48]byte]map[uint64]uint64)
for k, v := range storeHistory1.Attestations {
mergedAttestations[k] = v
}
for k, v := range storeHistory2.Attestations {
mergedAttestations[k] = v
}
mergedStoreHistory := storeHistory{
Proposals: mergedProposals,
Attestations: mergedAttestations,
}
targetDirectory := t.TempDir() + "/target"
err = Merge(context.Background(), []*Store{firstStore, secondStore}, targetDirectory)
require.NoError(t, err, "Merging failed")
mergedStore, err := GetKVStore(targetDirectory)
require.NoError(t, err, "Retrieving the merged store failed")
assertStore(
t,
mergedStore,
append(firstStorePubKeys, secondStorePubKeys[0], secondStorePubKeys[1]),
&mergedStoreHistory)
}
func TestSplit(t *testing.T) {
pubKey1 := [48]byte{1}
pubKey2 := [48]byte{2}
sourceStore := setupDB(t, [][48]byte{pubKey1, pubKey2})
storeHistory1, err := prepareStore(sourceStore, [][48]byte{pubKey1})
require.NoError(t, err)
storeHistory2, err := prepareStore(sourceStore, [][48]byte{pubKey2})
require.NoError(t, err)
targetDirectory := t.TempDir() + "/target"
require.NoError(t, Split(context.Background(), sourceStore, targetDirectory), "Splitting failed")
encodedKey1 := hex.EncodeToString(pubKey1[:])[:12]
keyStore1, err := GetKVStore(filepath.Join(targetDirectory, encodedKey1))
require.NoError(t, err, "Retrieving the store for public key %v failed", encodedKey1)
require.NotNil(t, keyStore1, "No store created for public key %v", encodedKey1)
encodedKey2 := hex.EncodeToString(pubKey2[:])[:12]
keyStore2, err := GetKVStore(filepath.Join(targetDirectory, encodedKey2))
require.NoError(t, err, "Retrieving the store for public key %v failed", encodedKey2)
require.NotNil(t, keyStore2, "No store created for public key %v", encodedKey2)
err = keyStore1.view(func(tx *bolt.Tx) error {
otherKeyProposalsBucket := tx.Bucket(newHistoricProposalsBucket).Bucket(pubKey2[:])
require.Equal(t, (*bolt.Bucket)(nil), otherKeyProposalsBucket, "Store for public key %v contains proposals for another key", encodedKey2)
otherKeyAttestationsBucket := tx.Bucket(historicAttestationsBucket).Bucket(pubKey2[:])
require.Equal(t, (*bolt.Bucket)(nil), otherKeyAttestationsBucket, "Store for public key %v contains attestations for another key", encodedKey2)
return nil
})
require.NoError(t, err)
err = keyStore2.view(func(tx *bolt.Tx) error {
otherKeyProposalsBucket := tx.Bucket(newHistoricProposalsBucket).Bucket(pubKey1[:])
require.Equal(t, (*bolt.Bucket)(nil), otherKeyProposalsBucket, "Store for public key %v contains proposals for another key", encodedKey1)
otherKeyAttestationsBucket := tx.Bucket(historicAttestationsBucket).Bucket(pubKey1[:])
require.Equal(t, (*bolt.Bucket)(nil), otherKeyAttestationsBucket, "Store for public key %v contains attestations for another key", encodedKey1)
return nil
})
require.NoError(t, err)
assertStore(t, keyStore1, [][48]byte{pubKey1}, storeHistory1)
assertStore(t, keyStore2, [][48]byte{pubKey2}, storeHistory2)
}
func TestSplit_AttestationsWithoutMatchingProposalsAreSplit(t *testing.T) {
pubKey1 := [48]byte{1}
pubKey2 := [48]byte{2}
sourceStore := setupDB(t, [][48]byte{pubKey1, pubKey2})
_, err := prepareStoreProposals(sourceStore, [][48]byte{pubKey1})
require.NoError(t, err)
attestationHistory, err := prepareStoreAttestations(sourceStore, [][48]byte{pubKey1, pubKey2})
require.NoError(t, err)
targetDirectory := t.TempDir() + "/target"
require.NoError(t, Split(context.Background(), sourceStore, targetDirectory), "Splitting failed")
encodedKey1 := hex.EncodeToString(pubKey1[:])[:12]
encodedKey2 := hex.EncodeToString(pubKey2[:])[:12]
attestationsOnlyKeyStore, err := GetKVStore(filepath.Join(targetDirectory, encodedKey2))
require.NoError(t, err, "Retrieving the store failed")
require.NotNil(t, attestationsOnlyKeyStore, "No store created for public key %v", encodedKey2)
err = attestationsOnlyKeyStore.view(func(tx *bolt.Tx) error {
otherKeyProposalsBucket := tx.Bucket(newHistoricProposalsBucket).Bucket(pubKey1[:])
require.Equal(t, (*bolt.Bucket)(nil), otherKeyProposalsBucket, "Store for public key %v contains proposals for another key", encodedKey1)
otherKeyAttestationsBucket := tx.Bucket(historicAttestationsBucket).Bucket(pubKey1[:])
require.Equal(t, (*bolt.Bucket)(nil), otherKeyAttestationsBucket, "Store for public key %v contains attestations for another key", encodedKey1)
return nil
})
require.NoError(t, err)
splitAttestationsHistory, err :=
attestationsOnlyKeyStore.AttestationHistoryForPubKeys(context.Background(), [][48]byte{pubKey2})
require.NoError(t, err, "Retrieving attestation history failed for public key %v", encodedKey2)
require.Equal(t, attestationHistory[pubKey2][0], splitAttestationsHistory[pubKey2].TargetToSource[0], "Attestations not merged correctly")
}
func prepareStore(store *Store, pubKeys [][48]byte) (*storeHistory, error) {
proposals, err := prepareStoreProposals(store, pubKeys)
if err != nil {
return nil, err
}
attestations, err := prepareStoreAttestations(store, pubKeys)
if err != nil {
return nil, err
}
history := storeHistory{
Proposals: proposals,
Attestations: attestations,
}
return &history, nil
}
func prepareStoreProposals(store *Store, pubKeys [][48]byte) (map[[48]byte][]byte, error) {
proposals := make(map[[48]byte][]byte)
for i, key := range pubKeys {
signingRoot := bytesutil.PadTo([]byte{byte(i)}, 32)
if err := store.SaveProposalHistoryForSlot(context.Background(), key, 0, signingRoot); err != nil {
return nil, errors.Wrapf(err, "Saving proposal history failed")
}
proposals[key] = signingRoot
}
return proposals, nil
}
func prepareStoreAttestations(store *Store, pubKeys [][48]byte) (map[[48]byte]map[uint64]uint64, error) {
storeAttestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
attestations := make(map[[48]byte]map[uint64]uint64)
for i, key := range pubKeys {
attestationHistoryMap := make(map[uint64]uint64)
attestationHistoryMap[0] = uint64(i)
attestationHistory := &slashpb.AttestationHistory{
TargetToSource: attestationHistoryMap,
LatestEpochWritten: 0,
}
storeAttestationHistory[key] = attestationHistory
attestations[key] = attestationHistoryMap
}
if err := store.SaveAttestationHistoryForPubKeys(context.Background(), storeAttestationHistory); err != nil {
return nil, errors.Wrapf(err, "Saving attestation history failed")
}
return attestations, nil
}
func assertStore(t *testing.T, store *Store, pubKeys [][48]byte, expectedHistory *storeHistory) {
for _, key := range pubKeys {
proposalHistory, _, err := store.ProposalHistoryForSlot(context.Background(), key, 0)
require.NoError(t, err, "Retrieving proposal history failed for public key %v", key)
expectedProposals := expectedHistory.Proposals[key]
require.DeepEqual(t, expectedProposals, proposalHistory[:], "Proposals are incorrect")
}
attestationHistory, err := store.AttestationHistoryForPubKeys(context.Background(), pubKeys)
require.NoError(t, err, "Retrieving attestation history failed")
for _, key := range pubKeys {
expectedAttestations := expectedHistory.Attestations[key]
require.Equal(t, expectedAttestations[0], attestationHistory[key].TargetToSource[0], "Attestations are incorrect")
}
}

View File

@@ -1,99 +0,0 @@
package kv
import (
"context"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/wealdtech/go-bytesutil"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
// ProposalHistoryForPubkey for a validator public key.
type ProposalHistoryForPubkey struct {
Proposals []Proposal
}
type Proposal struct {
Slot uint64 `json:"slot"`
SigningRoot []byte `json:"signing_root"`
}
// ProposalHistoryForEpoch accepts a validator public key and returns the corresponding proposal history.
// Returns nil if there is no proposal history for the validator.
func (store *Store) ProposalHistoryForEpoch(ctx context.Context, publicKey []byte, epoch uint64) (bitfield.Bitlist, error) {
ctx, span := trace.StartSpan(ctx, "Validator.ProposalHistoryForEpoch")
defer span.End()
var err error
// Adding an extra byte for the bitlist length.
slotBitlist := make(bitfield.Bitlist, params.BeaconConfig().SlotsPerEpoch/8+1)
err = store.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
valBucket := bucket.Bucket(publicKey)
if valBucket == nil {
return fmt.Errorf("validator history empty for public key %#x", publicKey)
}
slotBits := valBucket.Get(bytesutil.Bytes8(epoch))
if len(slotBits) == 0 {
slotBitlist = bitfield.NewBitlist(params.BeaconConfig().SlotsPerEpoch)
return nil
}
copy(slotBitlist, slotBits)
return nil
})
return slotBitlist, err
}
// SaveProposalHistoryForEpoch saves the proposal history for the requested validator public key.
func (store *Store) SaveProposalHistoryForEpoch(ctx context.Context, pubKey []byte, epoch uint64, slotBits bitfield.Bitlist) error {
ctx, span := trace.StartSpan(ctx, "Validator.SaveProposalHistoryForEpoch")
defer span.End()
err := store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
valBucket := bucket.Bucket(pubKey)
if valBucket == nil {
return fmt.Errorf("validator history is empty for validator %#x", pubKey)
}
if err := valBucket.Put(bytesutil.Bytes8(epoch), slotBits); err != nil {
return err
}
return pruneProposalHistory(valBucket, epoch)
})
return err
}
// UpdatePublicKeysBuckets for a specified list of keys.
func (store *Store) OldUpdatePublicKeysBuckets(pubKeys [][48]byte) error {
return store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
for _, pubKey := range pubKeys {
if _, err := bucket.CreateBucketIfNotExists(pubKey[:]); err != nil {
return errors.Wrap(err, "failed to create proposal history bucket")
}
}
return nil
})
}
func pruneProposalHistory(valBucket *bolt.Bucket, newestEpoch uint64) error {
c := valBucket.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.First() {
epoch := binary.LittleEndian.Uint64(k)
// Only delete epochs that are older than the weak subjectivity period.
if epoch+params.BeaconConfig().WeakSubjectivityPeriod <= newestEpoch {
if err := c.Delete(); err != nil {
return errors.Wrapf(err, "could not prune epoch %d in proposal history", epoch)
}
} else {
// If starting from the oldest, we dont find anything prunable, stop pruning.
break
}
}
return nil
}

View File

@@ -1,196 +0,0 @@
package kv
import (
"bytes"
"context"
"testing"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestProposalHistoryForEpoch_InitializesNewPubKeys(t *testing.T) {
pubkeys := [][48]byte{{30}, {25}, {20}}
db := setupDB(t, pubkeys)
for _, pub := range pubkeys {
slotBits, err := db.ProposalHistoryForEpoch(context.Background(), pub[:], 0)
require.NoError(t, err)
cleanBits := bitfield.NewBitlist(params.BeaconConfig().SlotsPerEpoch)
require.DeepEqual(t, cleanBits.Bytes(), slotBits.Bytes(), "Expected proposal history slot bits to be empty")
}
}
func TestProposalHistoryForEpoch_NilDB(t *testing.T) {
valPubkey := [48]byte{1, 2, 3}
db := setupDB(t, [][48]byte{})
_, err := db.ProposalHistoryForEpoch(context.Background(), valPubkey[:], 0)
require.ErrorContains(t, "validator history empty for public key", err, "Unexpected error for nil DB")
}
func TestSaveProposalHistoryForEpoch_OK(t *testing.T) {
pubkey := [48]byte{3}
db := setupDB(t, [][48]byte{pubkey})
epoch := uint64(2)
slot := uint64(2)
slotBits := bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x04}
err := db.SaveProposalHistoryForEpoch(context.Background(), pubkey[:], epoch, slotBits)
require.NoError(t, err, "Saving proposal history failed: %v")
savedBits, err := db.ProposalHistoryForEpoch(context.Background(), pubkey[:], epoch)
require.NoError(t, err, "Failed to get proposal history")
require.NotNil(t, savedBits)
require.DeepEqual(t, slotBits, savedBits, "Expected DB to keep object the same")
require.Equal(t, true, savedBits.BitAt(slot), "Expected slot %d to be marked as proposed", slot)
require.Equal(t, false, savedBits.BitAt(slot+1), "Expected slot %d to not be marked as proposed", slot+1)
require.Equal(t, false, savedBits.BitAt(slot-1), "Expected slot %d to not be marked as proposed", slot-1)
}
func TestSaveProposalHistoryForEpoch_Overwrites(t *testing.T) {
pubkey := [48]byte{0}
tests := []struct {
slot uint64
slotBits bitfield.Bitlist
}{
{
slot: uint64(1),
slotBits: bitfield.Bitlist{0x02, 0x00, 0x00, 0x00, 0x02},
},
{
slot: uint64(2),
slotBits: bitfield.Bitlist{0x04, 0x00, 0x00, 0x00, 0x04},
},
{
slot: uint64(3),
slotBits: bitfield.Bitlist{0x08, 0x00, 0x00, 0x00, 0x08},
},
}
for _, tt := range tests {
db := setupDB(t, [][48]byte{pubkey})
err := db.SaveProposalHistoryForEpoch(context.Background(), pubkey[:], 0, tt.slotBits)
require.NoError(t, err, "Saving proposal history failed")
savedBits, err := db.ProposalHistoryForEpoch(context.Background(), pubkey[:], 0)
require.NoError(t, err, "Failed to get proposal history")
require.NotNil(t, savedBits)
require.DeepEqual(t, tt.slotBits, savedBits, "Expected DB to keep object the same")
require.Equal(t, true, savedBits.BitAt(tt.slot), "Expected slot %d to be marked as proposed", tt.slot)
require.Equal(t, false, savedBits.BitAt(tt.slot+1), "Expected slot %d to not be marked as proposed", tt.slot+1)
require.Equal(t, false, savedBits.BitAt(tt.slot-1), "Expected slot %d to not be marked as proposed", tt.slot-1)
}
}
func TestProposalHistoryForEpoch_MultipleEpochs(t *testing.T) {
pubKey := [48]byte{0}
tests := []struct {
slots []uint64
expectedBits []bitfield.Bitlist
}{
{
slots: []uint64{1, 2, 8, 31},
expectedBits: []bitfield.Bitlist{{0b00000110, 0b00000001, 0b00000000, 0b10000000, 0b00000001}},
},
{
slots: []uint64{1, 33, 8},
expectedBits: []bitfield.Bitlist{
{0b00000010, 0b00000001, 0b00000000, 0b00000000, 0b00000001},
{0b00000010, 0b00000000, 0b00000000, 0b00000000, 0b00000001},
},
},
{
slots: []uint64{2, 34, 36},
expectedBits: []bitfield.Bitlist{
{0b00000100, 0b00000000, 0b00000000, 0b00000000, 0b00000001},
{0b00010100, 0b00000000, 0b00000000, 0b00000000, 0b00000001},
},
},
{
slots: []uint64{32, 33, 34},
expectedBits: []bitfield.Bitlist{
{0, 0, 0, 0, 1},
{0b00000111, 0b00000000, 0b00000000, 0b00000000, 0b00000001},
},
},
}
for _, tt := range tests {
db := setupDB(t, [][48]byte{pubKey})
for _, slot := range tt.slots {
slotBits, err := db.ProposalHistoryForEpoch(context.Background(), pubKey[:], helpers.SlotToEpoch(slot))
require.NoError(t, err, "Failed to get proposal history")
slotBits.SetBitAt(slot%params.BeaconConfig().SlotsPerEpoch, true)
err = db.SaveProposalHistoryForEpoch(context.Background(), pubKey[:], helpers.SlotToEpoch(slot), slotBits)
require.NoError(t, err, "Saving proposal history failed")
}
for i, slotBits := range tt.expectedBits {
savedBits, err := db.ProposalHistoryForEpoch(context.Background(), pubKey[:], uint64(i))
require.NoError(t, err, "Failed to get proposal history")
require.DeepEqual(t, slotBits, savedBits, "Unexpected difference in bytes for slots %v", tt.slots)
}
}
}
func TestPruneProposalHistory_OK(t *testing.T) {
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
wsPeriod := params.BeaconConfig().WeakSubjectivityPeriod
pubKey := [48]byte{0}
tests := []struct {
slots []uint64
storedEpochs []uint64
removedEpochs []uint64
}{
{
// Go 2 epochs past pruning point.
slots: []uint64{slotsPerEpoch / 2, slotsPerEpoch*5 + 6, (wsPeriod+3)*slotsPerEpoch + 8},
storedEpochs: []uint64{5, 54003},
removedEpochs: []uint64{0},
},
{
// Go 10 epochs past pruning point.
slots: []uint64{
slotsPerEpoch + 4, slotsPerEpoch * 2,
slotsPerEpoch * 3, slotsPerEpoch * 4,
slotsPerEpoch * 5, (wsPeriod+10)*slotsPerEpoch + 8,
},
storedEpochs: []uint64{54010},
removedEpochs: []uint64{1, 2, 3, 4},
},
{
// Prune none.
slots: []uint64{slotsPerEpoch + 4, slotsPerEpoch*2 + 3, slotsPerEpoch*3 + 4, slotsPerEpoch*4 + 3, slotsPerEpoch*5 + 3},
storedEpochs: []uint64{1, 2, 3, 4, 5},
},
}
for _, tt := range tests {
db := setupDB(t, [][48]byte{pubKey})
for _, slot := range tt.slots {
slotBits, err := db.ProposalHistoryForEpoch(context.Background(), pubKey[:], helpers.SlotToEpoch(slot))
require.NoError(t, err, "Failed to get proposal history")
slotBits.SetBitAt(slot%params.BeaconConfig().SlotsPerEpoch, true)
err = db.SaveProposalHistoryForEpoch(context.Background(), pubKey[:], helpers.SlotToEpoch(slot), slotBits)
require.NoError(t, err, "Saving proposal history failed")
}
for _, epoch := range tt.removedEpochs {
savedBits, err := db.ProposalHistoryForEpoch(context.Background(), pubKey[:], epoch)
require.NoError(t, err, "Failed to get proposal history")
require.DeepEqual(t, bitfield.NewBitlist(slotsPerEpoch), savedBits, "Unexpected difference in bytes for epoch %d", epoch)
}
for _, epoch := range tt.storedEpochs {
savedBits, err := db.ProposalHistoryForEpoch(context.Background(), pubKey[:], epoch)
require.NoError(t, err, "Failed to get proposal history")
if bytes.Equal(bitfield.NewBitlist(slotsPerEpoch), savedBits) {
t.Fatalf("unexpected difference in bytes for epoch %d, expected %v vs received %v", epoch, bitfield.NewBitlist(slotsPerEpoch), savedBits)
}
}
}
}

View File

@@ -5,15 +5,24 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
log "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
// ProposalHistoryForPubkey for a validator public key.
type ProposalHistoryForPubkey struct {
Proposals []Proposal
}
// Proposal representation for a validator public key.
type Proposal struct {
Slot uint64 `json:"slot"`
SigningRoot []byte `json:"signing_root"`
}
// ProposedPublicKeys retrieves all public keys in our proposals history bucket.
func (store *Store) ProposedPublicKeys(ctx context.Context) ([][48]byte, error) {
ctx, span := trace.StartSpan(ctx, "Validator.ProposedPublicKeys")
@@ -149,91 +158,6 @@ func (store *Store) HighestSignedProposal(ctx context.Context, publicKey [48]byt
return highestSignedProposalSlot, err
}
// MigrateV2ProposalFormat accepts a validator public key and returns the corresponding signing root.
// Returns nil if there is no proposal history for the validator at this slot.
func (store *Store) MigrateV2ProposalFormat(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Validator.MigrateV2ProposalFormat")
defer span.End()
var allKeys [][48]byte
err := store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(historicProposalsBucket)
if err := proposalsBucket.ForEach(func(pubKey, _ []byte) error {
var pubKeyCopy [48]byte
copy(pubKeyCopy[:], pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
return errors.Wrapf(err, "could not retrieve proposals for source in %s", store.databasePath)
}
return nil
})
if err != nil {
return err
}
allKeys = removeDuplicateKeys(allKeys)
var prs []*pubKeyProposals
err = store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(historicProposalsBucket)
for _, pk := range allKeys {
pr, err := getPubKeyProposals(pk, proposalsBucket)
prs = append(prs, pr)
if err != nil {
return errors.Wrap(err, "could not retrieve public key old proposals format")
}
}
return nil
})
if err != nil {
return err
}
err = store.db.Update(func(tx *bolt.Tx) error {
newProposalsBucket := tx.Bucket(newHistoricProposalsBucket)
for _, pr := range prs {
valBucket, err := newProposalsBucket.CreateBucketIfNotExists(pr.PubKey[:])
if err != nil {
return errors.Wrap(err, "could not could not create bucket for public key")
}
for _, epochProposals := range pr.Proposals {
// Adding an extra byte for the bitlist length.
slotBitlist := make(bitfield.Bitlist, params.BeaconConfig().SlotsPerEpoch/8+1)
slotBits := epochProposals.Proposals
if len(slotBits) == 0 {
continue
}
copy(slotBitlist, slotBits)
for i := uint64(0); i < params.BeaconConfig().SlotsPerEpoch; i++ {
if slotBitlist.BitAt(i) {
ss, err := helpers.StartSlot(bytesutil.FromBytes8(epochProposals.Epoch))
if err != nil {
return errors.Wrapf(err, "failed to get start slot of epoch: %d", epochProposals.Epoch)
}
if err := valBucket.Put(bytesutil.Uint64ToBytesBigEndian(ss+i), []byte{1}); err != nil {
return err
}
}
}
}
}
return nil
})
return err
}
// UpdatePublicKeysBuckets for a specified list of keys.
func (store *Store) UpdatePublicKeysBuckets(pubKeys [][48]byte) error {
return store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(newHistoricProposalsBucket)
for _, pubKey := range pubKeys {
if _, err := bucket.CreateBucketIfNotExists(pubKey[:]); err != nil {
return errors.Wrap(err, "failed to create proposal history bucket")
}
}
return nil
})
}
func pruneProposalHistoryBySlot(valBucket *bolt.Bucket, newestSlot uint64) error {
c := valBucket.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.First() {
@@ -252,45 +176,3 @@ func pruneProposalHistoryBySlot(valBucket *bolt.Bucket, newestSlot uint64) error
}
return nil
}
// MigrateV2ProposalsProtectionDb exports old proposal protection data format to the
// new format and save the exported flag to database.
func (store *Store) MigrateV2ProposalsProtectionDb(ctx context.Context) error {
importProposals, err := store.shouldImportProposals()
if err != nil {
return err
}
if !importProposals {
return nil
}
log.Info("Starting proposals protection db migration to v2...")
if err := store.MigrateV2ProposalFormat(ctx); err != nil {
return err
}
err = store.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
if bucket != nil {
if err := bucket.Put([]byte(proposalExported), []byte{1}); err != nil {
return errors.Wrap(err, "failed to set exported proposals flag in db")
}
}
return nil
})
log.Info("Finished proposals protection db migration to v2")
return err
}
func (store *Store) shouldImportProposals() (bool, error) {
var importProposals bool
err := store.db.View(func(tx *bolt.Tx) error {
proposalBucket := tx.Bucket(historicProposalsBucket)
if proposalBucket != nil && proposalBucket.Stats().KeyN != 0 {
if exported := proposalBucket.Get([]byte(proposalExported)); exported == nil {
importProposals = true
}
}
return nil
})
return importProposals, err
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
@@ -155,75 +154,6 @@ func TestPruneProposalHistoryBySlot_OK(t *testing.T) {
}
}
func TestStore_ImportProposalHistory(t *testing.T) {
pubkey := [48]byte{3}
ctx := context.Background()
db := setupDB(t, [][48]byte{pubkey})
proposedSlots := make(map[uint64]bool)
proposedSlots[0] = true
proposedSlots[1] = true
proposedSlots[20] = true
proposedSlots[31] = true
proposedSlots[32] = true
proposedSlots[33] = true
proposedSlots[1023] = true
proposedSlots[1024] = true
proposedSlots[1025] = true
lastIndex := 1025 + params.BeaconConfig().SlotsPerEpoch
for slot := range proposedSlots {
slotBitlist, err := db.ProposalHistoryForEpoch(context.Background(), pubkey[:], helpers.SlotToEpoch(slot))
require.NoError(t, err)
slotBitlist.SetBitAt(slot%params.BeaconConfig().SlotsPerEpoch, true)
err = db.SaveProposalHistoryForEpoch(context.Background(), pubkey[:], helpers.SlotToEpoch(slot), slotBitlist)
require.NoError(t, err)
}
err := db.MigrateV2ProposalFormat(ctx)
require.NoError(t, err)
for slot := uint64(0); slot <= lastIndex; slot++ {
if _, ok := proposedSlots[slot]; ok {
root, _, err := db.ProposalHistoryForSlot(ctx, pubkey, slot)
require.NoError(t, err)
require.DeepEqual(t, bytesutil.PadTo([]byte{1}, 32), root[:], "slot: %d", slot)
continue
}
root, _, err := db.ProposalHistoryForSlot(ctx, pubkey, slot)
require.NoError(t, err)
require.DeepEqual(t, bytesutil.PadTo([]byte{}, 32), root[:])
}
}
func TestShouldImportProposals(t *testing.T) {
pubkey := [48]byte{3}
db := setupDB(t, nil)
shouldImport, err := db.shouldImportProposals()
require.NoError(t, err)
require.Equal(t, false, shouldImport, "Empty bucket should not be imported")
err = db.OldUpdatePublicKeysBuckets([][48]byte{pubkey})
require.NoError(t, err)
shouldImport, err = db.shouldImportProposals()
require.NoError(t, err)
require.Equal(t, true, shouldImport, "Bucket with content should be imported")
}
func TestStore_UpdateProposalsProtectionDb(t *testing.T) {
pubkey := [48]byte{3}
db := setupDB(t, [][48]byte{pubkey})
ctx := context.Background()
err := db.OldUpdatePublicKeysBuckets([][48]byte{pubkey})
require.NoError(t, err)
shouldImport, err := db.shouldImportProposals()
require.NoError(t, err)
require.Equal(t, true, shouldImport, "Bucket with content should be imported")
err = db.MigrateV2ProposalsProtectionDb(ctx)
require.NoError(t, err)
shouldImport, err = db.shouldImportProposals()
require.NoError(t, err)
require.Equal(t, false, shouldImport, "Proposals should not be re-imported")
}
func TestStore_ProposedPublicKeys(t *testing.T) {
ctx := context.Background()
validatorDB, err := NewKVStore(t.TempDir(), nil)

View File

@@ -101,12 +101,6 @@ func NewValidatorClient(cliCtx *cli.Context) (*ValidatorClient, error) {
if err := ValidatorClient.initializeFromCLI(cliCtx); err != nil {
return nil, err
}
if err := ValidatorClient.db.MigrateV2ProposalsProtectionDb(cliCtx.Context); err != nil {
return nil, err
}
if err := ValidatorClient.db.MigrateV2AttestationProtectionDb(cliCtx.Context); err != nil {
return nil, err
}
return ValidatorClient, nil
}