Stop Early in Validator Surround Vote Local Protection (#8460)

* include migration and logic for stopping early in slashing protection checks

* remove commented code

* extract methods

* migration logic tested up

* migration up and down tests

* Update validator/db/kv/attester_protection.go

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* added in pruning and batched migrations

Co-authored-by: Victor Farazdagi <simple.square@gmail.com>
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
This commit is contained in:
Raul Jordan
2021-02-17 13:23:59 -06:00
committed by GitHub
parent 6391dec5de
commit 4f9752bb3e
9 changed files with 540 additions and 47 deletions

View File

@@ -13,6 +13,7 @@ go_library(
"log.go",
"migration.go",
"migration_optimal_attester_protection.go",
"migration_source_target_epochs_bucket.go",
"proposer_protection.go",
"prune_attester_protection.go",
"schema.go",
@@ -50,6 +51,7 @@ go_test(
"eip_blacklisted_keys_test.go",
"genesis_test.go",
"migration_optimal_attester_protection_test.go",
"migration_source_target_epochs_bucket_test.go",
"proposer_protection_test.go",
"prune_attester_protection_test.go",
],

View File

@@ -163,64 +163,113 @@ func (s *Store) CheckSlashableAttestation(
}
sourceEpochsBucket := pkBucket.Bucket(attestationSourceEpochsBucket)
targetEpochsBucket := pkBucket.Bucket(attestationTargetEpochsBucket)
if sourceEpochsBucket == nil {
return nil
}
// Check for surround votes.
return sourceEpochsBucket.ForEach(func(sourceEpochBytes []byte, targetEpochsBytes []byte) error {
if ctx.Err() != nil {
return ctx.Err()
}
existingSourceEpoch := bytesutil.BytesToEpochBigEndian(sourceEpochBytes)
// There can be multiple target epochs attested per source epoch.
attestedTargetEpochs := make([]types.Epoch, 0, len(targetEpochsBytes)/8)
for i := 0; i < len(targetEpochsBytes); i += 8 {
targetEpoch := bytesutil.BytesToEpochBigEndian(targetEpochsBytes[i : i+8])
attestedTargetEpochs = append(attestedTargetEpochs, targetEpoch)
}
for _, existingTargetEpoch := range attestedTargetEpochs {
existingAtt := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{Epoch: existingSourceEpoch},
Target: &ethpb.Checkpoint{Epoch: existingTargetEpoch},
},
}
// Checks if the incoming attestation is surrounding or
// is surrounded by an existing one.
surrounding := slashutil.IsSurround(att, existingAtt)
surrounded := slashutil.IsSurround(existingAtt, att)
if surrounding {
slashKind = SurroundingVote
return fmt.Errorf(
surroundingVoteMessage,
att.Data.Source.Epoch,
att.Data.Target.Epoch,
existingSourceEpoch,
existingTargetEpoch,
)
}
if surrounded {
slashKind = SurroundedVote
return fmt.Errorf(
surroundedVoteMessage,
att.Data.Source.Epoch,
att.Data.Target.Epoch,
existingSourceEpoch,
existingTargetEpoch,
)
}
}
// Is this attestation surrounding any other?
var err error
slashKind, err = s.checkSurroundingVote(sourceEpochsBucket, att)
if err != nil {
return err
}
if targetEpochsBucket == nil {
return nil
})
}
// Is this attestation surrounded by any other?
slashKind, err = s.checkSurroundedVote(targetEpochsBucket, att)
if err != nil {
return err
}
return nil
})
traceutil.AnnotateError(span, err)
return slashKind, err
}
// Iterate from the back of the bucket since we are looking for target_epoch > att.target_epoch
func (s *Store) checkSurroundedVote(
targetEpochsBucket *bolt.Bucket, att *ethpb.IndexedAttestation,
) (SlashingKind, error) {
c := targetEpochsBucket.Cursor()
for k, v := c.Last(); k != nil; k, v = c.Prev() {
existingTargetEpoch := bytesutil.BytesToEpochBigEndian(k)
if existingTargetEpoch <= att.Data.Target.Epoch {
break
}
// There can be multiple source epochs attested per target epoch.
attestedSourceEpochs := make([]types.Epoch, 0, len(v)/8)
for i := 0; i < len(v); i += 8 {
sourceEpoch := bytesutil.BytesToEpochBigEndian(v[i : i+8])
attestedSourceEpochs = append(attestedSourceEpochs, sourceEpoch)
}
for _, existingSourceEpoch := range attestedSourceEpochs {
existingAtt := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{Epoch: existingSourceEpoch},
Target: &ethpb.Checkpoint{Epoch: existingTargetEpoch},
},
}
surrounded := slashutil.IsSurround(existingAtt, att)
if surrounded {
return SurroundedVote, fmt.Errorf(
surroundedVoteMessage,
att.Data.Source.Epoch,
att.Data.Target.Epoch,
existingSourceEpoch,
existingTargetEpoch,
)
}
}
}
return NotSlashable, nil
}
// Iterate from the back of the bucket since we are looking for source_epoch > att.source_epoch
func (s *Store) checkSurroundingVote(
sourceEpochsBucket *bolt.Bucket, att *ethpb.IndexedAttestation,
) (SlashingKind, error) {
c := sourceEpochsBucket.Cursor()
for k, v := c.Last(); k != nil; k, v = c.Prev() {
existingSourceEpoch := bytesutil.BytesToEpochBigEndian(k)
if existingSourceEpoch <= att.Data.Source.Epoch {
break
}
// There can be multiple target epochs attested per source epoch.
attestedTargetEpochs := make([]types.Epoch, 0, len(v)/8)
for i := 0; i < len(v); i += 8 {
targetEpoch := bytesutil.BytesToEpochBigEndian(v[i : i+8])
attestedTargetEpochs = append(attestedTargetEpochs, targetEpoch)
}
for _, existingTargetEpoch := range attestedTargetEpochs {
existingAtt := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Source: &ethpb.Checkpoint{Epoch: existingSourceEpoch},
Target: &ethpb.Checkpoint{Epoch: existingTargetEpoch},
},
}
surrounding := slashutil.IsSurround(att, existingAtt)
if surrounding {
return SurroundingVote, fmt.Errorf(
surroundingVoteMessage,
att.Data.Source.Epoch,
att.Data.Target.Epoch,
existingSourceEpoch,
existingTargetEpoch,
)
}
}
}
return NotSlashable, nil
}
// SaveAttestationsForPubKey stores a batch of attestations all at once.
func (s *Store) SaveAttestationsForPubKey(
ctx context.Context, pubKey [48]byte, signingRoots [][32]byte, atts []*ethpb.IndexedAttestation,
@@ -383,6 +432,22 @@ func (s *Store) saveAttestationRecords(ctx context.Context, atts []*AttestationR
if err := sourceEpochsBucket.Put(sourceEpochBytes, existingAttestedTargetsBytes); err != nil {
return errors.Wrapf(err, "could not save source epoch %d for epoch %d", att.Source, att.Target)
}
targetEpochsBucket, err := pkBucket.CreateBucketIfNotExists(attestationTargetEpochsBucket)
if err != nil {
return errors.Wrap(err, "could not create target epochs bucket")
}
var existingAttestedSourceBytes []byte
if existing := targetEpochsBucket.Get(targetEpochBytes); existing != nil {
existingAttestedSourceBytes = append(existing, sourceEpochBytes...)
} else {
existingAttestedSourceBytes = sourceEpochBytes
}
if err := targetEpochsBucket.Put(targetEpochBytes, existingAttestedSourceBytes); err != nil {
return errors.Wrapf(err, "could not save target epoch %d for epoch %d", att.Target, att.Source)
}
// Initialize buckets for the lowest target and source epochs.
lowestSourceBucket, err := tx.CreateBucketIfNotExists(lowestSignedSourceBucket)
if err != nil {

View File

@@ -46,6 +46,7 @@ var blockedBuckets = [][]byte{
pubKeysBucket,
attestationSigningRootsBucket,
attestationSourceEpochsBucket,
attestationTargetEpochsBucket,
}
type Config struct {

View File

@@ -20,6 +20,9 @@ func (s *Store) RunUpMigrations(ctx context.Context) error {
if err := s.migrateOptimalAttesterProtectionUp(ctx); err != nil {
return err
}
if err := s.migrateSourceTargetEpochsBucketUp(ctx); err != nil {
return err
}
for _, m := range upMigrations {
if ctx.Err() != nil {
@@ -39,6 +42,9 @@ func (s *Store) RunDownMigrations(ctx context.Context) error {
if err := s.migrateOptimalAttesterProtectionDown(ctx); err != nil {
return err
}
if err := s.migrateSourceTargetEpochsBucketDown(ctx); err != nil {
return err
}
for _, m := range downMigrations {
if ctx.Err() != nil {

View File

@@ -0,0 +1,139 @@
package kv
import (
"bytes"
"context"
"github.com/prysmaticlabs/prysm/shared/progressutil"
bolt "go.etcd.io/bbolt"
)
var (
migrationSourceTargetEpochsBucketKey = []byte("source_target_epochs_bucket_0")
)
const (
publicKeyMigrationBatchSize = 100 // Batch update 100 public keys at a time.
)
func (s *Store) migrateSourceTargetEpochsBucketUp(ctx context.Context) error {
// First, we extract the public keys we need to migrate data for.
publicKeyBytes := make([][]byte, 0)
err := s.db.View(func(tx *bolt.Tx) error {
mb := tx.Bucket(migrationsBucket)
if b := mb.Get(migrationSourceTargetEpochsBucketKey); bytes.Equal(b, migrationCompleted) {
return nil // Migration already completed.
}
bkt := tx.Bucket(pubKeysBucket)
return bkt.ForEach(func(k, _ []byte) error {
if k == nil {
return nil
}
nk := make([]byte, len(k))
copy(nk, k)
publicKeyBytes = append(publicKeyBytes, nk)
return nil
})
})
if err != nil {
return err
}
// Next up, we initiate a bolt transaction for batches of public keys for efficiency.
// If we did a single transaction for all public keys, resource use might be too high,
// and if we do a single one per key, the migration will take too long.
batchedKeys := batchPublicKeys(publicKeyBytes, publicKeyMigrationBatchSize)
bar := progressutil.InitializeProgressBar(
len(batchedKeys), "Adding optimizations for validator slashing protection",
)
for _, batch := range batchedKeys {
err = s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(pubKeysBucket)
for _, pubKey := range batch {
pkb := bkt.Bucket(pubKey)
sourceBucket := pkb.Bucket(attestationSourceEpochsBucket)
if sourceBucket == nil {
return nil
}
targetBucket, err := pkb.CreateBucketIfNotExists(attestationTargetEpochsBucket)
if err != nil {
return err
}
err = sourceBucket.ForEach(func(sourceEpochBytes, targetEpochsBytes []byte) error {
for i := 0; i < len(targetEpochsBytes); i += 8 {
if err := insertTargetSource(
targetBucket,
targetEpochsBytes[i:i+8],
sourceEpochBytes,
); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
if err := bar.Add(1); err != nil {
return err
}
}
// Finally we mark the migration as completed.
return s.db.Update(func(tx *bolt.Tx) error {
mb := tx.Bucket(migrationsBucket)
return mb.Put(migrationSourceTargetEpochsBucketKey, migrationCompleted)
})
}
func (s *Store) migrateSourceTargetEpochsBucketDown(ctx context.Context) error {
return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(pubKeysBucket)
err := bkt.ForEach(func(k, _ []byte) error {
if k == nil {
return nil
}
pkBucket := bkt.Bucket(k)
if pkBucket == nil {
return nil
}
return pkBucket.DeleteBucket(attestationTargetEpochsBucket)
})
if err != nil {
return err
}
migrationsBkt := tx.Bucket(migrationsBucket)
return migrationsBkt.Delete(migrationSourceTargetEpochsBucketKey)
})
}
func insertTargetSource(bkt *bolt.Bucket, targetEpochBytes, sourceEpochBytes []byte) error {
var existingAttestedSourceBytes []byte
if existing := bkt.Get(targetEpochBytes); existing != nil {
existingAttestedSourceBytes = append(existing, sourceEpochBytes...)
} else {
existingAttestedSourceBytes = sourceEpochBytes
}
return bkt.Put(targetEpochBytes, existingAttestedSourceBytes)
}
func batchPublicKeys(publicKeys [][]byte, batchSize int) [][][]byte {
if len(publicKeys) < batchSize {
return [][][]byte{publicKeys}
}
batch := make([][][]byte, 0)
for i := 0; i < len(publicKeys); i += batchSize {
if i+batchSize == len(publicKeys)+1 {
batch = append(batch, publicKeys[i:])
} else {
batch = append(batch, publicKeys[i:i+batchSize])
}
}
return batch
}

View File

@@ -0,0 +1,247 @@
package kv
import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
bolt "go.etcd.io/bbolt"
)
func TestStore_migrateSourceTargetEpochsBucketUp(t *testing.T) {
numEpochs := uint64(100)
numKeys := 50
pubKeys := make([][48]byte, numKeys)
for i := 0; i < numKeys; i++ {
var pk [48]byte
copy(pk[:], fmt.Sprintf("%d", i))
pubKeys[i] = pk
}
tests := []struct {
name string
setup func(t *testing.T, validatorDB *Store)
eval func(t *testing.T, validatorDB *Store)
}{
{
name: "only runs once",
setup: func(t *testing.T, validatorDB *Store) {
err := validatorDB.update(func(tx *bolt.Tx) error {
return tx.Bucket(migrationsBucket).Put(migrationSourceTargetEpochsBucketKey, migrationCompleted)
})
require.NoError(t, err)
},
eval: func(t *testing.T, validatorDB *Store) {
err := validatorDB.view(func(tx *bolt.Tx) error {
data := tx.Bucket(migrationsBucket).Get(migrationSourceTargetEpochsBucketKey)
require.DeepEqual(t, data, migrationCompleted)
return nil
})
require.NoError(t, err)
},
},
{
name: "populates new target epochs bucket",
setup: func(t *testing.T, validatorDB *Store) {
err := validatorDB.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(pubKeysBucket)
for _, pubKey := range pubKeys {
pkBucket, err := bucket.CreateBucketIfNotExists(pubKey[:])
if err != nil {
return err
}
sourceEpochsBucket, err := pkBucket.CreateBucketIfNotExists(attestationSourceEpochsBucket)
if err != nil {
return err
}
for epoch := uint64(1); epoch < numEpochs; epoch++ {
source := epoch - 1
target := epoch
sourceEpoch := bytesutil.Uint64ToBytesBigEndian(source)
targetEpoch := bytesutil.Uint64ToBytesBigEndian(target)
if err := sourceEpochsBucket.Put(sourceEpoch, targetEpoch); err != nil {
return err
}
}
}
return nil
})
require.NoError(t, err)
},
eval: func(t *testing.T, validatorDB *Store) {
// Verify we indeed have the data for all epochs
// since genesis to epoch 50 under the new schema.
err := validatorDB.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(pubKeysBucket)
for _, pubKey := range pubKeys {
pkBucket := bucket.Bucket(pubKey[:])
sourceEpochsBucket := pkBucket.Bucket(attestationSourceEpochsBucket)
targetEpochsBucket := pkBucket.Bucket(attestationTargetEpochsBucket)
// Verify we have (source epoch, target epoch) pairs.
for sourceEpoch := uint64(0); sourceEpoch < numEpochs-1; sourceEpoch++ {
sourceEpochBytes := bytesutil.Uint64ToBytesBigEndian(sourceEpoch)
targetEpochBytes := sourceEpochsBucket.Get(sourceEpochBytes)
targetEpoch := bytesutil.BytesToUint64BigEndian(targetEpochBytes)
require.Equal(t, sourceEpoch+1, targetEpoch)
}
// Verify we have (target epoch, source epoch) pairs.
for targetEpoch := uint64(1); targetEpoch < numEpochs; targetEpoch++ {
targetEpochBytes := bytesutil.Uint64ToBytesBigEndian(targetEpoch)
sourceEpochBytes := targetEpochsBucket.Get(targetEpochBytes)
sourceEpoch := bytesutil.BytesToUint64BigEndian(sourceEpochBytes)
require.Equal(t, targetEpoch-1, sourceEpoch)
}
}
return nil
})
require.NoError(t, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
validatorDB := setupDB(t, pubKeys)
tt.setup(t, validatorDB)
require.NoError(t, validatorDB.migrateSourceTargetEpochsBucketUp(context.Background()))
tt.eval(t, validatorDB)
})
}
}
func TestStore_migrateSourceTargetEpochsBucketDown(t *testing.T) {
numKeys := 50
pubKeys := make([][48]byte, numKeys)
for i := 0; i < numKeys; i++ {
var pk [48]byte
copy(pk[:], fmt.Sprintf("%d", i))
pubKeys[i] = pk
}
tests := []struct {
name string
setup func(t *testing.T, validatorDB *Store)
eval func(t *testing.T, validatorDB *Store)
}{
{
name: "unsets the migration completed key upon completion",
setup: func(t *testing.T, validatorDB *Store) {
err := validatorDB.update(func(tx *bolt.Tx) error {
return tx.Bucket(migrationsBucket).Put(migrationSourceTargetEpochsBucketKey, migrationCompleted)
})
require.NoError(t, err)
},
eval: func(t *testing.T, validatorDB *Store) {
err := validatorDB.view(func(tx *bolt.Tx) error {
data := tx.Bucket(migrationsBucket).Get(migrationSourceTargetEpochsBucketKey)
require.DeepEqual(t, true, data == nil)
return nil
})
require.NoError(t, err)
},
},
{
name: "unsets the migration, even if unset already (no panic)",
setup: func(t *testing.T, validatorDB *Store) {},
eval: func(t *testing.T, validatorDB *Store) {
// Ensure the migration is not marked as complete.
err := validatorDB.view(func(tx *bolt.Tx) error {
data := tx.Bucket(migrationsBucket).Get(migrationSourceTargetEpochsBucketKey)
require.DeepNotEqual(t, data, migrationCompleted)
return nil
})
require.NoError(t, err)
},
},
{
name: "deletes the new bucket that was created in the up migration",
setup: func(t *testing.T, validatorDB *Store) {
err := validatorDB.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(pubKeysBucket)
for _, pubKey := range pubKeys {
pkBucket, err := bucket.CreateBucketIfNotExists(pubKey[:])
if err != nil {
return err
}
if _, err := pkBucket.CreateBucketIfNotExists(attestationSourceEpochsBucket); err != nil {
return err
}
if _, err := pkBucket.CreateBucketIfNotExists(attestationTargetEpochsBucket); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
},
eval: func(t *testing.T, validatorDB *Store) {
err := validatorDB.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(pubKeysBucket)
for _, pubKey := range pubKeys {
pkBucket := bucket.Bucket(pubKey[:])
if pkBucket == nil {
return errors.New("expected pubkey bucket to exist")
}
targetEpochsBucket := pkBucket.Bucket(attestationTargetEpochsBucket)
if targetEpochsBucket != nil {
return errors.New("expected target epochs bucket to have been deleted")
}
}
return nil
})
require.NoError(t, err)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
validatorDB := setupDB(t, nil)
tt.setup(t, validatorDB)
require.NoError(t, validatorDB.migrateSourceTargetEpochsBucketDown(context.Background()))
tt.eval(t, validatorDB)
})
}
}
func Test_batchPublicKeys(t *testing.T) {
tests := []struct {
name string
batchSize int
publicKeys [][]byte
want [][][]byte
}{
{
name: "less than batch size returns all keys",
batchSize: 100,
publicKeys: [][]byte{{1}, {2}, {3}},
want: [][][]byte{{{1}, {2}, {3}}},
},
{
name: "equals batch size returns all keys",
batchSize: 3,
publicKeys: [][]byte{{1}, {2}, {3}},
want: [][][]byte{{{1}, {2}, {3}}},
},
{
name: "> batch size returns proper batches",
batchSize: 2,
publicKeys: [][]byte{{1}, {2}, {3}},
want: [][][]byte{{{1}, {2}}, {{3}}},
},
{
name: "equal size batches returns proper batches",
batchSize: 2,
publicKeys: [][]byte{{1}, {2}, {3}, {4}},
want: [][][]byte{{{1}, {2}}, {{3}, {4}}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := batchPublicKeys(tt.publicKeys, tt.batchSize); !reflect.DeepEqual(got, tt.want) {
t.Errorf("batchPublicKeys() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -27,6 +27,9 @@ func (s *Store) PruneAttestationsOlderThanCurrentWeakSubjectivity(ctx context.Co
if err := pruneSourceEpochsBucket(pkBucket); err != nil {
return err
}
if err := pruneTargetEpochsBucket(pkBucket); err != nil {
return err
}
return pruneSigningRootsBucket(pkBucket)
})
})
@@ -64,6 +67,31 @@ func pruneSourceEpochsBucket(bucket *bolt.Bucket) error {
})
}
func pruneTargetEpochsBucket(bucket *bolt.Bucket) error {
wssPeriod := params.BeaconConfig().WeakSubjectivityPeriod
targetEpochsBucket := bucket.Bucket(attestationTargetEpochsBucket)
if targetEpochsBucket == nil {
return nil
}
// We obtain the highest target epoch from the bucket.
highestTargetEpochBytes, _ := targetEpochsBucket.Cursor().Last()
highestTargetEpoch := bytesutil.BytesToEpochBigEndian(highestTargetEpochBytes)
// No need to prune if the highest epoch we've written is still
// before the first weak subjectivity period.
if highestTargetEpoch < wssPeriod {
return nil
}
c := targetEpochsBucket.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
targetEpoch := bytesutil.BytesToEpochBigEndian(k)
if olderThanCurrentWeakSubjectivityPeriod(targetEpoch, highestTargetEpoch) {
return targetEpochsBucket.Delete(k)
}
}
return nil
}
func pruneSigningRootsBucket(bucket *bolt.Bucket) error {
wssPeriod := params.BeaconConfig().WeakSubjectivityPeriod
signingRootsBucket := bucket.Bucket(attestationSigningRootsBucket)

View File

@@ -26,6 +26,7 @@ var (
pubKeysBucket = []byte("pubkeys-bucket")
attestationSigningRootsBucket = []byte("att-signing-roots-bucket")
attestationSourceEpochsBucket = []byte("att-source-epochs-bucket")
attestationTargetEpochsBucket = []byte("att-target-epochs-bucket")
// Migrations
migrationsBucket = []byte("migrations")

View File

@@ -19,10 +19,12 @@ func migrateUp(cliCtx *cli.Context) error {
}
ctx := context.Background()
log.Info("Opening DB")
validatorDB, err := kv.NewKVStore(ctx, dataDir, &kv.Config{})
if err != nil {
return err
}
log.Info("Running migrations")
return validatorDB.RunUpMigrations(ctx)
}
@@ -34,9 +36,11 @@ func migrateDown(cliCtx *cli.Context) error {
}
ctx := context.Background()
log.Info("Opening DB")
validatorDB, err := kv.NewKVStore(ctx, dataDir, &kv.Config{})
if err != nil {
return err
}
log.Info("Running migrations")
return validatorDB.RunDownMigrations(ctx)
}