Add Back New Attester Protection DB Logic (#8242)

* Revert "Revert New Attester Protection DB Logic (#8237)"

This reverts commit 6738fa3493.

* Batch Attestation Records and Flush All at Once in Validator DB (#8243)

* begin flushing logic

* finalize logic before starting tests

* make code DRY

* better log fields

* gaz

* tweak parameter

* rename

* clarifying comment on error handling in event feed

* comprehensive tests

* more comments

* explain parameters in comments

* renamed consts

* Apply suggestions from code review

* gaz

* simplify

* typo

* comments
This commit is contained in:
Raul Jordan
2021-01-11 17:59:17 -06:00
committed by GitHub
parent 323eac6d6c
commit d97596348e
27 changed files with 971 additions and 913 deletions

View File

@@ -35,6 +35,7 @@ go_library(
"//shared/hashutil:go_default_library",
"//shared/mathutil:go_default_library",
"//shared/params:go_default_library",
"//shared/slashutil:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",

View File

@@ -10,6 +10,7 @@ import (
v "github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/slashutil"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
)
@@ -120,7 +121,10 @@ func IsSlashableAttestationData(data1, data2 *ethpb.AttestationData) bool {
return false
}
isDoubleVote := !attestationutil.AttDataIsEqual(data1, data2) && data1.Target.Epoch == data2.Target.Epoch
isSurroundVote := data1.Source.Epoch < data2.Source.Epoch && data2.Target.Epoch < data1.Target.Epoch
att1 := &ethpb.IndexedAttestation{Data: data1}
att2 := &ethpb.IndexedAttestation{Data: data2}
// Check if att1 is surrounding att2.
isSurroundVote := slashutil.IsSurround(att1, att2)
return isDoubleVote || isSurroundVote
}

View File

@@ -0,0 +1,12 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["progress.go"],
importpath = "github.com/prysmaticlabs/prysm/shared/progressutil",
visibility = ["//visibility:public"],
deps = [
"@com_github_k0kubun_go_ansi//:go_default_library",
"@com_github_schollz_progressbar_v3//:go_default_library",
],
)

View File

@@ -0,0 +1,27 @@
package progressutil
import (
"fmt"
"github.com/k0kubun/go-ansi"
"github.com/schollz/progressbar/v3"
)
// InitializeProgressBar standard for use in Prysm.
func InitializeProgressBar(numItems int, msg string) *progressbar.ProgressBar {
return progressbar.NewOptions(
numItems,
progressbar.OptionFullWidth(),
progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
progressbar.OptionEnableColorCodes(true),
progressbar.OptionSetTheme(progressbar.Theme{
Saucer: "[green]=[reset]",
SaucerHead: "[green]>[reset]",
SaucerPadding: " ",
BarStart: "[",
BarEnd: "]",
}),
progressbar.OptionOnCompletion(func() { fmt.Println() }),
progressbar.OptionSetDescription(msg),
)
}

View File

@@ -0,0 +1,17 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["surround_votes.go"],
importpath = "github.com/prysmaticlabs/prysm/shared/slashutil",
visibility = ["//visibility:public"],
deps = ["@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["surround_votes_test.go"],
embed = [":go_default_library"],
deps = ["@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library"],
)

View File

@@ -0,0 +1,16 @@
package slashutil
import ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
// IsSurround checks if an attestation, a, is surrounding
// another one, b, based on the eth2 slashing conditions specified
// by @protolambda https://github.com/protolambda/eth2-surround#definition.
//
// s: source
// t: target
//
// a surrounds b if: s_a < s_b and t_b < t_a
//
func IsSurround(a, b *ethpb.IndexedAttestation) bool {
return a.Data.Source.Epoch < b.Data.Source.Epoch && b.Data.Target.Epoch < a.Data.Target.Epoch
}

View File

@@ -0,0 +1,92 @@
package slashutil
import (
"testing"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)
func TestIsSurround(t *testing.T) {
type args struct {
a *ethpb.IndexedAttestation
b *ethpb.IndexedAttestation
}
tests := []struct {
name string
args args
want bool
}{
{
name: "0 values returns false",
args: args{
a: createAttestation(0, 0),
b: createAttestation(0, 0),
},
want: false,
},
{
name: "detects surrounding attestation",
args: args{
a: createAttestation(2, 5),
b: createAttestation(3, 4),
},
want: true,
},
{
name: "new attestation source == old source, but new target < old target",
args: args{
a: createAttestation(3, 5),
b: createAttestation(3, 4),
},
want: false,
},
{
name: "new attestation source > old source, but new target == old target",
args: args{
a: createAttestation(3, 5),
b: createAttestation(4, 5),
},
want: false,
},
{
name: "new attestation source and targets equal to old one",
args: args{
a: createAttestation(3, 5),
b: createAttestation(3, 5),
},
want: false,
},
{
name: "new attestation source == old source, but new target > old target",
args: args{
a: createAttestation(3, 5),
b: createAttestation(3, 6),
},
want: false,
},
{
name: "new attestation source < old source, but new target == old target",
args: args{
a: createAttestation(3, 5),
b: createAttestation(2, 5),
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsSurround(tt.args.a, tt.args.b); got != tt.want {
t.Errorf("IsSurrounding() = %v, want %v", got, tt.want)
}
})
}
}
func createAttestation(source, target uint64) *ethpb.IndexedAttestation {
return &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{Epoch: source},
Target: &ethpb.Checkpoint{Epoch: target},
},
}
}

View File

@@ -19,6 +19,7 @@ go_library(
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/slashutil:go_default_library",
"//shared/sliceutil:go_default_library",
"//slasher/beaconclient:go_default_library",
"//slasher/db:go_default_library",
@@ -49,6 +50,7 @@ go_test(
"//proto/slashing:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/slashutil:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//slasher/db/testing:go_default_library",

View File

@@ -10,6 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/slashutil"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
status "github.com/prysmaticlabs/prysm/slasher/db/types"
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
@@ -137,8 +138,8 @@ func (ds *Service) detectSurroundVotes(
if att.Data == nil {
continue
}
isSurround := isSurrounding(incomingAtt, att)
isSurrounded := isSurrounding(att, incomingAtt)
isSurround := slashutil.IsSurround(incomingAtt, att)
isSurrounded := slashutil.IsSurround(att, incomingAtt)
if !isSurround && !isSurrounded {
continue
}
@@ -210,11 +211,6 @@ func isDoubleVote(incomingAtt, prevAtt *ethpb.IndexedAttestation) bool {
return !attestationutil.AttDataIsEqual(incomingAtt.Data, prevAtt.Data) && incomingAtt.Data.Target.Epoch == prevAtt.Data.Target.Epoch
}
func isSurrounding(incomingAtt, prevAtt *ethpb.IndexedAttestation) bool {
return incomingAtt.Data.Source.Epoch < prevAtt.Data.Source.Epoch &&
incomingAtt.Data.Target.Epoch > prevAtt.Data.Target.Epoch
}
// UpdateHighestAttestation updates to the db the highest source and target attestations for a each validator.
func (ds *Service) UpdateHighestAttestation(ctx context.Context, att *ethpb.IndexedAttestation) error {
for _, idx := range att.AttestingIndices {

View File

@@ -9,6 +9,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/slashutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
testDB "github.com/prysmaticlabs/prysm/slasher/db/testing"
@@ -189,7 +190,7 @@ func TestDetect_detectAttesterSlashings_Surround(t *testing.T) {
for _, ss := range slashings {
slashingAtt1 := ss.Attestation_1
slashingAtt2 := ss.Attestation_2
if !isSurrounding(slashingAtt1, slashingAtt2) {
if !slashutil.IsSurround(slashingAtt1, slashingAtt2) {
t.Fatalf(
"Expected slashing to be valid, received atts %d->%d and %d->%d",
slashingAtt2.Data.Source.Epoch,

View File

@@ -1,16 +1,13 @@
package client
import (
"bytes"
"context"
"fmt"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/validator/db/kv"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -30,40 +27,24 @@ func (v *validator) slashableAttestationCheck(
defer span.End()
fmtKey := fmt.Sprintf("%#x", pubKey[:])
attesterHistory, err := v.db.AttestationHistoryForPubKeyV2(ctx, pubKey)
slashingKind, err := v.db.CheckSlashableAttestation(ctx, pubKey, signingRoot, indexedAtt)
if err != nil {
return errors.Wrap(err, "could not get attester history")
}
slashable, err := isNewAttSlashable(
ctx,
attesterHistory,
indexedAtt.Data.Source.Epoch,
indexedAtt.Data.Target.Epoch,
signingRoot,
)
if err != nil {
return errors.Wrap(err, "could not check if attestation is slashable")
}
if slashable {
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
}
return errors.New(failedAttLocalProtectionErr)
switch slashingKind {
case kv.DoubleVote:
log.Warn("Attestation is slashable as it is a double vote")
case kv.SurroundingVote:
log.Warn("Attestation is slashable as it is surrounding a previous attestation")
case kv.SurroundedVote:
log.Warn("Attestation is slashable as it is surrounded by a previous attestation")
}
return errors.Wrap(err, failedAttLocalProtectionErr)
}
newHistory, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(
ctx,
attesterHistory,
indexedAtt.Data.Target.Epoch,
&kv.HistoryData{
Source: indexedAtt.Data.Source.Epoch,
SigningRoot: signingRoot[:],
},
)
if err != nil {
return errors.Wrapf(err, "could not mark epoch %d as attested", indexedAtt.Data.Target.Epoch)
}
if err := v.db.SaveAttestationHistoryForPubKeyV2(ctx, pubKey, newHistory); err != nil {
return errors.Wrapf(err, "could not save attestation history for public key: %#x", pubKey)
if err := v.db.SaveAttestationForPubKey(ctx, pubKey, signingRoot, indexedAtt); err != nil {
return errors.Wrap(err, "could not save attestation history for validator public key")
}
// TODO(#7813): Add back the saving of lowest target and lowest source epoch
@@ -78,146 +59,3 @@ func (v *validator) slashableAttestationCheck(
}
return nil
}
// isNewAttSlashable uses the attestation history to determine if an attestation of sourceEpoch
// and targetEpoch would be slashable. It can detect double, surrounding, and surrounded votes.
func isNewAttSlashable(
ctx context.Context,
history kv.EncHistoryData,
sourceEpoch,
targetEpoch uint64,
signingRoot [32]byte,
) (bool, error) {
ctx, span := trace.StartSpan(ctx, "isNewAttSlashable")
defer span.End()
if history == nil {
return false, nil
}
wsPeriod := params.BeaconConfig().WeakSubjectivityPeriod
// Previously pruned, we should return false.
latestEpochWritten, err := history.GetLatestEpochWritten(ctx)
if err != nil {
log.WithError(err).Error("Could not get latest epoch written from encapsulated data")
return false, err
}
// Underflow protected older then weak subjectivity check.
if latestEpochWritten >= wsPeriod && targetEpoch <= latestEpochWritten-wsPeriod {
return false, nil
}
// Check if there has already been a vote for this target epoch.
hd, err := history.GetTargetData(ctx, targetEpoch)
if err != nil {
return false, errors.Wrapf(err, "could not get target data for epoch: %d", targetEpoch)
}
if !hd.IsEmpty() && !bytes.Equal(signingRoot[:], hd.SigningRoot) {
log.WithFields(logrus.Fields{
"signingRoot": fmt.Sprintf("%#x", signingRoot),
"targetEpoch": targetEpoch,
"previouslyAttestedSigningRoot": fmt.Sprintf("%#x", hd.SigningRoot),
}).Warn("Attempted to submit a double vote, but blocked by slashing protection")
return true, nil
}
isSurround, err := isSurroundVote(ctx, history, latestEpochWritten, sourceEpoch, targetEpoch)
if err != nil {
return false, errors.Wrap(err, "could not check if attestation is surround vote")
}
return isSurround, nil
}
func isSurroundVote(
ctx context.Context,
history kv.EncHistoryData,
latestEpochWritten,
sourceEpoch,
targetEpoch uint64,
) (bool, error) {
for i := sourceEpoch; i <= targetEpoch; i++ {
historicalAtt, err := checkHistoryAtTargetEpoch(ctx, history, latestEpochWritten, i)
if err != nil {
return false, errors.Wrapf(err, "could not check historical attestation at target epoch: %d", i)
}
if historicalAtt.IsEmpty() {
continue
}
prevTarget := i
prevSource := historicalAtt.Source
if surroundingPrevAttestation(prevSource, prevTarget, sourceEpoch, targetEpoch) {
// Surrounding attestation caught.
log.WithFields(logrus.Fields{
"targetEpoch": targetEpoch,
"sourceEpoch": sourceEpoch,
"previouslyAttestedTargetEpoch": prevTarget,
"previouslyAttestedSourceEpoch": prevSource,
}).Warn("Attempted to submit a surrounding attestation, but blocked by slashing protection")
return true, nil
}
}
// Check if the new attestation is being surrounded.
for i := targetEpoch; i <= latestEpochWritten; i++ {
historicalAtt, err := checkHistoryAtTargetEpoch(ctx, history, latestEpochWritten, i)
if err != nil {
return false, errors.Wrapf(err, "could not check historical attestation at target epoch: %d", i)
}
if historicalAtt.IsEmpty() {
continue
}
prevTarget := i
prevSource := historicalAtt.Source
if surroundedByPrevAttestation(prevSource, prevTarget, sourceEpoch, targetEpoch) {
// Surrounded attestation caught.
log.WithFields(logrus.Fields{
"targetEpoch": targetEpoch,
"sourceEpoch": sourceEpoch,
"previouslyAttestedTargetEpoch": prevTarget,
"previouslyAttestedSourceEpoch": prevSource,
}).Warn("Attempted to submit a surrounded attestation, but blocked by slashing protection")
return true, nil
}
}
return false, nil
}
func surroundedByPrevAttestation(prevSource, prevTarget, newSource, newTarget uint64) bool {
return prevSource < newSource && newTarget < prevTarget
}
func surroundingPrevAttestation(prevSource, prevTarget, newSource, newTarget uint64) bool {
return newSource < prevSource && prevTarget < newTarget
}
// Checks that the difference between the latest epoch written and
// target epoch is greater than or equal to the weak subjectivity period.
func differenceOutsideWeakSubjectivityBounds(latestEpochWritten, targetEpoch uint64) bool {
wsPeriod := params.BeaconConfig().WeakSubjectivityPeriod
return latestEpochWritten >= wsPeriod && targetEpoch <= latestEpochWritten-wsPeriod
}
// safeTargetToSource makes sure the epoch accessed is within bounds, and if it's not it at
// returns the "default" nil value.
// Returns the actual attesting history at a specified target epoch.
// The response is nil if there was no attesting history at that epoch.
func checkHistoryAtTargetEpoch(
ctx context.Context,
history kv.EncHistoryData,
latestEpochWritten,
targetEpoch uint64,
) (*kv.HistoryData, error) {
wsPeriod := params.BeaconConfig().WeakSubjectivityPeriod
if differenceOutsideWeakSubjectivityBounds(latestEpochWritten, targetEpoch) {
return nil, nil
}
// Ignore target epoch is > latest written.
if targetEpoch > latestEpochWritten {
return nil, nil
}
historicalAtt, err := history.GetTargetData(ctx, targetEpoch%wsPeriod)
if err != nil {
return nil, errors.Wrapf(err, "could not get target data for target epoch: %d", targetEpoch)
}
return historicalAtt, nil
}

View File

@@ -2,16 +2,13 @@ package client
import (
"context"
"reflect"
"testing"
"github.com/golang/mock/gomock"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/validator/db/kv"
mockSlasher "github.com/prysmaticlabs/prysm/validator/testing"
)
@@ -195,501 +192,3 @@ func Test_slashableAttestationCheck_GenesisEpoch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(0), e)
}
func TestAttestationHistory_BlocksDoubleAttestation(t *testing.T) {
ctx := context.Background()
history := kv.NewAttestationHistoryArray(3)
// Mark an attestation spanning epochs 0 to 3.
newAttSource := uint64(0)
newAttTarget := uint64(3)
sr1 := [32]byte{1}
newHist, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(ctx, history, newAttTarget, &kv.HistoryData{
Source: newAttSource,
SigningRoot: sr1[:],
})
require.NoError(t, err)
history = newHist
lew, err := history.GetLatestEpochWritten(ctx)
require.NoError(t, err)
require.Equal(t, newAttTarget, lew, "Unexpected latest epoch written")
// Try an attestation that should be slashable (double att) spanning epochs 1 to 3.
sr2 := [32]byte{2}
newAttSource = uint64(1)
newAttTarget = uint64(3)
slashable, err := isNewAttSlashable(ctx, history, newAttSource, newAttTarget, sr2)
require.NoError(t, err)
if !slashable {
t.Fatalf("Expected attestation of source %d and target %d to be considered slashable", newAttSource, newAttTarget)
}
}
func TestAttestationHistory_Prunes(t *testing.T) {
ctx := context.Background()
wsPeriod := params.BeaconConfig().WeakSubjectivityPeriod
signingRoot := [32]byte{1}
signingRoot2 := [32]byte{2}
signingRoot3 := [32]byte{3}
signingRoot4 := [32]byte{4}
history := kv.NewAttestationHistoryArray(0)
// Try an attestation on totally unmarked history, should not be slashable.
slashable, err := isNewAttSlashable(ctx, history, 0, wsPeriod+5, signingRoot)
require.NoError(t, err)
require.Equal(t, false, slashable, "Should not be slashable")
// Mark attestations spanning epochs 0 to 3 and 6 to 9.
prunedNewAttSource := uint64(0)
prunedNewAttTarget := uint64(3)
newHist, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(ctx, history, prunedNewAttTarget, &kv.HistoryData{
Source: prunedNewAttSource,
SigningRoot: signingRoot[:],
})
require.NoError(t, err)
history = newHist
newAttSource := prunedNewAttSource + 6
newAttTarget := prunedNewAttTarget + 6
newHist, err = kv.MarkAllAsAttestedSinceLatestWrittenEpoch(ctx, history, newAttTarget, &kv.HistoryData{
Source: newAttSource,
SigningRoot: signingRoot2[:],
})
require.NoError(t, err)
history = newHist
lte, err := history.GetLatestEpochWritten(ctx)
require.NoError(t, err)
require.Equal(t, newAttTarget, lte, "Unexpected latest epoch")
// Mark an attestation spanning epochs 54000 to 54003.
farNewAttSource := newAttSource + wsPeriod
farNewAttTarget := newAttTarget + wsPeriod
newHist, err = kv.MarkAllAsAttestedSinceLatestWrittenEpoch(ctx, history, farNewAttTarget, &kv.HistoryData{
Source: farNewAttSource,
SigningRoot: signingRoot3[:],
})
require.NoError(t, err)
history = newHist
lte, err = history.GetLatestEpochWritten(ctx)
require.NoError(t, err)
require.Equal(t, farNewAttTarget, lte, "Unexpected latest epoch")
histAtt, err := checkHistoryAtTargetEpoch(ctx, history, lte, prunedNewAttTarget)
require.NoError(t, err)
require.Equal(t, (*kv.HistoryData)(nil), histAtt, "Unexpectedly marked attestation")
histAtt, err = checkHistoryAtTargetEpoch(ctx, history, lte, farNewAttTarget)
require.NoError(t, err)
require.Equal(t, farNewAttSource, histAtt.Source, "Unexpectedly marked attestation")
// Try an attestation from existing source to outside prune, should slash.
slashable, err = isNewAttSlashable(ctx, history, newAttSource, farNewAttTarget, signingRoot4)
require.NoError(t, err)
if !slashable {
t.Fatalf("Expected attestation of source %d, target %d to be considered slashable", newAttSource, farNewAttTarget)
}
// Try an attestation from before existing target to outside prune, should slash.
slashable, err = isNewAttSlashable(ctx, history, newAttTarget-1, farNewAttTarget, signingRoot4)
require.NoError(t, err)
if !slashable {
t.Fatalf("Expected attestation of source %d, target %d to be considered slashable", newAttTarget-1, farNewAttTarget)
}
// Try an attestation larger than pruning amount, should slash.
slashable, err = isNewAttSlashable(ctx, history, 0, farNewAttTarget+5, signingRoot4)
require.NoError(t, err)
if !slashable {
t.Fatalf("Expected attestation of source 0, target %d to be considered slashable", farNewAttTarget+5)
}
}
func TestAttestationHistory_BlocksSurroundedAttestation(t *testing.T) {
ctx := context.Background()
history := kv.NewAttestationHistoryArray(0)
// Mark an attestation spanning epochs 0 to 3.
signingRoot := [32]byte{1}
newAttSource := uint64(0)
newAttTarget := uint64(3)
newHist, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(ctx, history, newAttTarget, &kv.HistoryData{
Source: newAttSource,
SigningRoot: signingRoot[:],
})
require.NoError(t, err)
history = newHist
lte, err := history.GetLatestEpochWritten(ctx)
require.NoError(t, err)
require.Equal(t, newAttTarget, lte)
// Try an attestation that should be slashable (being surrounded) spanning epochs 1 to 2.
newAttSource = uint64(1)
newAttTarget = uint64(2)
slashable, err := isNewAttSlashable(ctx, history, newAttSource, newAttTarget, signingRoot)
require.NoError(t, err)
require.Equal(t, true, slashable, "Expected slashable attestation")
}
func TestAttestationHistory_BlocksSurroundingAttestation(t *testing.T) {
ctx := context.Background()
history := kv.NewAttestationHistoryArray(0)
signingRoot := [32]byte{1}
// Mark an attestation spanning epochs 1 to 2.
newAttSource := uint64(1)
newAttTarget := uint64(2)
newHist, err := kv.MarkAllAsAttestedSinceLatestWrittenEpoch(ctx, history, newAttTarget, &kv.HistoryData{
Source: newAttSource,
SigningRoot: signingRoot[:],
})
require.NoError(t, err)
history = newHist
lte, err := history.GetLatestEpochWritten(ctx)
require.NoError(t, err)
require.Equal(t, newAttTarget, lte)
ts, err := history.GetTargetData(ctx, newAttTarget)
require.NoError(t, err)
require.Equal(t, newAttSource, ts.Source)
// Try an attestation that should be slashable (surrounding) spanning epochs 0 to 3.
newAttSource = uint64(0)
newAttTarget = uint64(3)
slashable, err := isNewAttSlashable(ctx, history, newAttSource, newAttTarget, signingRoot)
require.NoError(t, err)
require.Equal(t, true, slashable)
}
func Test_isSurroundVote(t *testing.T) {
ctx := context.Background()
source := uint64(1)
target := uint64(4)
history := kv.NewAttestationHistoryArray(0)
signingRoot1 := bytesutil.PadTo([]byte{1}, 32)
hist, err := history.SetTargetData(ctx, target, &kv.HistoryData{
Source: source,
SigningRoot: signingRoot1,
})
require.NoError(t, err)
history = hist
tests := []struct {
name string
history kv.EncHistoryData
latestEpochWritten uint64
sourceEpoch uint64
targetEpoch uint64
want bool
wantErr bool
}{
{
name: "ignores attestations outside of weak subjectivity bounds",
history: kv.NewAttestationHistoryArray(0),
latestEpochWritten: 2 * params.BeaconConfig().WeakSubjectivityPeriod,
targetEpoch: params.BeaconConfig().WeakSubjectivityPeriod,
sourceEpoch: params.BeaconConfig().WeakSubjectivityPeriod,
want: false,
},
{
name: "detects surrounding attestations",
history: history,
latestEpochWritten: target,
targetEpoch: target + 1,
sourceEpoch: source - 1,
want: true,
},
{
name: "detects surrounded attestations",
history: history,
latestEpochWritten: target,
targetEpoch: target - 1,
sourceEpoch: source + 1,
want: true,
},
{
name: "new attestation source == old source, but new target < old target",
history: history,
latestEpochWritten: target,
targetEpoch: target - 1,
sourceEpoch: source,
want: false,
},
{
name: "new attestation source > old source, but new target == old target",
history: history,
latestEpochWritten: target,
targetEpoch: target,
sourceEpoch: source + 1,
want: false,
},
{
name: "new attestation source and targets equal to old one",
history: history,
latestEpochWritten: target,
targetEpoch: target,
sourceEpoch: source,
want: false,
},
{
name: "new attestation source == old source, but new target > old target",
history: history,
latestEpochWritten: target,
targetEpoch: target + 1,
sourceEpoch: source,
want: false,
},
{
name: "new attestation source < old source, but new target == old target",
history: history,
latestEpochWritten: target,
targetEpoch: target,
sourceEpoch: source - 1,
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := isSurroundVote(ctx, tt.history, tt.latestEpochWritten, tt.sourceEpoch, tt.targetEpoch)
if (err != nil) != tt.wantErr {
t.Errorf("isSurroundVote() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("isSurroundVote() got = %v, want %v", got, tt.want)
}
})
}
}
func Test_surroundedByPrevAttestation(t *testing.T) {
type args struct {
oldSource uint64
oldTarget uint64
newSource uint64
newTarget uint64
}
tests := []struct {
name string
args args
want bool
}{
{
name: "0 values returns false",
args: args{
oldSource: 0,
oldTarget: 0,
newSource: 0,
newTarget: 0,
},
want: false,
},
{
name: "new attestation is surrounded by an old one",
args: args{
oldSource: 2,
oldTarget: 6,
newSource: 3,
newTarget: 5,
},
want: true,
},
{
name: "new attestation source and targets equal to old one",
args: args{
oldSource: 3,
oldTarget: 5,
newSource: 3,
newTarget: 5,
},
want: false,
},
{
name: "new attestation source == old source, but new target < old target",
args: args{
oldSource: 3,
oldTarget: 5,
newSource: 3,
newTarget: 4,
},
want: false,
},
{
name: "new attestation source > old source, but new target == old target",
args: args{
oldSource: 3,
oldTarget: 5,
newSource: 4,
newTarget: 5,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := surroundedByPrevAttestation(tt.args.oldSource, tt.args.oldTarget, tt.args.newSource, tt.args.newTarget); got != tt.want {
t.Errorf("surroundedByPrevAttestation() = %v, want %v", got, tt.want)
}
})
}
}
func Test_surroundingPrevAttestation(t *testing.T) {
type args struct {
oldSource uint64
oldTarget uint64
newSource uint64
newTarget uint64
}
tests := []struct {
name string
args args
want bool
}{
{
name: "0 values returns false",
args: args{
oldSource: 0,
oldTarget: 0,
newSource: 0,
newTarget: 0,
},
want: false,
},
{
name: "new attestation is surrounding an old one",
args: args{
oldSource: 3,
oldTarget: 5,
newSource: 2,
newTarget: 6,
},
want: true,
},
{
name: "new attestation source and targets equal to old one",
args: args{
oldSource: 3,
oldTarget: 5,
newSource: 3,
newTarget: 5,
},
want: false,
},
{
name: "new attestation source == old source, but new target > old target",
args: args{
oldSource: 3,
oldTarget: 5,
newSource: 3,
newTarget: 6,
},
want: false,
},
{
name: "new attestation source < old source, but new target == old target",
args: args{
oldSource: 3,
oldTarget: 5,
newSource: 2,
newTarget: 5,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := surroundingPrevAttestation(tt.args.oldSource, tt.args.oldTarget, tt.args.newSource, tt.args.newTarget); got != tt.want {
t.Errorf("surroundingPrevAttestation() = %v, want %v", got, tt.want)
}
})
}
}
func Test_checkHistoryAtTargetEpoch(t *testing.T) {
ctx := context.Background()
history := kv.NewAttestationHistoryArray(0)
signingRoot1 := bytesutil.PadTo([]byte{1}, 32)
hist, err := history.SetTargetData(ctx, 1, &kv.HistoryData{
Source: 0,
SigningRoot: signingRoot1,
})
require.NoError(t, err)
history = hist
tests := []struct {
name string
history kv.EncHistoryData
latestEpochWritten uint64
targetEpoch uint64
want *kv.HistoryData
wantErr bool
}{
{
name: "ignores difference in epochs outside of weak subjectivity bounds",
history: kv.NewAttestationHistoryArray(0),
latestEpochWritten: 2 * params.BeaconConfig().WeakSubjectivityPeriod,
targetEpoch: params.BeaconConfig().WeakSubjectivityPeriod,
want: nil,
wantErr: false,
},
{
name: "ignores target epoch > latest written epoch",
history: kv.NewAttestationHistoryArray(0),
latestEpochWritten: params.BeaconConfig().WeakSubjectivityPeriod,
targetEpoch: params.BeaconConfig().WeakSubjectivityPeriod + 1,
want: nil,
wantErr: false,
},
{
name: "target epoch == latest written epoch should return correct results",
history: history,
latestEpochWritten: 1,
targetEpoch: 1,
want: &kv.HistoryData{
Source: 0,
SigningRoot: signingRoot1,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := checkHistoryAtTargetEpoch(ctx, tt.history, tt.latestEpochWritten, tt.targetEpoch)
if (err != nil) != tt.wantErr {
t.Errorf("checkHistoryAtTargetEpoch() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("checkHistoryAtTargetEpoch() got = %v, want %v", got, tt.want)
}
})
}
}
func Test_differenceOutsideWeakSubjectivityBounds(t *testing.T) {
tests := []struct {
name string
want bool
latestEpochWritten uint64
targetEpoch uint64
}{
{
name: "difference of weak subjectivity period - 1 returns false",
latestEpochWritten: (2 * params.BeaconConfig().WeakSubjectivityPeriod) - 1,
targetEpoch: params.BeaconConfig().WeakSubjectivityPeriod,
want: false,
},
{
name: "difference of weak subjectivity period returns true",
latestEpochWritten: 2 * params.BeaconConfig().WeakSubjectivityPeriod,
targetEpoch: params.BeaconConfig().WeakSubjectivityPeriod,
want: true,
},
{
name: "difference > weak subjectivity period returns true",
latestEpochWritten: (2 * params.BeaconConfig().WeakSubjectivityPeriod) + 1,
targetEpoch: params.BeaconConfig().WeakSubjectivityPeriod,
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := differenceOutsideWeakSubjectivityBounds(tt.latestEpochWritten, tt.targetEpoch); got != tt.want {
t.Errorf("differenceOutsideWeakSubjectivityBounds() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -9,5 +9,6 @@ go_library(
deps = [
"//shared/backuputil:go_default_library",
"//validator/db/kv:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
],
)

View File

@@ -5,6 +5,7 @@ import (
"context"
"io"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/backuputil"
"github.com/prysmaticlabs/prysm/validator/db/kv"
)
@@ -36,7 +37,11 @@ type ValidatorDB interface {
LowestSignedSourceEpoch(ctx context.Context, publicKey [48]byte) (uint64, error)
SaveLowestSignedTargetEpoch(ctx context.Context, publicKey [48]byte, epoch uint64) error
SaveLowestSignedSourceEpoch(ctx context.Context, publicKey [48]byte, epoch uint64) error
AttestationHistoryForPubKeyV2(ctx context.Context, publicKey [48]byte) (kv.EncHistoryData, error)
SaveAttestationHistoryForPubKeyV2(ctx context.Context, publicKey [48]byte, history kv.EncHistoryData) error
AttestedPublicKeys(ctx context.Context) ([][48]byte, error)
CheckSlashableAttestation(
ctx context.Context, pubKey [48]byte, signingRoot [32]byte, att *ethpb.IndexedAttestation,
) (kv.SlashingKind, error)
SaveAttestationForPubKey(
ctx context.Context, pubKey [48]byte, signingRoot [32]byte, att *ethpb.IndexedAttestation,
) error
}

View File

@@ -12,6 +12,7 @@ go_library(
"historical_attestations.go",
"log.go",
"migration.go",
"migration_optimal_attester_protection.go",
"proposal_history_v2.go",
"prune_attester_protection.go",
"schema.go",
@@ -21,9 +22,12 @@ go_library(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/event:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/params:go_default_library",
"//shared/progressutil:go_default_library",
"//shared/slashutil:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
@@ -43,16 +47,23 @@ go_test(
"db_test.go",
"genesis_test.go",
"historical_attestations_test.go",
"migration_optimal_attester_protection_test.go",
"proposal_history_v2_test.go",
"prune_attester_protection_test.go",
],
embed = [":go_default_library"],
deps = [
"//shared/bytesutil:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",
],
)

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
@@ -27,55 +26,6 @@ func (store *Store) AttestedPublicKeys(ctx context.Context) ([][48]byte, error)
return attestedPublicKeys, err
}
// AttestationHistoryForPubKeyV2 returns the corresponding attesting
// history for a specified validator public key.
func (store *Store) AttestationHistoryForPubKeyV2(ctx context.Context, publicKey [48]byte) (EncHistoryData, error) {
ctx, span := trace.StartSpan(ctx, "Validator.AttestationHistoryForPubKeyV2")
defer span.End()
if !featureconfig.Get().DisableAttestingHistoryDBCache {
store.lock.RLock()
if history, ok := store.attestingHistoriesByPubKey[publicKey]; ok {
store.lock.RUnlock()
return history, nil
}
store.lock.RUnlock()
}
var err error
var attestationHistory EncHistoryData
err = store.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
enc := bucket.Get(publicKey[:])
if len(enc) == 0 {
attestationHistory = NewAttestationHistoryArray(0)
} else {
attestationHistory = enc
}
return nil
})
if !featureconfig.Get().DisableAttestingHistoryDBCache {
store.lock.Lock()
store.attestingHistoriesByPubKey[publicKey] = attestationHistory
store.lock.Unlock()
}
return attestationHistory, err
}
// SaveAttestationHistoryForPubKeyV2 saves the attestation history for the requested validator public key.
func (store *Store) SaveAttestationHistoryForPubKeyV2(ctx context.Context, pubKey [48]byte, history EncHistoryData) error {
ctx, span := trace.StartSpan(ctx, "Validator.SaveAttestationHistoryForPubKeyV2")
defer span.End()
err := store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
return bucket.Put(pubKey[:], history)
})
if !featureconfig.Get().DisableAttestingHistoryDBCache {
store.lock.Lock()
store.attestingHistoriesByPubKey[pubKey] = history
store.lock.Unlock()
}
return err
}
// LowestSignedSourceEpoch returns the lowest signed source epoch for a validator public key.
// If no data exists, returning 0 is a sensible default.
func (store *Store) LowestSignedSourceEpoch(ctx context.Context, publicKey [48]byte) (uint64, error) {

View File

@@ -4,59 +4,9 @@ import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestAttestationHistoryForPubKey_OK(t *testing.T) {
ctx := context.Background()
pubKey := [48]byte{30}
db := setupDB(t, [][48]byte{pubKey})
_, err := db.AttestationHistoryForPubKeyV2(context.Background(), pubKey)
require.NoError(t, err)
history := NewAttestationHistoryArray(53999)
history, err = history.SetTargetData(
ctx,
10,
&HistoryData{
Source: uint64(1),
SigningRoot: []byte{1, 2, 3},
},
)
require.NoError(t, err)
err = db.SaveAttestationHistoryForPubKeyV2(context.Background(), pubKey, history)
require.NoError(t, err)
got, err := db.AttestationHistoryForPubKeyV2(context.Background(), pubKey)
require.NoError(t, err)
require.DeepEqual(t, history, got, "Expected attestation history epoch bits to be empty")
}
func TestStore_AttestedPublicKeys(t *testing.T) {
ctx := context.Background()
validatorDB, err := NewKVStore(ctx, t.TempDir(), nil)
require.NoError(t, err, "Failed to instantiate DB")
t.Cleanup(func() {
require.NoError(t, validatorDB.Close(), "Failed to close database")
require.NoError(t, validatorDB.ClearDB(), "Failed to clear database")
})
keys, err := validatorDB.AttestedPublicKeys(ctx)
require.NoError(t, err)
assert.DeepEqual(t, make([][48]byte, 0), keys)
pubKey := [48]byte{1}
err = validatorDB.SaveAttestationHistoryForPubKeyV2(ctx, pubKey, NewAttestationHistoryArray(0))
require.NoError(t, err)
keys, err = validatorDB.AttestedPublicKeys(ctx)
require.NoError(t, err)
assert.DeepEqual(t, [][48]byte{pubKey}, keys)
}
func TestLowestSignedSourceEpoch_SaveRetrieve(t *testing.T) {
ctx := context.Background()
validatorDB, err := NewKVStore(ctx, t.TempDir(), nil)

View File

@@ -4,15 +4,38 @@ import (
"bytes"
"context"
"fmt"
"time"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/slashutil"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
// SlashingKind used for helpful information upon detection.
type SlashingKind int
// An attestation record can be represented by these simple values
// for manipulation by database methods.
type attestationRecord struct {
pubKey [48]byte
source uint64
target uint64
signingRoot [32]byte
}
// A wrapper over an error received from a background routine
// saving batched attestations for slashing protection.
// This wrapper allows us to send this response over event feeds,
// as our event feed does not allow sending `nil` values to
// subscribers.
type saveAttestationsResponse struct {
err error
}
// Enums representing the types of slashable events for attesters.
const (
NotSlashable SlashingKind = iota
DoubleVote
@@ -29,7 +52,7 @@ var (
// CheckSlashableAttestation verifies an incoming attestation is
// not a double vote for a validator public key nor a surround vote.
func (store *Store) CheckSlashableAttestation(
ctx context.Context, pubKey [48]byte, signingRoot [32]byte, att *ethpb.Attestation,
ctx context.Context, pubKey [48]byte, signingRoot [32]byte, att *ethpb.IndexedAttestation,
) (SlashingKind, error) {
var slashKind SlashingKind
err := store.view(func(tx *bolt.Tx) error {
@@ -58,8 +81,16 @@ func (store *Store) CheckSlashableAttestation(
return sourceEpochsBucket.ForEach(func(sourceEpochBytes []byte, targetEpochBytes []byte) error {
existingSourceEpoch := bytesutil.BytesToUint64BigEndian(sourceEpochBytes)
existingTargetEpoch := bytesutil.BytesToUint64BigEndian(targetEpochBytes)
surrounding := att.Data.Source.Epoch < existingSourceEpoch && att.Data.Target.Epoch > existingTargetEpoch
surrounded := att.Data.Source.Epoch > existingSourceEpoch && att.Data.Target.Epoch < existingTargetEpoch
existingAtt := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{Epoch: existingSourceEpoch},
Target: &ethpb.Checkpoint{Epoch: existingTargetEpoch},
},
}
// Checks if the incoming attestation is surrounding or
// is surrounded by an existing one.
surrounding := slashutil.IsSurround(att, existingAtt)
surrounded := slashutil.IsSurround(existingAtt, att)
if surrounding {
slashKind = SurroundingVote
return fmt.Errorf(
@@ -86,32 +117,112 @@ func (store *Store) CheckSlashableAttestation(
return slashKind, err
}
// ApplyAttestationForPubKey applies an attestation for a validator public
// key by storing its signing root under the appropriate bucket as well
// as its source and target epochs for future slashing protection checks.
func (store *Store) ApplyAttestationForPubKey(
ctx context.Context, pubKey [48]byte, signingRoot [32]byte, att *ethpb.Attestation,
// SaveAttestationForPubKey saves an attestation for a validator public
// key for local validator slashing protection.
func (store *Store) SaveAttestationForPubKey(
ctx context.Context, pubKey [48]byte, signingRoot [32]byte, att *ethpb.IndexedAttestation,
) error {
return store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(pubKeysBucket)
pkBucket, err := bucket.CreateBucketIfNotExists(pubKey[:])
if err != nil {
return err
}
sourceEpochBytes := bytesutil.Uint64ToBytesBigEndian(att.Data.Source.Epoch)
targetEpochBytes := bytesutil.Uint64ToBytesBigEndian(att.Data.Target.Epoch)
store.batchedAttestationsChan <- &attestationRecord{
pubKey: pubKey,
source: att.Data.Source.Epoch,
target: att.Data.Target.Epoch,
signingRoot: signingRoot,
}
// Subscribe to be notified when the attestation record queued
// for saving to the DB is indeed saved. If an error occurred
// during the process of saving the attestation record, the sender
// will give us that error. We use a buffered channel
// to prevent blocking the sender from notifying us of the result.
responseChan := make(chan saveAttestationsResponse, 1)
defer close(responseChan)
sub := store.batchAttestationsFlushedFeed.Subscribe(responseChan)
defer sub.Unsubscribe()
res := <-responseChan
return res.err
}
signingRootsBucket, err := pkBucket.CreateBucketIfNotExists(attestationSigningRootsBucket)
if err != nil {
return err
// Meant to run as a background routine, this function checks whether:
// (a) we have reached a max capacity of batched attestations in the Store or
// (b) attestationBatchWriteInterval has passed
// Based on whichever comes first, this function then proceeds
// to flush the attestations to the DB all at once in a single boltDB
// transaction for efficiency. Then, batched attestations slice is emptied out.
func (store *Store) batchAttestationWrites(ctx context.Context) {
ticker := time.NewTicker(attestationBatchWriteInterval)
defer ticker.Stop()
for {
select {
case v := <-store.batchedAttestationsChan:
store.batchedAttestations = append(store.batchedAttestations, v)
if len(store.batchedAttestations) == attestationBatchCapacity {
log.WithField("numRecords", attestationBatchCapacity).Debug(
"Reached max capacity of batched attestation records, flushing to DB",
)
store.flushAttestationRecords(ctx)
}
case <-ticker.C:
if len(store.batchedAttestations) > 0 {
log.WithField("numRecords", len(store.batchedAttestations)).Debug(
"Batched attestation records write interval reached, flushing to DB",
)
store.flushAttestationRecords(ctx)
}
case <-ctx.Done():
return
}
if err := signingRootsBucket.Put(targetEpochBytes, signingRoot[:]); err != nil {
return err
}
sourceEpochsBucket, err := pkBucket.CreateBucketIfNotExists(attestationSourceEpochsBucket)
if err != nil {
return err
}
return sourceEpochsBucket.Put(sourceEpochBytes, targetEpochBytes)
}
}
// Flushes a list of batched attestations to the database
// and resets the list of batched attestations for future writes.
// This function notifies all subscribers for flushed attestations
// of the result of the save operation.
func (store *Store) flushAttestationRecords(ctx context.Context) {
err := store.saveAttestationRecords(ctx, store.batchedAttestations)
// If there was no error, we reset the batched attestations slice.
if err == nil {
log.Debug("Successfully flushed batched attestations to DB")
store.batchedAttestations = make([]*attestationRecord, 0, attestationBatchCapacity)
}
// Forward the error, if any, to all subscribers via an event feed.
// We use a struct wrapper around the error as the event feed
// cannot handle sending a raw `nil` in case there is no error.
store.batchAttestationsFlushedFeed.Send(saveAttestationsResponse{
err: err,
})
}
// Saves a list of attestation records to the database in a single boltDB
// transaction to minimize write lock contention compared to doing them
// all in individual, isolated boltDB transactions.
func (store *Store) saveAttestationRecords(ctx context.Context, atts []*attestationRecord) error {
ctx, span := trace.StartSpan(ctx, "Validator.saveAttestationRecords")
defer span.End()
return store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(pubKeysBucket)
for _, att := range atts {
pkBucket, err := bucket.CreateBucketIfNotExists(att.pubKey[:])
if err != nil {
return errors.Wrap(err, "could not create public key bucket")
}
sourceEpochBytes := bytesutil.Uint64ToBytesBigEndian(att.source)
targetEpochBytes := bytesutil.Uint64ToBytesBigEndian(att.target)
signingRootsBucket, err := pkBucket.CreateBucketIfNotExists(attestationSigningRootsBucket)
if err != nil {
return errors.Wrap(err, "could not create signing roots bucket")
}
if err := signingRootsBucket.Put(targetEpochBytes, att.signingRoot[:]); err != nil {
return errors.Wrapf(err, "could not save signing signing root for epoch %d", att.target)
}
sourceEpochsBucket, err := pkBucket.CreateBucketIfNotExists(attestationSourceEpochsBucket)
if err != nil {
return errors.Wrap(err, "could not create source epochs bucket")
}
if err := sourceEpochsBucket.Put(sourceEpochBytes, targetEpochBytes); err != nil {
return errors.Wrapf(err, "could not save source epoch %d for epoch %d", att.source, att.target)
}
}
return nil
})
}

View File

@@ -2,14 +2,17 @@ package kv
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
logTest "github.com/sirupsen/logrus/hooks/test"
bolt "go.etcd.io/bbolt"
)
@@ -20,9 +23,9 @@ func TestStore_CheckSlashableAttestation_DoubleVote(t *testing.T) {
validatorDB := setupDB(t, pubKeys)
tests := []struct {
name string
existingAttestation *ethpb.Attestation
existingAttestation *ethpb.IndexedAttestation
existingSigningRoot [32]byte
incomingAttestation *ethpb.Attestation
incomingAttestation *ethpb.IndexedAttestation
incomingSigningRoot [32]byte
want bool
}{
@@ -61,7 +64,7 @@ func TestStore_CheckSlashableAttestation_DoubleVote(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validatorDB.ApplyAttestationForPubKey(
err := validatorDB.SaveAttestationForPubKey(
ctx,
pubKeys[0],
tt.existingSigningRoot,
@@ -118,7 +121,7 @@ func TestStore_CheckSlashableAttestation_SurroundVote_54kEpochs(t *testing.T) {
tests := []struct {
name string
signingRoot [32]byte
attestation *ethpb.Attestation
attestation *ethpb.IndexedAttestation
want SlashingKind
}{
{
@@ -157,6 +160,116 @@ func TestStore_CheckSlashableAttestation_SurroundVote_54kEpochs(t *testing.T) {
}
}
func TestSaveAttestationForPubKey_BatchWrites_FullCapacity(t *testing.T) {
hook := logTest.NewGlobal()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
numValidators := attestationBatchCapacity
pubKeys := make([][48]byte, numValidators)
validatorDB := setupDB(t, pubKeys)
// For each public key, we attempt to save an attestation with signing root.
var wg sync.WaitGroup
for i, pubKey := range pubKeys {
wg.Add(1)
go func(j int, pk [48]byte, w *sync.WaitGroup) {
defer w.Done()
var signingRoot [32]byte
copy(signingRoot[:], fmt.Sprintf("%d", j))
att := createAttestation(uint64(j), uint64(j)+1)
err := validatorDB.SaveAttestationForPubKey(ctx, pk, signingRoot, att)
require.NoError(t, err)
}(i, pubKey, &wg)
}
wg.Wait()
// We verify that we reached the max capacity of batched attestations
// before we are required to force flush them to the DB.
require.LogsContain(t, hook, "Reached max capacity of batched attestation records")
require.LogsDoNotContain(t, hook, "Batched attestation records write interval reached")
require.LogsContain(t, hook, "Successfully flushed batched attestations to DB")
require.Equal(t, 0, len(validatorDB.batchedAttestations))
// We then verify all the data we wanted to save is indeed saved to disk.
err := validatorDB.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(pubKeysBucket)
for i, pubKey := range pubKeys {
var signingRoot [32]byte
copy(signingRoot[:], fmt.Sprintf("%d", i))
pkBucket := bucket.Bucket(pubKey[:])
signingRootsBucket := pkBucket.Bucket(attestationSigningRootsBucket)
sourceEpochsBucket := pkBucket.Bucket(attestationSourceEpochsBucket)
source := bytesutil.Uint64ToBytesBigEndian(uint64(i))
target := bytesutil.Uint64ToBytesBigEndian(uint64(i) + 1)
savedSigningRoot := signingRootsBucket.Get(target)
require.DeepEqual(t, signingRoot[:], savedSigningRoot)
savedTarget := sourceEpochsBucket.Get(source)
require.DeepEqual(t, signingRoot[:], savedSigningRoot)
require.DeepEqual(t, target, savedTarget)
}
return nil
})
require.NoError(t, err)
}
func TestSaveAttestationForPubKey_BatchWrites_LowCapacity_TimerReached(t *testing.T) {
hook := logTest.NewGlobal()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Number of validators equal to half the total capacity
// of batch attestation processing. This will allow us to
// test force flushing to the DB based on a timer instead
// of the max capacity being reached.
numValidators := attestationBatchCapacity / 2
pubKeys := make([][48]byte, numValidators)
validatorDB := setupDB(t, pubKeys)
// For each public key, we attempt to save an attestation with signing root.
var wg sync.WaitGroup
for i, pubKey := range pubKeys {
wg.Add(1)
go func(j int, pk [48]byte, w *sync.WaitGroup) {
defer w.Done()
var signingRoot [32]byte
copy(signingRoot[:], fmt.Sprintf("%d", j))
att := createAttestation(uint64(j), uint64(j)+1)
err := validatorDB.SaveAttestationForPubKey(ctx, pk, signingRoot, att)
require.NoError(t, err)
}(i, pubKey, &wg)
}
wg.Wait()
// We verify that we reached a timer interval for force flushing records
// before we are required to force flush them to the DB.
require.LogsDoNotContain(t, hook, "Reached max capacity of batched attestation records")
require.LogsContain(t, hook, "Batched attestation records write interval reached")
require.LogsContain(t, hook, "Successfully flushed batched attestations to DB")
require.Equal(t, 0, len(validatorDB.batchedAttestations))
// We then verify all the data we wanted to save is indeed saved to disk.
err := validatorDB.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(pubKeysBucket)
for i, pubKey := range pubKeys {
var signingRoot [32]byte
copy(signingRoot[:], fmt.Sprintf("%d", i))
pkBucket := bucket.Bucket(pubKey[:])
signingRootsBucket := pkBucket.Bucket(attestationSigningRootsBucket)
sourceEpochsBucket := pkBucket.Bucket(attestationSourceEpochsBucket)
source := bytesutil.Uint64ToBytesBigEndian(uint64(i))
target := bytesutil.Uint64ToBytesBigEndian(uint64(i) + 1)
savedSigningRoot := signingRootsBucket.Get(target)
require.DeepEqual(t, signingRoot[:], savedSigningRoot)
savedTarget := sourceEpochsBucket.Get(source)
require.DeepEqual(t, signingRoot[:], savedSigningRoot)
require.DeepEqual(t, target, savedTarget)
}
return nil
})
require.NoError(t, err)
}
func BenchmarkStore_CheckSlashableAttestation_Surround_SafeAttestation_54kEpochs(b *testing.B) {
numValidators := 1
numEpochs := uint64(54000)
@@ -211,7 +324,7 @@ func benchCheckSurroundVote(
require.NoError(b, err)
// Will surround many attestations.
var surroundingVote *ethpb.Attestation
var surroundingVote *ethpb.IndexedAttestation
if shouldSurround {
surroundingVote = createAttestation(numEpochs/2, numEpochs)
} else {
@@ -231,8 +344,8 @@ func benchCheckSurroundVote(
}
}
func createAttestation(source, target uint64) *ethpb.Attestation {
return &ethpb.Attestation{
func createAttestation(source, target uint64) *ethpb.IndexedAttestation {
return &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{
Epoch: source,

View File

@@ -5,27 +5,42 @@ import (
"context"
"os"
"path/filepath"
"sync"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prombolt "github.com/prysmaticlabs/prombbolt"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/fileutil"
"github.com/prysmaticlabs/prysm/shared/params"
bolt "go.etcd.io/bbolt"
)
const (
// Number of attestation records we can hold in memory
// before we flush them to the database. Roughly corresponds
// to the max number of keys per validator client, but there is no
// detriment if there are more keys than this capacity, as attestations
// for those keys will simply be flushed at the next flush interval.
attestationBatchCapacity = 2048
// Time interval after which we flush attestation records to the database
// from a batch kept in memory for slashing protection.
attestationBatchWriteInterval = time.Millisecond * 100
)
// ProtectionDbFileName Validator slashing protection db file name.
var ProtectionDbFileName = "validator.db"
var (
ProtectionDbFileName = "validator.db"
)
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for eth2.
type Store struct {
db *bolt.DB
databasePath string
lock sync.RWMutex
attestingHistoriesByPubKey map[[48]byte]EncHistoryData
db *bolt.DB
databasePath string
batchedAttestations []*attestationRecord
batchedAttestationsChan chan *attestationRecord
batchAttestationsFlushedFeed *event.Feed
}
// Close closes the underlying boltdb database.
@@ -87,9 +102,11 @@ func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store
}
kv := &Store{
db: boltDB,
databasePath: dirPath,
attestingHistoriesByPubKey: make(map[[48]byte]EncHistoryData),
db: boltDB,
databasePath: dirPath,
batchedAttestations: make([]*attestationRecord, 0, attestationBatchCapacity),
batchedAttestationsChan: make(chan *attestationRecord, attestationBatchCapacity),
batchAttestationsFlushedFeed: new(event.Feed),
}
if err := kv.db.Update(func(tx *bolt.Tx) error {
@@ -116,22 +133,20 @@ func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store
}
}
// We then fetch the attestation histories for each public key
// and store them in a map for usage at runtime.
if !featureconfig.Get().DisableAttestingHistoryDBCache {
// No need for a lock here as this function is only called once
// to initialize the database and would lead to deadlocks otherwise.
for _, pubKey := range pubKeys {
history, err := kv.AttestationHistoryForPubKeyV2(ctx, pubKey)
if err != nil {
return nil, err
}
kv.attestingHistoriesByPubKey[pubKey] = history
}
// Perform a special migration to an optimal attester protection DB schema.
if err := kv.migrateOptimalAttesterProtection(ctx); err != nil {
return nil, errors.Wrap(err, "could not migrate attester protection to more efficient format")
}
// Prune attesting records older than the current weak subjectivity period.
if err := kv.PruneAttestationsOlderThanCurrentWeakSubjectivity(ctx); err != nil {
return nil, errors.Wrap(err, "could not prune old attestations from DB")
}
// Batch save attestation records for slashing protection at timed
// intervals to our database.
go kv.batchAttestationWrites(ctx)
return kv, prometheus.Register(createBoltCollector(kv.db))
}

View File

@@ -2,11 +2,20 @@ package kv
import (
"context"
"io/ioutil"
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/sirupsen/logrus"
)
func TestMain(m *testing.M) {
logrus.SetLevel(logrus.DebugLevel)
logrus.SetOutput(ioutil.Discard)
m.Run()
}
// setupDB instantiates and returns a DB instance for the validator client.
func setupDB(t testing.TB, pubkeys [][48]byte) *Store {
db, err := NewKVStore(context.Background(), t.TempDir(), pubkeys)

View File

@@ -0,0 +1,124 @@
package kv
import (
"bytes"
"context"
"github.com/golang/snappy"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/progressutil"
bolt "go.etcd.io/bbolt"
)
var migrationOptimalAttesterProtectionKey = []byte("optimal_attester_protection_0")
// Migrate attester protection to a more optimal format in the DB. Given we
// stored attesting history as large, 2Mb arrays per validator, we need to perform
// this migration differently than the rest, ensuring we perform each expensive bolt
// update in its own transaction to prevent having everything on the heap.
func (store *Store) migrateOptimalAttesterProtection(ctx context.Context) error {
publicKeyBytes := make([][]byte, 0)
attestingHistoryBytes := make([][]byte, 0)
numKeys := 0
err := store.db.Update(func(tx *bolt.Tx) error {
mb := tx.Bucket(migrationsBucket)
if b := mb.Get(migrationOptimalAttesterProtectionKey); bytes.Equal(b, migrationCompleted) {
return nil // Migration already completed.
}
bkt := tx.Bucket(historicAttestationsBucket)
numKeys = bkt.Stats().KeyN
if err := bkt.ForEach(func(k, v []byte) error {
if v == nil {
return nil
}
bucket := tx.Bucket(pubKeysBucket)
pkBucket, err := bucket.CreateBucketIfNotExists(k)
if err != nil {
return err
}
_, err = pkBucket.CreateBucketIfNotExists(attestationSourceEpochsBucket)
if err != nil {
return err
}
_, err = pkBucket.CreateBucketIfNotExists(attestationSigningRootsBucket)
if err != nil {
return err
}
nk := make([]byte, len(k))
copy(nk, k)
nv := make([]byte, len(v))
copy(nv, v)
publicKeyBytes = append(publicKeyBytes, nk)
attestingHistoryBytes = append(attestingHistoryBytes, nv)
return nil
}); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
bar := progressutil.InitializeProgressBar(numKeys, "Migrating attesting history to more efficient format")
for i, publicKey := range publicKeyBytes {
attestingHistoryForPubKey := attestingHistoryBytes[i]
err = store.db.Update(func(tx *bolt.Tx) error {
if attestingHistoryForPubKey == nil {
return nil
}
var attestingHistory EncHistoryData
var err error
attestingHistory, err = snappy.Decode(nil /*dst*/, attestingHistoryForPubKey)
if err != nil {
return err
}
bucket := tx.Bucket(pubKeysBucket)
pkBucket := bucket.Bucket(publicKey)
sourceEpochsBucket := pkBucket.Bucket(attestationSourceEpochsBucket)
signingRootsBucket := pkBucket.Bucket(attestationSigningRootsBucket)
// Extract every single source, target, signing root
// from the attesting history then insert them into the
// respective buckets under the new db schema.
latestEpochWritten, err := attestingHistory.GetLatestEpochWritten(ctx)
if err != nil {
return err
}
// For every epoch since genesis up to the highest epoch written, we then
// extract historical data and insert it into the new schema.
for targetEpoch := uint64(0); targetEpoch <= latestEpochWritten; targetEpoch++ {
historicalAtt, err := attestingHistory.GetTargetData(ctx, targetEpoch)
if err != nil {
return err
}
if historicalAtt.IsEmpty() {
continue
}
targetEpochBytes := bytesutil.Uint64ToBytesBigEndian(targetEpoch)
sourceEpochBytes := bytesutil.Uint64ToBytesBigEndian(historicalAtt.Source)
if err := sourceEpochsBucket.Put(sourceEpochBytes, targetEpochBytes); err != nil {
return err
}
if err := signingRootsBucket.Put(targetEpochBytes, historicalAtt.SigningRoot); err != nil {
return err
}
}
return bar.Add(1)
})
if err != nil {
return err
}
}
return store.db.Update(func(tx *bolt.Tx) error {
mb := tx.Bucket(migrationsBucket)
if err := mb.Put(migrationOptimalAttesterProtectionKey, migrationCompleted); err != nil {
return err
}
return nil
})
}

View File

@@ -0,0 +1,247 @@
package kv
import (
"context"
"fmt"
"path/filepath"
"testing"
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/fileutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
bolt "go.etcd.io/bbolt"
)
func Test_migrateOptimalAttesterProtection(t *testing.T) {
tests := []struct {
name string
setup func(t *testing.T, validatorDB *Store)
eval func(t *testing.T, validatorDB *Store)
}{
{
name: "only runs once",
setup: func(t *testing.T, validatorDB *Store) {
err := validatorDB.update(func(tx *bolt.Tx) error {
return tx.Bucket(migrationsBucket).Put(migrationOptimalAttesterProtectionKey, migrationCompleted)
})
require.NoError(t, err)
},
eval: func(t *testing.T, validatorDB *Store) {
err := validatorDB.view(func(tx *bolt.Tx) error {
data := tx.Bucket(migrationsBucket).Get(migrationOptimalAttesterProtectionKey)
require.DeepEqual(t, data, migrationCompleted)
return nil
})
require.NoError(t, err)
},
},
{
name: "populates optimized schema buckets",
setup: func(t *testing.T, validatorDB *Store) {
ctx := context.Background()
pubKey := [48]byte{1}
history := NewAttestationHistoryArray(0)
// Attest all epochs from genesis to 50.
numEpochs := uint64(50)
for i := uint64(1); i <= numEpochs; i++ {
var sr [32]byte
copy(sr[:], fmt.Sprintf("%d", i))
newHist, err := history.SetTargetData(ctx, i, &HistoryData{
Source: i - 1,
SigningRoot: sr[:],
})
require.NoError(t, err)
history = newHist
}
newHist, err := history.SetLatestEpochWritten(ctx, numEpochs)
require.NoError(t, err)
err = validatorDB.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
enc := snappy.Encode(nil /*dst*/, newHist)
return bucket.Put(pubKey[:], enc)
})
require.NoError(t, err)
},
eval: func(t *testing.T, validatorDB *Store) {
// Verify we indeed have the data for all epochs
// since genesis to epoch 50 under the new schema.
err := validatorDB.view(func(tx *bolt.Tx) error {
pubKey := [48]byte{1}
bucket := tx.Bucket(pubKeysBucket)
pkBucket := bucket.Bucket(pubKey[:])
signingRootsBucket := pkBucket.Bucket(attestationSigningRootsBucket)
sourceEpochsBucket := pkBucket.Bucket(attestationSourceEpochsBucket)
numEpochs := uint64(50)
// Verify we have signing roots for target epochs 1 to 50 correctly.
for targetEpoch := uint64(1); targetEpoch <= numEpochs; targetEpoch++ {
var sr [32]byte
copy(sr[:], fmt.Sprintf("%d", targetEpoch))
targetEpochBytes := bytesutil.Uint64ToBytesBigEndian(targetEpoch)
migratedSigningRoot := signingRootsBucket.Get(targetEpochBytes)
require.DeepEqual(t, sr[:], migratedSigningRoot)
}
// Verify we have (source epoch, target epoch) pairs for epochs 0 to 50 correctly.
for sourceEpoch := uint64(0); sourceEpoch < numEpochs; sourceEpoch++ {
sourceEpochBytes := bytesutil.Uint64ToBytesBigEndian(sourceEpoch)
targetEpochBytes := sourceEpochsBucket.Get(sourceEpochBytes)
targetEpoch := bytesutil.BytesToUint64BigEndian(targetEpochBytes)
require.Equal(t, sourceEpoch+1, targetEpoch)
}
return nil
})
require.NoError(t, err)
},
},
{
name: "partial data saved for both types still completes the migration successfully",
setup: func(t *testing.T, validatorDB *Store) {
ctx := context.Background()
pubKey := [48]byte{1}
history := NewAttestationHistoryArray(0)
// Attest all epochs from genesis to 50.
numEpochs := uint64(50)
for i := uint64(1); i <= numEpochs; i++ {
var sr [32]byte
copy(sr[:], fmt.Sprintf("%d", i))
newHist, err := history.SetTargetData(ctx, i, &HistoryData{
Source: i - 1,
SigningRoot: sr[:],
})
require.NoError(t, err)
history = newHist
}
newHist, err := history.SetLatestEpochWritten(ctx, numEpochs)
require.NoError(t, err)
err = validatorDB.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
enc := snappy.Encode(nil /*dst*/, newHist)
return bucket.Put(pubKey[:], enc)
})
require.NoError(t, err)
// Run the migration.
require.NoError(t, validatorDB.migrateOptimalAttesterProtection(ctx))
// Then delete the migration completed key.
err = validatorDB.update(func(tx *bolt.Tx) error {
mb := tx.Bucket(migrationsBucket)
return mb.Delete(migrationOptimalAttesterProtectionKey)
})
require.NoError(t, err)
// Write one more entry to the DB with the old format.
var sr [32]byte
copy(sr[:], fmt.Sprintf("%d", numEpochs+1))
newHist, err = newHist.SetTargetData(ctx, numEpochs+1, &HistoryData{
Source: numEpochs,
SigningRoot: sr[:],
})
require.NoError(t, err)
newHist, err = newHist.SetLatestEpochWritten(ctx, numEpochs+1)
require.NoError(t, err)
err = validatorDB.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
enc := snappy.Encode(nil /*dst*/, newHist)
return bucket.Put(pubKey[:], enc)
})
require.NoError(t, err)
},
eval: func(t *testing.T, validatorDB *Store) {
// Verify we indeed have the data for all epochs
// since genesis to epoch 50+1 under the new schema.
err := validatorDB.view(func(tx *bolt.Tx) error {
pubKey := [48]byte{1}
bucket := tx.Bucket(pubKeysBucket)
pkBucket := bucket.Bucket(pubKey[:])
signingRootsBucket := pkBucket.Bucket(attestationSigningRootsBucket)
sourceEpochsBucket := pkBucket.Bucket(attestationSourceEpochsBucket)
numEpochs := uint64(50)
// Verify we have signing roots for target epochs 1 to 50 correctly.
for targetEpoch := uint64(1); targetEpoch <= numEpochs+1; targetEpoch++ {
var sr [32]byte
copy(sr[:], fmt.Sprintf("%d", targetEpoch))
targetEpochBytes := bytesutil.Uint64ToBytesBigEndian(targetEpoch)
migratedSigningRoot := signingRootsBucket.Get(targetEpochBytes)
require.DeepEqual(t, sr[:], migratedSigningRoot)
}
// Verify we have (source epoch, target epoch) pairs for epochs 0 to 50 correctly.
for sourceEpoch := uint64(0); sourceEpoch < numEpochs+1; sourceEpoch++ {
sourceEpochBytes := bytesutil.Uint64ToBytesBigEndian(sourceEpoch)
targetEpochBytes := sourceEpochsBucket.Get(sourceEpochBytes)
targetEpoch := bytesutil.BytesToUint64BigEndian(targetEpochBytes)
require.Equal(t, sourceEpoch+1, targetEpoch)
}
return nil
})
require.NoError(t, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
validatorDB, err := setupDBWithoutMigration(t.TempDir())
require.NoError(t, err, "Failed to instantiate DB")
t.Cleanup(func() {
require.NoError(t, validatorDB.Close(), "Failed to close database")
require.NoError(t, validatorDB.ClearDB(), "Failed to clear database")
})
tt.setup(t, validatorDB)
require.NoError(t, validatorDB.migrateOptimalAttesterProtection(context.Background()))
tt.eval(t, validatorDB)
})
}
}
func setupDBWithoutMigration(dirPath string) (*Store, error) {
hasDir, err := fileutil.HasDir(dirPath)
if err != nil {
return nil, err
}
if !hasDir {
if err := fileutil.MkdirAll(dirPath); err != nil {
return nil, err
}
}
datafile := filepath.Join(dirPath, ProtectionDbFileName)
boltDB, err := bolt.Open(datafile, params.BeaconIoConfig().ReadWritePermissions, &bolt.Options{Timeout: params.BeaconIoConfig().BoltTimeout})
if err != nil {
if errors.Is(err, bolt.ErrTimeout) {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}
kv := &Store{
db: boltDB,
databasePath: dirPath,
}
if err := kv.db.Update(func(tx *bolt.Tx) error {
return createBuckets(
tx,
genesisInfoBucket,
historicAttestationsBucket,
historicProposalsBucket,
lowestSignedSourceBucket,
lowestSignedTargetBucket,
lowestSignedProposalsBucket,
highestSignedProposalsBucket,
pubKeysBucket,
migrationsBucket,
)
}); err != nil {
return nil, err
}
return kv, prometheus.Register(createBoltCollector(kv.db))
}

View File

@@ -19,6 +19,7 @@ go_library(
"//validator/db/kv:go_default_library",
"@com_github_k0kubun_go_ansi//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_schollz_progressbar_v3//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
@@ -43,6 +44,7 @@ go_test(
"//shared/testutil/require:go_default_library",
"//validator/db/kv:go_default_library",
"//validator/db/testing:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -38,3 +38,9 @@ type SignedBlock struct {
Slot string `json:"slot"`
SigningRoot string `json:"signing_root,omitempty"`
}
type historicalAttestation struct {
sourceEpoch uint64
targetEpoch uint64
signingRoot [32]byte
}

View File

@@ -9,6 +9,7 @@ import (
"io/ioutil"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/validator/db"
@@ -49,7 +50,7 @@ func ImportStandardProtectionJSON(ctx context.Context, validatorDB db.Database,
return errors.Wrap(err, "could not parse unique entries for attestations by public key")
}
attestingHistoryByPubKey := make(map[[48]byte]kv.EncHistoryData)
attestingHistoryByPubKey := make(map[[48]byte][]*historicalAttestation)
proposalHistoryByPubKey := make(map[[48]byte]kv.ProposalHistoryForPubkey)
for pubKey, signedBlocks := range signedBlocksByPubKey {
// Transform the processed signed blocks data from the JSON
@@ -64,11 +65,11 @@ func ImportStandardProtectionJSON(ctx context.Context, validatorDB db.Database,
for pubKey, signedAtts := range signedAttsByPubKey {
// Transform the processed signed attestation data from the JSON
// file into the internal Prysm representation of attesting history.
attestingHistory, err := transformSignedAttestations(ctx, signedAtts)
historicalAtt, err := transformSignedAttestations(ctx, signedAtts)
if err != nil {
return errors.Wrapf(err, "could not parse signed attestations in JSON file for key %#x", pubKey)
}
attestingHistoryByPubKey[pubKey] = *attestingHistory
attestingHistoryByPubKey[pubKey] = historicalAtt
}
// We save the histories to disk as atomic operations, ensuring that this only occurs
@@ -92,12 +93,24 @@ func ImportStandardProtectionJSON(ctx context.Context, validatorDB db.Database,
len(attestingHistoryByPubKey),
"Importing attesting history for validator public keys",
)
for pubKey, history := range attestingHistoryByPubKey {
for pubKey, attestations := range attestingHistoryByPubKey {
if err := bar.Add(1); err != nil {
log.WithError(err).Debug("Could not increase progress bar")
}
if err := validatorDB.SaveAttestationHistoryForPubKeyV2(ctx, pubKey, history); err != nil {
return errors.Wrap(err, "could not save attesting history from imported JSON to database")
for _, att := range attestations {
indexedAtt := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{
Epoch: att.sourceEpoch,
},
Target: &ethpb.Checkpoint{
Epoch: att.targetEpoch,
},
},
}
if err := validatorDB.SaveAttestationForPubKey(ctx, pubKey, att.signingRoot, indexedAtt); err != nil {
return errors.Wrap(err, "could not save attestation from imported JSON to database")
}
}
}
return saveLowestSourceTargetToDB(ctx, validatorDB, signedAttsByPubKey)
@@ -222,19 +235,13 @@ func transformSignedBlocks(ctx context.Context, signedBlocks []*SignedBlock) (*k
}, nil
}
func transformSignedAttestations(ctx context.Context, atts []*SignedAttestation) (*kv.EncHistoryData, error) {
attestingHistory := kv.NewAttestationHistoryArray(0)
highestEpochWritten := uint64(0)
var err error
func transformSignedAttestations(ctx context.Context, atts []*SignedAttestation) ([]*historicalAttestation, error) {
historicalAtts := make([]*historicalAttestation, 0)
for _, attestation := range atts {
target, err := uint64FromString(attestation.TargetEpoch)
if err != nil {
return nil, fmt.Errorf("%d is not a valid epoch: %v", target, err)
}
// Keep track of the highest epoch written from the imported JSON.
if target > highestEpochWritten {
highestEpochWritten = target
}
source, err := uint64FromString(attestation.SourceEpoch)
if err != nil {
return nil, fmt.Errorf("%d is not a valid epoch: %v", source, err)
@@ -247,18 +254,13 @@ func transformSignedAttestations(ctx context.Context, atts []*SignedAttestation)
return nil, fmt.Errorf("%#x is not a valid root: %v", signingRoot, err)
}
}
attestingHistory, err = attestingHistory.SetTargetData(
ctx, target, &kv.HistoryData{Source: source, SigningRoot: signingRoot[:]},
)
if err != nil {
return nil, errors.Wrap(err, "could not set target data for attesting history")
}
historicalAtts = append(historicalAtts, &historicalAttestation{
sourceEpoch: source,
targetEpoch: target,
signingRoot: signingRoot,
})
}
attestingHistory, err = attestingHistory.SetLatestEpochWritten(ctx, highestEpochWritten)
if err != nil {
return nil, errors.Wrap(err, "could not set latest epoch written")
}
return &attestingHistory, nil
return historicalAtts, nil
}
// This saves the lowest source and target epoch from the individual validator to the DB.

View File

@@ -9,6 +9,7 @@ import (
"reflect"
"testing"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
@@ -74,15 +75,22 @@ func TestStore_ImportInterchangeData_BadFormat_PreventsDBWrites(t *testing.T) {
// sure writing is an atomic operation: either the import succeeds and saves the slashing protection
// data to our DB, or it does not.
for i := 0; i < len(publicKeys); i++ {
receivedAttestingHistory, err := validatorDB.AttestationHistoryForPubKeyV2(ctx, publicKeys[i])
require.NoError(t, err)
defaultAttestingHistory := kv.NewAttestationHistoryArray(0)
require.DeepEqual(
t,
defaultAttestingHistory,
receivedAttestingHistory,
"Imported attestation protection history is different than the empty default",
)
for _, att := range attestingHistory[i] {
indexedAtt := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{
Epoch: att.sourceEpoch,
},
Target: &ethpb.Checkpoint{
Epoch: att.targetEpoch,
},
},
}
slashingKind, err := validatorDB.CheckSlashableAttestation(ctx, publicKeys[i], [32]byte{}, indexedAtt)
// We expect we do not have an attesting history for each attestation
require.NoError(t, err)
require.Equal(t, kv.NotSlashable, slashingKind)
}
proposals := proposalHistory[i].Proposals
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
@@ -120,14 +128,25 @@ func TestStore_ImportInterchangeData_OK(t *testing.T) {
// Next, we attempt to retrieve the attesting and proposals histories from our database and
// verify those indeed match the originally generated mock histories.
for i := 0; i < len(publicKeys); i++ {
receivedAttestingHistory, err := validatorDB.AttestationHistoryForPubKeyV2(ctx, publicKeys[i])
require.NoError(t, err)
require.DeepEqual(
t,
attestingHistory[i],
receivedAttestingHistory,
"We should have stored any attesting history",
)
for _, att := range attestingHistory[i] {
indexedAtt := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{
Epoch: att.sourceEpoch,
},
Target: &ethpb.Checkpoint{
Epoch: att.targetEpoch,
},
},
}
slashingKind, err := validatorDB.CheckSlashableAttestation(ctx, publicKeys[i], [32]byte{}, indexedAtt)
// We expect we have an attesting history for the attestation and when
// attempting to verify the same att is slashable with a different signing root,
// we expect to receive a double vote slashing kind.
require.NotNil(t, err)
require.Equal(t, kv.DoubleVote, slashingKind)
}
proposals := proposalHistory[i].Proposals
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
@@ -772,63 +791,53 @@ func Test_saveLowestSourceTargetToDBt_Ok(t *testing.T) {
func mockSlashingProtectionJSON(
t *testing.T,
publicKeys [][48]byte,
attestingHistories []kv.EncHistoryData,
attestingHistories [][]*historicalAttestation,
proposalHistories []kv.ProposalHistoryForPubkey,
) *EIPSlashingProtectionFormat {
standardProtectionFormat := &EIPSlashingProtectionFormat{}
standardProtectionFormat.Metadata.GenesisValidatorsRoot = fmt.Sprintf("%#x", bytesutil.PadTo([]byte{32}, 32))
standardProtectionFormat.Metadata.InterchangeFormatVersion = INTERCHANGE_FORMAT_VERSION
ctx := context.Background()
for i := 0; i < len(publicKeys); i++ {
data := &ProtectionData{
Pubkey: fmt.Sprintf("%#x", publicKeys[i]),
}
highestEpochWritten, err := attestingHistories[i].GetLatestEpochWritten(ctx)
require.NoError(t, err)
for target := uint64(0); target <= highestEpochWritten; target++ {
hd, err := attestingHistories[i].GetTargetData(ctx, target)
require.NoError(t, err)
for _, att := range attestingHistories[i] {
data.SignedAttestations = append(data.SignedAttestations, &SignedAttestation{
TargetEpoch: fmt.Sprintf("%d", target),
SourceEpoch: fmt.Sprintf("%d", hd.Source),
SigningRoot: fmt.Sprintf("%#x", hd.SigningRoot),
TargetEpoch: fmt.Sprintf("%d", att.targetEpoch),
SourceEpoch: fmt.Sprintf("%d", att.sourceEpoch),
SigningRoot: fmt.Sprintf("%#x", att.signingRoot),
})
}
for target := uint64(0); target < highestEpochWritten; target++ {
proposal := proposalHistories[i].Proposals[target]
for _, proposal := range proposalHistories[i].Proposals {
block := &SignedBlock{
Slot: fmt.Sprintf("%d", proposal.Slot),
SigningRoot: fmt.Sprintf("%#x", proposal.SigningRoot),
}
data.SignedBlocks = append(data.SignedBlocks, block)
}
standardProtectionFormat.Data = append(standardProtectionFormat.Data, data)
}
return standardProtectionFormat
}
func mockAttestingAndProposalHistories(t *testing.T, numValidators int) ([]kv.EncHistoryData, []kv.ProposalHistoryForPubkey) {
func mockAttestingAndProposalHistories(t *testing.T, numValidators int) ([][]*historicalAttestation, []kv.ProposalHistoryForPubkey) {
// deduplicate and transform them into our internal format.
attData := make([]kv.EncHistoryData, numValidators)
attData := make([][]*historicalAttestation, numValidators)
proposalData := make([]kv.ProposalHistoryForPubkey, numValidators)
gen := rand.NewGenerator()
ctx := context.Background()
for v := 0; v < numValidators; v++ {
var err error
latestTarget := gen.Intn(int(params.BeaconConfig().WeakSubjectivityPeriod) / 1000)
hd := kv.NewAttestationHistoryArray(uint64(latestTarget))
historicalAtts := make([]*historicalAttestation, 0)
proposals := make([]kv.Proposal, 0)
for i := 1; i < latestTarget; i++ {
signingRoot := [32]byte{}
signingRootStr := fmt.Sprintf("%d", i)
copy(signingRoot[:], signingRootStr)
historyData := &kv.HistoryData{
Source: uint64(gen.Intn(100000)),
SigningRoot: signingRoot[:],
}
hd, err = hd.SetTargetData(ctx, uint64(i), historyData)
require.NoError(t, err)
historicalAtts = append(historicalAtts, &historicalAttestation{
sourceEpoch: uint64(gen.Intn(100000)),
targetEpoch: uint64(i),
signingRoot: signingRoot,
})
}
for i := 1; i <= latestTarget; i++ {
signingRoot := [32]byte{}
@@ -840,9 +849,7 @@ func mockAttestingAndProposalHistories(t *testing.T, numValidators int) ([]kv.En
})
}
proposalData[v] = kv.ProposalHistoryForPubkey{Proposals: proposals}
hd, err = hd.SetLatestEpochWritten(ctx, uint64(latestTarget))
require.NoError(t, err)
attData[v] = hd
attData[v] = historicalAtts
}
return attData, proposalData
}