Slasher Significant Optimizations (#9833)

* optimizations to slasher runtime

* remove unnecessary code

* test for epoch update

* commentary

* Gaz

* fmt

* amend test

* better logging

* better logs

* log

* div 0

* more logging

* no log

* use map instead

* passing

* comments

* passing

* for select loop wait for init

* sub

* srv

* debug

* fix panic

* gaz

* builds

* sim gen

* ineff

* commentary

* data

* log

* base

* try

* rem logs

* sim logs

* fix wait for sync event

* ev

* init

* init

* Update beacon-chain/slasher/service.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* comments

* elapsed

* Update testing/slasher/simulator/simulator.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* timeout

* inner cancel

* ctx err everywhere

* Add context aware to several potentially long running db operations

* Fix missing param after updating with develop

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
This commit is contained in:
Raul Jordan
2021-12-06 16:45:38 -05:00
committed by GitHub
parent e53be1acbe
commit 98fea2e94d
17 changed files with 411 additions and 310 deletions

View File

@@ -97,8 +97,8 @@ type HeadAccessDatabase interface {
// SlasherDatabase interface for persisting data related to detecting slashable offenses on Ethereum.
type SlasherDatabase interface {
io.Closer
SaveLastEpochWrittenForValidators(
ctx context.Context, validatorIndices []types.ValidatorIndex, epoch types.Epoch,
SaveLastEpochsWrittenForValidators(
ctx context.Context, epochByValidator map[types.ValidatorIndex]types.Epoch,
) error
SaveAttestationRecordsForValidators(
ctx context.Context,

View File

@@ -85,7 +85,7 @@ func (s *Store) PruneAttestationsAtEpoch(
// PruneProposalsAtEpoch deletes all proposals from the slasher DB with epoch
// less than or equal to the specified epoch.
func (s *Store) PruneProposalsAtEpoch(
_ context.Context, maxEpoch types.Epoch,
ctx context.Context, maxEpoch types.Epoch,
) (numPruned uint, err error) {
var endPruneSlot types.Slot
endPruneSlot, err = slots.EpochEnd(maxEpoch)
@@ -128,6 +128,9 @@ func (s *Store) PruneProposalsAtEpoch(
c := proposalBkt.Cursor()
// We begin a pruning iteration starting from the first item in the bucket.
for k, _ := c.First(); k != nil; k, _ = c.Next() {
if ctx.Err() != nil {
return ctx.Err()
}
// We check the slot from the current key in the database.
// If we have hit a slot that is greater than the end slot of the pruning process,
// we then completely exit the process as we are done.

View File

@@ -42,47 +42,73 @@ func (s *Store) LastEpochWrittenForValidators(
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestedEpochsByValidator)
for i, encodedIndex := range encodedIndices {
var epoch types.Epoch
epochBytes := bkt.Get(encodedIndex)
if epochBytes != nil {
var epoch types.Epoch
if err := epoch.UnmarshalSSZ(epochBytes); err != nil {
return err
}
attestedEpochs = append(attestedEpochs, &slashertypes.AttestedEpochForValidator{
ValidatorIndex: validatorIndices[i],
Epoch: epoch,
})
}
attestedEpochs = append(attestedEpochs, &slashertypes.AttestedEpochForValidator{
ValidatorIndex: validatorIndices[i],
Epoch: epoch,
})
}
return nil
})
return attestedEpochs, err
}
// SaveLastEpochWrittenForValidators updates the latest epoch a slice
// SaveLastEpochsWrittenForValidators updates the latest epoch a slice
// of validator indices has attested to.
func (s *Store) SaveLastEpochWrittenForValidators(
ctx context.Context, validatorIndices []types.ValidatorIndex, epoch types.Epoch,
func (s *Store) SaveLastEpochsWrittenForValidators(
ctx context.Context, epochByValidator map[types.ValidatorIndex]types.Epoch,
) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastEpochWrittenForValidators")
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastEpochsWrittenForValidators")
defer span.End()
encodedIndices := make([][]byte, len(validatorIndices))
for i, valIdx := range validatorIndices {
encodedIndices[i] = encodeValidatorIndex(valIdx)
}
encodedEpoch, err := epoch.MarshalSSZ()
if err != nil {
return err
}
return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestedEpochsByValidator)
for _, encodedIndex := range encodedIndices {
if err = bkt.Put(encodedIndex, encodedEpoch); err != nil {
return err
}
encodedIndices := make([][]byte, 0, len(epochByValidator))
encodedEpochs := make([][]byte, 0, len(epochByValidator))
for valIdx, epoch := range epochByValidator {
if ctx.Err() != nil {
return ctx.Err()
}
return nil
})
encodedEpoch, err := epoch.MarshalSSZ()
if err != nil {
return err
}
encodedIndices = append(encodedIndices, encodeValidatorIndex(valIdx))
encodedEpochs = append(encodedEpochs, encodedEpoch)
}
// The list of validators might be too massive for boltdb to handle in a single transaction,
// so instead we split it into batches and write each batch.
batchSize := 10000
for i := 0; i < len(encodedIndices); i += batchSize {
if ctx.Err() != nil {
return ctx.Err()
}
if err := s.db.Update(func(tx *bolt.Tx) error {
if ctx.Err() != nil {
return ctx.Err()
}
bkt := tx.Bucket(attestedEpochsByValidator)
min := i + batchSize
if min > len(encodedIndices) {
min = len(encodedIndices)
}
for j, encodedIndex := range encodedIndices[i:min] {
if ctx.Err() != nil {
return ctx.Err()
}
if err := bkt.Put(encodedIndex, encodedEpochs[j]); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
}
return nil
}
// CheckAttesterDoubleVotes retries any slashable double votes that exist

View File

@@ -52,9 +52,17 @@ func TestStore_LastEpochWrittenForValidators(t *testing.T) {
attestedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices)
require.NoError(t, err)
require.Equal(t, true, len(attestedEpochs) == 0)
require.Equal(t, true, len(attestedEpochs) == len(indices))
for _, item := range attestedEpochs {
require.Equal(t, types.Epoch(0), item.Epoch)
}
err = beaconDB.SaveLastEpochWrittenForValidators(ctx, indices, epoch)
epochsByValidator := map[types.ValidatorIndex]types.Epoch{
1: epoch,
2: epoch,
3: epoch,
}
err = beaconDB.SaveLastEpochsWrittenForValidators(ctx, epochsByValidator)
require.NoError(t, err)
retrievedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices)

View File

@@ -3,40 +3,67 @@ package slasher
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// Takes in a list of indexed attestation wrappers and returns any
// found attester slashings to the caller.
func (s *Service) checkSlashableAttestations(
ctx context.Context, atts []*slashertypes.IndexedAttestationWrapper,
ctx context.Context, currentEpoch types.Epoch, atts []*slashertypes.IndexedAttestationWrapper,
) ([]*ethpb.AttesterSlashing, error) {
currentEpoch := slots.EpochsSinceGenesis(s.genesisTime)
slashings := make([]*ethpb.AttesterSlashing, 0)
indices := make([]types.ValidatorIndex, 0)
// TODO(#8331): Consider using goroutines and wait groups here.
log.Debug("Checking for double votes")
start := time.Now()
doubleVoteSlashings, err := s.checkDoubleVotes(ctx, atts)
if err != nil {
return nil, errors.Wrap(err, "could not check slashable double votes")
}
log.WithField("elapsed", time.Since(start)).Debug("Done checking double votes")
slashings = append(slashings, doubleVoteSlashings...)
groupedAtts := s.groupByValidatorChunkIndex(atts)
log.WithField("numBatches", len(groupedAtts)).Debug("Batching attestations by validator chunk index")
start = time.Now()
batchTimes := make([]time.Duration, 0, len(groupedAtts))
for validatorChunkIdx, batch := range groupedAtts {
innerStart := time.Now()
attSlashings, err := s.detectAllAttesterSlashings(ctx, &chunkUpdateArgs{
validatorChunkIndex: validatorChunkIdx,
currentEpoch: currentEpoch,
}, batch)
if err != nil {
return nil, errors.Wrap(err, "Could not detect slashable attestations")
return nil, err
}
slashings = append(slashings, attSlashings...)
indices = append(indices, s.params.validatorIndicesInChunk(validatorChunkIdx)...)
indices := s.params.validatorIndicesInChunk(validatorChunkIdx)
for _, idx := range indices {
s.latestEpochWrittenForValidator[idx] = currentEpoch
}
batchTimes = append(batchTimes, time.Since(innerStart))
}
if err := s.serviceCfg.Database.SaveLastEpochWrittenForValidators(ctx, indices, currentEpoch); err != nil {
return nil, err
var avgProcessingTimePerBatch time.Duration
for _, dur := range batchTimes {
avgProcessingTimePerBatch += dur
}
if avgProcessingTimePerBatch != time.Duration(0) {
avgProcessingTimePerBatch = avgProcessingTimePerBatch / time.Duration(len(batchTimes))
}
log.WithFields(logrus.Fields{
"numAttestations": len(atts),
"numBatchesByValidatorChunkIndex": len(groupedAtts),
"elapsed": time.Since(start),
"avgBatchProcessingTime": avgProcessingTimePerBatch,
}).Info("Done checking slashable attestations")
if len(slashings) > 0 {
log.WithField("numSlashings", len(slashings)).Warn("Slashable attestation offenses found")
}
return slashings, nil
}
@@ -57,17 +84,25 @@ func (s *Service) detectAllAttesterSlashings(
args *chunkUpdateArgs,
attestations []*slashertypes.IndexedAttestationWrapper,
) ([]*ethpb.AttesterSlashing, error) {
// Check for double votes.
doubleVoteSlashings, err := s.checkDoubleVotes(ctx, attestations)
if err != nil {
return nil, errors.Wrap(err, "could not check slashable double votes")
// Map of updated chunks by chunk index, which will be saved at the end.
updatedChunks := make(map[uint64]Chunker)
groupedAtts := s.groupByChunkIndex(attestations)
validatorIndices := s.params.validatorIndicesInChunk(args.validatorChunkIndex)
// Update the min/max span chunks for the change of current epoch.
for _, validatorIndex := range validatorIndices {
if err := s.epochUpdateForValidator(ctx, args, updatedChunks, validatorIndex); err != nil {
return nil, errors.Wrapf(
err,
"could not update validator index chunks %d",
validatorIndex,
)
}
}
// Group attestations by chunk index.
groupedAtts := s.groupByChunkIndex(attestations)
// Update min and max spans and retrieve any detected slashable offenses.
surroundingSlashings, err := s.updateSpans(ctx, &chunkUpdateArgs{
surroundingSlashings, err := s.updateSpans(ctx, updatedChunks, &chunkUpdateArgs{
kind: slashertypes.MinSpan,
validatorChunkIndex: args.validatorChunkIndex,
currentEpoch: args.currentEpoch,
@@ -80,7 +115,7 @@ func (s *Service) detectAllAttesterSlashings(
)
}
surroundedSlashings, err := s.updateSpans(ctx, &chunkUpdateArgs{
surroundedSlashings, err := s.updateSpans(ctx, updatedChunks, &chunkUpdateArgs{
kind: slashertypes.MaxSpan,
validatorChunkIndex: args.validatorChunkIndex,
currentEpoch: args.currentEpoch,
@@ -93,13 +128,11 @@ func (s *Service) detectAllAttesterSlashings(
)
}
// Consolidate all slashings into a slice.
slashings := make([]*ethpb.AttesterSlashing, 0, len(doubleVoteSlashings)+len(surroundingSlashings)+len(surroundedSlashings))
slashings = append(slashings, doubleVoteSlashings...)
slashings := make([]*ethpb.AttesterSlashing, 0, len(surroundingSlashings)+len(surroundedSlashings))
slashings = append(slashings, surroundingSlashings...)
slashings = append(slashings, surroundedSlashings...)
if len(slashings) > 0 {
log.WithField("numSlashings", len(slashings)).Info("Slashable attestation offenses found")
if err := s.saveUpdatedChunks(ctx, args, updatedChunks); err != nil {
return nil, err
}
return slashings, nil
}
@@ -167,6 +200,44 @@ func (s *Service) checkDoubleVotesOnDisk(
return doubleVoteSlashings, nil
}
// This function updates the slashing spans for a given validator for a change in epoch
// since the last epoch we have recorded for the validator. For example, if the last epoch a validator
// has written is N, and the current epoch is N+5, we update entries in the slashing spans
// with their neutral element for epochs N+1 to N+4. This also puts any loaded chunks in a
// map used as a cache for further processing and minimizing database reads later on.
func (s *Service) epochUpdateForValidator(
ctx context.Context,
args *chunkUpdateArgs,
updatedChunks map[uint64]Chunker,
validatorIndex types.ValidatorIndex,
) error {
epoch := s.latestEpochWrittenForValidator[validatorIndex]
if epoch == 0 {
return nil
}
for epoch <= args.currentEpoch {
chunkIdx := s.params.chunkIndex(epoch)
currentChunk, err := s.getChunk(ctx, args, updatedChunks, chunkIdx)
if err != nil {
return err
}
for s.params.chunkIndex(epoch) == chunkIdx && epoch <= args.currentEpoch {
if err := setChunkRawDistance(
s.params,
currentChunk.Chunk(),
validatorIndex,
epoch,
currentChunk.NeutralElement(),
); err != nil {
return err
}
updatedChunks[chunkIdx] = currentChunk
epoch++
}
}
return nil
}
// Updates spans and detects any slashable attester offenses along the way.
// 1. Determine the chunks we need to use for updating for the validator indices
// in a validator chunk index, then retrieve those chunks from the database.
@@ -177,30 +248,12 @@ func (s *Service) checkDoubleVotesOnDisk(
// 3. Save the updated chunks to disk.
func (s *Service) updateSpans(
ctx context.Context,
updatedChunks map[uint64]Chunker,
args *chunkUpdateArgs,
attestationsByChunkIdx map[uint64][]*slashertypes.IndexedAttestationWrapper,
) ([]*ethpb.AttesterSlashing, error) {
ctx, span := trace.StartSpan(ctx, "Slasher.updateSpans")
defer span.End()
// Determine the chunk indices we need to use for slashing detection.
validatorIndices := s.params.validatorIndicesInChunk(args.validatorChunkIndex)
chunkIndices, err := s.determineChunksToUpdateForValidators(ctx, args, validatorIndices)
if err != nil {
return nil, errors.Wrapf(
err,
"could not determine chunks to update for validator indices %v",
validatorIndices,
)
}
// Load the required chunks from disk.
chunksByChunkIdx, err := s.loadChunks(ctx, args, chunkIndices)
if err != nil {
return nil, errors.Wrapf(
err,
"could not load chunks for chunk indices %v",
chunkIndices,
)
}
// Apply the attestations to the related chunks and find any
// slashings along the way.
@@ -227,7 +280,7 @@ func (s *Service) updateSpans(
ctx,
args,
validatorIndex,
chunksByChunkIdx,
updatedChunks,
att,
)
if err != nil {
@@ -245,52 +298,7 @@ func (s *Service) updateSpans(
}
// Write the updated chunks to disk.
return slashings, s.saveUpdatedChunks(ctx, args, chunksByChunkIdx)
}
// For a list of validator indices, we retrieve their latest written epoch. Then, for each
// (validator, latest epoch written) pair, we determine the chunks we need to update and
// perform slashing detection with.
func (s *Service) determineChunksToUpdateForValidators(
ctx context.Context,
args *chunkUpdateArgs,
validatorIndices []types.ValidatorIndex,
) (chunkIndices []uint64, err error) {
ctx, span := trace.StartSpan(ctx, "Slasher.determineChunksToUpdateForValidators")
defer span.End()
lastCurrentEpochs, err := s.serviceCfg.Database.LastEpochWrittenForValidators(ctx, validatorIndices)
if err != nil {
err = errors.Wrap(err, "could not get latest epoch attested for validators")
return
}
// Initialize the last epoch written for each validator to 0.
lastCurrentEpochByValidator := make(map[types.ValidatorIndex]types.Epoch, len(validatorIndices))
for _, valIdx := range validatorIndices {
lastCurrentEpochByValidator[valIdx] = 0
}
for _, lastEpoch := range lastCurrentEpochs {
lastCurrentEpochByValidator[lastEpoch.ValidatorIndex] = lastEpoch.Epoch
}
// For every single validator and their last written current epoch, we determine
// the chunk indices we need to update based on all the chunks between the last
// epoch written and the current epoch, inclusive.
chunkIndicesToUpdate := make(map[uint64]bool)
for _, epoch := range lastCurrentEpochByValidator {
latestEpochWritten := epoch
for latestEpochWritten <= args.currentEpoch {
chunkIdx := s.params.chunkIndex(latestEpochWritten)
chunkIndicesToUpdate[chunkIdx] = true
latestEpochWritten++
}
}
chunkIndices = make([]uint64, 0, len(chunkIndicesToUpdate))
for chunkIdx := range chunkIndicesToUpdate {
chunkIndices = append(chunkIndices, chunkIdx)
}
return
return slashings, nil
}
// Checks if an incoming attestation is slashable based on the validator chunk it

View File

@@ -3,7 +3,6 @@ package slasher
import (
"context"
"fmt"
"sort"
"testing"
"time"
@@ -241,9 +240,10 @@ func Test_processQueuedAttestations(t *testing.T) {
AttestationStateFetcher: mockChain,
SlashingPoolInserter: &slashings.PoolMock{},
},
params: DefaultParams(),
attsQueue: newAttestationsQueue(),
genesisTime: genesisTime,
params: DefaultParams(),
attsQueue: newAttestationsQueue(),
genesisTime: genesisTime,
latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{},
}
currentSlotChan := make(chan types.Slot)
exitChan := make(chan struct{})
@@ -299,9 +299,10 @@ func Test_processQueuedAttestations_MultipleChunkIndices(t *testing.T) {
AttestationStateFetcher: mockChain,
SlashingPoolInserter: &slashings.PoolMock{},
},
params: slasherParams,
attsQueue: newAttestationsQueue(),
genesisTime: genesisTime,
params: slasherParams,
attsQueue: newAttestationsQueue(),
genesisTime: genesisTime,
latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{},
}
currentSlotChan := make(chan types.Slot)
exitChan := make(chan struct{})
@@ -365,9 +366,10 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) {
AttestationStateFetcher: mockChain,
SlashingPoolInserter: &slashings.PoolMock{},
},
params: slasherParams,
attsQueue: newAttestationsQueue(),
genesisTime: genesisTime,
params: slasherParams,
attsQueue: newAttestationsQueue(),
genesisTime: genesisTime,
latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{},
}
currentSlotChan := make(chan types.Slot)
exitChan := make(chan struct{})
@@ -397,9 +399,9 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) {
require.LogsDoNotContain(t, hook, "Could not detect")
}
func Test_determineChunksToUpdateForValidators_FromLatestWrittenEpoch(t *testing.T) {
slasherDB := dbtest.SetupSlasherDB(t)
func Test_epochUpdateForValidators(t *testing.T) {
ctx := context.Background()
slasherDB := dbtest.SetupSlasherDB(t)
// Check if the chunk at chunk index already exists in-memory.
s := &Service{
@@ -408,71 +410,67 @@ func Test_determineChunksToUpdateForValidators_FromLatestWrittenEpoch(t *testing
validatorChunkSize: 2, // 2 validators in a chunk.
historyLength: 4,
},
serviceCfg: &ServiceConfig{
Database: slasherDB,
StateNotifier: &mock.MockStateNotifier{},
},
serviceCfg: &ServiceConfig{Database: slasherDB},
latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{},
}
validators := []types.ValidatorIndex{
1, 2,
}
currentEpoch := types.Epoch(3)
// Set the latest written epoch for validators to current epoch - 1.
latestWrittenEpoch := currentEpoch - 1
err := slasherDB.SaveLastEpochWrittenForValidators(ctx, validators, latestWrittenEpoch)
require.NoError(t, err)
t.Run("no update if no latest written epoch", func(t *testing.T) {
validators := []types.ValidatorIndex{
1, 2,
}
currentEpoch := types.Epoch(3)
// No last written epoch for both validators.
s.latestEpochWrittenForValidator = map[types.ValidatorIndex]types.Epoch{}
// Because the validators have no recorded latest epoch written in the database,
// Because the latest written epoch for the input validators is == 2, we expect
// that we will update all epochs from 2 up to 3 (the current epoch). This is all
// safe contained in chunk index 1.
chunkIndices, err := s.determineChunksToUpdateForValidators(
ctx,
&chunkUpdateArgs{
currentEpoch: currentEpoch,
},
validators,
)
require.NoError(t, err)
require.DeepEqual(t, []uint64{1}, chunkIndices)
}
func Test_determineChunksToUpdateForValidators_FromGenesis(t *testing.T) {
slasherDB := dbtest.SetupSlasherDB(t)
ctx := context.Background()
// Check if the chunk at chunk index already exists in-memory.
s := &Service{
params: &Parameters{
chunkSize: 2, // 2 epochs in a chunk.
validatorChunkSize: 2, // 2 validators in a chunk.
historyLength: 4,
},
serviceCfg: &ServiceConfig{
Database: slasherDB,
StateNotifier: &mock.MockStateNotifier{},
},
}
validators := []types.ValidatorIndex{
1, 2,
}
// Because the validators have no recorded latest epoch written in the database,
// we expect that we will update all epochs from genesis up to the current epoch.
// Given the chunk size is 2 epochs per chunk, updating with current epoch == 3
// will mean that we should be updating from epoch 0 to 3, meaning chunk indices 0 and 1.
chunkIndices, err := s.determineChunksToUpdateForValidators(
ctx,
&chunkUpdateArgs{
currentEpoch: 3,
},
validators,
)
require.NoError(t, err)
sort.Slice(chunkIndices, func(i, j int) bool {
return chunkIndices[i] < chunkIndices[j]
// Because the validators have no recorded latest epoch written, we expect
// no chunks to be loaded nor updated to.
updatedChunks := make(map[uint64]Chunker)
for _, valIdx := range validators {
err := s.epochUpdateForValidator(
ctx,
&chunkUpdateArgs{
currentEpoch: currentEpoch,
},
updatedChunks,
valIdx,
)
require.NoError(t, err)
}
require.Equal(t, 0, len(updatedChunks))
})
t.Run("update from latest written epoch", func(t *testing.T) {
validators := []types.ValidatorIndex{
1, 2,
}
currentEpoch := types.Epoch(3)
// Set the latest written epoch for validators to current epoch - 1.
latestWrittenEpoch := currentEpoch - 1
s.latestEpochWrittenForValidator = map[types.ValidatorIndex]types.Epoch{
1: latestWrittenEpoch,
2: latestWrittenEpoch,
}
// Because the latest written epoch for the input validators is == 2, we expect
// that we will update all epochs from 2 up to 3 (the current epoch). This is all
// safe contained in chunk index 1.
updatedChunks := make(map[uint64]Chunker)
for _, valIdx := range validators {
err := s.epochUpdateForValidator(
ctx,
&chunkUpdateArgs{
currentEpoch: currentEpoch,
},
updatedChunks,
valIdx,
)
require.NoError(t, err)
}
require.Equal(t, 1, len(updatedChunks))
_, ok := updatedChunks[1]
require.Equal(t, true, ok)
})
require.DeepEqual(t, []uint64{0, 1}, chunkIndices)
}
func Test_applyAttestationForValidator_MinSpanChunk(t *testing.T) {
@@ -485,6 +483,7 @@ func Test_applyAttestationForValidator_MinSpanChunk(t *testing.T) {
Database: slasherDB,
StateNotifier: &mock.MockStateNotifier{},
},
latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{},
}
// We initialize an empty chunks slice.
chunk := EmptyMinSpanChunksSlice(params)
@@ -545,6 +544,7 @@ func Test_applyAttestationForValidator_MaxSpanChunk(t *testing.T) {
Database: slasherDB,
StateNotifier: &mock.MockStateNotifier{},
},
latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{},
}
// We initialize an empty chunks slice.
chunk := EmptyMaxSpanChunksSlice(params)
@@ -796,7 +796,7 @@ func TestService_processQueuedAttestations(t *testing.T) {
tickerChan <- 1
cancel()
<-exitChan
assert.LogsContain(t, hook, "New slot, processing queued")
assert.LogsContain(t, hook, "Processing queued")
}
func BenchmarkCheckSlashableAttestations(b *testing.B) {
@@ -885,7 +885,8 @@ func runAttestationsBenchmark(b *testing.B, s *Service, numAtts, numValidators u
genesisTime := time.Now().Add(-time.Second * time.Duration(totalSeconds))
s.genesisTime = genesisTime
_, err := s.checkSlashableAttestations(context.Background(), atts)
epoch := slots.EpochsSinceGenesis(genesisTime)
_, err := s.checkSlashableAttestations(context.Background(), epoch, atts)
require.NoError(b, err)
}
}

View File

@@ -72,7 +72,7 @@ func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan
}
}
// Process queued attestations every time an epoch ticker fires. We retrieve
// Process queued attestations every time a slot ticker fires. We retrieve
// these attestations from a queue, then group them all by validator chunk index.
// This grouping will allow us to perform detection on batches of attestations
// per validator chunk index which can be done concurrently.
@@ -98,9 +98,8 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch
"numValidAtts": len(validAtts),
"numDeferredAtts": len(validInFuture),
"numDroppedAtts": numDropped,
}).Info("New slot, processing queued atts for slashing detection")
}).Info("Processing queued attestations for slashing detection")
start := time.Now()
// Save the attestation records to our database.
if err := s.serviceCfg.Database.SaveAttestationRecordsForValidators(
ctx, validAtts,
@@ -110,7 +109,7 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch
}
// Check for slashings.
slashings, err := s.checkSlashableAttestations(ctx, validAtts)
slashings, err := s.checkSlashableAttestations(ctx, currentEpoch, validAtts)
if err != nil {
log.WithError(err).Error("Could not check slashable attestations")
continue
@@ -123,8 +122,6 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch
continue
}
log.WithField("elapsed", time.Since(start)).Debug("Done checking slashable attestations")
processedAttestationsTotal.Add(float64(len(validAtts)))
case <-ctx.Done():
return
@@ -147,7 +144,7 @@ func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan typ
"currentSlot": currentSlot,
"currentEpoch": currentEpoch,
"numBlocks": len(blocks),
}).Info("New slot, processing queued blocks for slashing detection")
}).Info("Processing queued blocks for slashing detection")
start := time.Now()
// Check for slashings.
@@ -202,6 +199,7 @@ func (s *Service) pruneSlasherDataWithinSlidingWindow(ctx context.Context, curre
// attempt to prune at all.
return nil
}
start := time.Now()
log.WithFields(logrus.Fields{
"currentEpoch": currentEpoch,
"pruningAllBeforeEpoch": maxPruningEpoch,
@@ -221,6 +219,7 @@ func (s *Service) pruneSlasherDataWithinSlidingWindow(ctx context.Context, curre
log.WithFields(logrus.Fields{
"prunedAttestationRecords": numPrunedAtts,
"prunedProposalRecords": numPrunedProposals,
}).Info("Successfully pruned slasher data")
"elapsed": time.Since(start),
}).Info("Successfully ran slasher data pruning")
return nil
}

View File

@@ -306,5 +306,5 @@ func TestService_processQueuedBlocks(t *testing.T) {
tickerChan <- 0
cancel()
<-exitChan
assert.LogsContain(t, hook, "New slot, processing queued")
assert.LogsContain(t, hook, "Processing queued")
}

View File

@@ -7,6 +7,7 @@ import (
types "github.com/prysmaticlabs/eth2-types"
slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/time/slots"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -59,7 +60,8 @@ func (s *Service) IsSlashableAttestation(
SigningRoot: dataRoot,
}
attesterSlashings, err := s.checkSlashableAttestations(ctx, []*slashertypes.IndexedAttestationWrapper{indexedAttWrapper})
currentEpoch := slots.EpochsSinceGenesis(s.genesisTime)
attesterSlashings, err := s.checkSlashableAttestations(ctx, currentEpoch, []*slashertypes.IndexedAttestationWrapper{indexedAttWrapper})
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not check if attestation is slashable: %v", err)
}

View File

@@ -83,9 +83,10 @@ func TestIsSlashableAttestation(t *testing.T) {
serviceCfg: &ServiceConfig{
Database: slasherDB,
},
params: DefaultParams(),
blksQueue: newBlocksQueue(),
genesisTime: genesisTime,
params: DefaultParams(),
blksQueue: newBlocksQueue(),
genesisTime: genesisTime,
latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{},
}
prevAtts := []*slashertypes.IndexedAttestationWrapper{
createAttestationWrapper(t, 2, 3, []uint64{0}, []byte{1}),
@@ -93,7 +94,7 @@ func TestIsSlashableAttestation(t *testing.T) {
}
err := slasherDB.SaveAttestationRecordsForValidators(ctx, prevAtts)
require.NoError(t, err)
attesterSlashings, err := s.checkSlashableAttestations(ctx, prevAtts)
attesterSlashings, err := s.checkSlashableAttestations(ctx, currentEpoch, prevAtts)
require.NoError(t, err)
require.Equal(t, 0, len(attesterSlashings))
@@ -125,7 +126,7 @@ func TestIsSlashableAttestation(t *testing.T) {
{
name: "should detect multiple surround if multiple same indices",
attToCheck: createAttestationWrapper(t, 1, 4, []uint64{0, 1}, []byte{2}),
amtSlashable: 2,
amtSlashable: 4,
},
}
for _, tt := range tests {

View File

@@ -22,6 +22,10 @@ import (
"github.com/prysmaticlabs/prysm/time/slots"
)
const (
shutdownTimeout = time.Minute * 5
)
// ServiceConfig for the slasher service in the beacon node.
// This struct allows us to specify required dependencies and
// parameters for slasher to function as needed.
@@ -49,77 +53,78 @@ type SlashingChecker interface {
// Service defining a slasher implementation as part of
// the beacon node, able to detect eth2 slashable offenses.
type Service struct {
params *Parameters
serviceCfg *ServiceConfig
indexedAttsChan chan *ethpb.IndexedAttestation
beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader
attsQueue *attestationsQueue
blksQueue *blocksQueue
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
attsSlotTicker *slots.SlotTicker
blocksSlotTicker *slots.SlotTicker
pruningSlotTicker *slots.SlotTicker
params *Parameters
serviceCfg *ServiceConfig
indexedAttsChan chan *ethpb.IndexedAttestation
beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader
attsQueue *attestationsQueue
blksQueue *blocksQueue
ctx context.Context
cancel context.CancelFunc
genesisTime time.Time
attsSlotTicker *slots.SlotTicker
blocksSlotTicker *slots.SlotTicker
pruningSlotTicker *slots.SlotTicker
latestEpochWrittenForValidator map[types.ValidatorIndex]types.Epoch
}
// New instantiates a new slasher from configuration values.
func New(ctx context.Context, srvCfg *ServiceConfig) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
return &Service{
params: DefaultParams(),
serviceCfg: srvCfg,
indexedAttsChan: make(chan *ethpb.IndexedAttestation, 1),
beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 1),
attsQueue: newAttestationsQueue(),
blksQueue: newBlocksQueue(),
ctx: ctx,
cancel: cancel,
params: DefaultParams(),
serviceCfg: srvCfg,
indexedAttsChan: make(chan *ethpb.IndexedAttestation, 1),
beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 1),
attsQueue: newAttestationsQueue(),
blksQueue: newBlocksQueue(),
ctx: ctx,
cancel: cancel,
latestEpochWrittenForValidator: make(map[types.ValidatorIndex]types.Epoch),
}, nil
}
// Start listening for received indexed attestations and blocks
// and perform slashing detection on them.
func (s *Service) Start() {
go s.run()
go s.run() // Start functions must be non-blocking.
}
func (s *Service) run() {
stateChannel := make(chan *feed.Event, 1)
stateSub := s.serviceCfg.StateNotifier.StateFeed().Subscribe(stateChannel)
stateEvent := <-stateChannel
s.waitForChainInitialization()
s.waitForSync(s.genesisTime)
// Wait for us to receive the genesis time via a chain started notification.
if stateEvent.Type == statefeed.ChainStarted {
data, ok := stateEvent.Data.(*statefeed.ChainStartedData)
if !ok {
log.Error("Could not receive chain start notification, want *statefeed.ChainStartedData")
return
}
s.genesisTime = data.StartTime
log.WithField("genesisTime", s.genesisTime).Info("Starting slasher, received chain start event")
} else if stateEvent.Type == statefeed.Initialized {
// Alternatively, if the chain has already started, we then read the genesis
// time value from this data.
data, ok := stateEvent.Data.(*statefeed.InitializedData)
if !ok {
log.Error("Could not receive chain start notification, want *statefeed.ChainStartedData")
return
}
s.genesisTime = data.StartTime
log.WithField("genesisTime", s.genesisTime).Info("Starting slasher, chain already initialized")
} else {
// This should not happen.
log.Error("Could start slasher, could not receive chain start event")
log.Info("Completed chain sync, starting slashing detection")
// Get the latest eopch written for each validator from disk on startup.
headState, err := s.serviceCfg.HeadStateFetcher.HeadState(s.ctx)
if err != nil {
log.WithError(err).Error("Failed to fetch head state")
return
}
stateSub.Unsubscribe()
s.waitForSync(s.genesisTime)
numVals := headState.NumValidators()
validatorIndices := make([]types.ValidatorIndex, numVals)
for i := 0; i < numVals; i++ {
validatorIndices[i] = types.ValidatorIndex(i)
}
start := time.Now()
log.Info("Reading last epoch written for each validator...")
epochsByValidator, err := s.serviceCfg.Database.LastEpochWrittenForValidators(
s.ctx, validatorIndices,
)
if err != nil {
log.Error(err)
return
}
for _, item := range epochsByValidator {
s.latestEpochWrittenForValidator[item.ValidatorIndex] = item.Epoch
}
log.WithField("elapsed", time.Since(start)).Info(
"Finished retrieving last epoch written per validator",
)
indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1)
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
log.Info("Completed chain sync, starting slashing detection")
go s.receiveAttestations(s.ctx, indexedAttsChan)
go s.receiveBlocks(s.ctx, beaconBlockHeadersChan)
@@ -144,6 +149,20 @@ func (s *Service) Stop() error {
if s.pruningSlotTicker != nil {
s.pruningSlotTicker.Done()
}
// Flush the latest epoch written map to disk.
start := time.Now()
// New context as the service context has already been canceled.
ctx, innerCancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer innerCancel()
log.Info("Flushing last epoch written for each validator to disk, please wait")
if err := s.serviceCfg.Database.SaveLastEpochsWrittenForValidators(
ctx, s.latestEpochWrittenForValidator,
); err != nil {
log.Error(err)
}
log.WithField("elapsed", time.Since(start)).Debug(
"Finished saving last epoch written per validator",
)
return nil
}
@@ -152,8 +171,45 @@ func (s *Service) Status() error {
return nil
}
func (s *Service) waitForChainInitialization() {
stateChannel := make(chan *feed.Event, 1)
stateSub := s.serviceCfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
defer close(stateChannel)
for {
select {
case stateEvent := <-stateChannel:
// Wait for us to receive the genesis time via a chain started notification.
if stateEvent.Type == statefeed.Initialized {
// Alternatively, if the chain has already started, we then read the genesis
// time value from this data.
data, ok := stateEvent.Data.(*statefeed.InitializedData)
if !ok {
log.Error(
"Could not receive chain start notification, want *statefeed.ChainStartedData",
)
return
}
s.genesisTime = data.StartTime
log.WithField("genesisTime", s.genesisTime).Info(
"Slasher received chain initialization event",
)
return
}
case err := <-stateSub.Err():
log.WithError(err).Error(
"Slasher could not subscribe to state events",
)
return
case <-s.ctx.Done():
return
}
}
}
func (s *Service) waitForSync(genesisTime time.Time) {
if slots.SinceGenesis(genesisTime) == 0 || !s.serviceCfg.SyncChecker.Syncing() {
if slots.SinceGenesis(genesisTime) < params.BeaconConfig().SlotsPerEpoch || !s.serviceCfg.SyncChecker.Syncing() {
return
}
slotTicker := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)

View File

@@ -30,44 +30,7 @@ func TestMain(m *testing.M) {
m.Run()
}
func TestService_StartStop_ChainStartEvent(t *testing.T) {
slasherDB := dbtest.SetupSlasherDB(t)
hook := logTest.NewGlobal()
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
currentSlot := types.Slot(4)
require.NoError(t, beaconState.SetSlot(currentSlot))
mockChain := &mock.ChainService{
State: beaconState,
Slot: &currentSlot,
}
srv, err := New(context.Background(), &ServiceConfig{
IndexedAttestationsFeed: new(event.Feed),
BeaconBlockHeadersFeed: new(event.Feed),
StateNotifier: &mock.MockStateNotifier{},
Database: slasherDB,
HeadStateFetcher: mockChain,
SyncChecker: &mockSync.Sync{IsSyncing: false},
})
require.NoError(t, err)
go srv.Start()
time.Sleep(time.Millisecond * 100)
srv.serviceCfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.ChainStarted,
Data: &statefeed.ChainStartedData{StartTime: time.Now()},
})
time.Sleep(time.Millisecond * 100)
srv.attsSlotTicker = &slots.SlotTicker{}
srv.blocksSlotTicker = &slots.SlotTicker{}
srv.pruningSlotTicker = &slots.SlotTicker{}
require.NoError(t, srv.Stop())
require.NoError(t, srv.Status())
require.LogsContain(t, hook, "received chain start event")
}
func TestService_StartStop_ChainAlreadyInitialized(t *testing.T) {
func TestService_StartStop_ChainInitialized(t *testing.T) {
slasherDB := dbtest.SetupSlasherDB(t)
hook := logTest.NewGlobal()
beaconState, err := util.NewBeaconState()
@@ -99,5 +62,5 @@ func TestService_StartStop_ChainAlreadyInitialized(t *testing.T) {
srv.pruningSlotTicker = &slots.SlotTicker{}
require.NoError(t, srv.Stop())
require.NoError(t, srv.Status())
require.LogsContain(t, hook, "chain already initialized")
require.LogsContain(t, hook, "received chain initialization")
}

View File

@@ -48,7 +48,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync",
visibility = [
"//beacon-chain:__subpackages__",
"//testing/fuzz:__pkg__",
"//testing:__subpackages__",
],
deps = [
"//async:go_default_library",

View File

@@ -19,6 +19,32 @@ import (
logTest "github.com/sirupsen/logrus/hooks/test"
)
type mockSyncChecker struct{}
func (c mockSyncChecker) Initialized() bool {
return true
}
func (c mockSyncChecker) Syncing() bool {
return false
}
func (c mockSyncChecker) Synced() bool {
return true
}
func (c mockSyncChecker) Status() error {
return nil
}
func (c mockSyncChecker) Resync() error {
return nil
}
func (mockSyncChecker) IsSynced(_ context.Context) (bool, error) {
return true, nil
}
func TestEndToEnd_SlasherSimulator(t *testing.T) {
hook := logTest.NewGlobal()
ctx := context.Background()
@@ -66,6 +92,7 @@ func TestEndToEnd_SlasherSimulator(t *testing.T) {
StateGen: gen,
PrivateKeysByValidatorIndex: privKeys,
SlashingsPool: &slashings.PoolMock{},
SyncChecker: mockSyncChecker{},
})
require.NoError(t, err)
sim.Start()

View File

@@ -24,6 +24,7 @@ go_library(
"//beacon-chain/slasher:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/sync:go_default_library",
"//config/params:go_default_library",
"//crypto/bls:go_default_library",
"//crypto/rand:go_default_library",

View File

@@ -29,10 +29,10 @@ func (s *Simulator) generateAttestationsForSlot(
(committeesPerSlot * uint64(s.srvConfig.Params.SlotsPerEpoch))
valsPerSlot := committeesPerSlot * valsPerCommittee
var sourceEpoch types.Epoch = 0
if currentEpoch != 0 {
sourceEpoch = currentEpoch - 1
if currentEpoch < 2 {
return nil, nil, nil
}
sourceEpoch := currentEpoch - 1
var slashedIndices []uint64
startIdx := valsPerSlot * uint64(slot%s.srvConfig.Params.SlotsPerEpoch)

View File

@@ -2,6 +2,7 @@ package simulator
import (
"context"
"fmt"
"time"
types "github.com/prysmaticlabs/eth2-types"
@@ -13,6 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/slasher"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/crypto/bls"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
@@ -32,6 +34,7 @@ type ServiceConfig struct {
StateGen stategen.StateManager
SlashingsPool slashings.PoolManager
PrivateKeysByValidatorIndex map[types.ValidatorIndex]bls.SecretKey
SyncChecker sync.Checker
}
// Parameters for a slasher simulator.
@@ -90,6 +93,7 @@ func New(ctx context.Context, srvConfig *ServiceConfig) (*Simulator, error) {
AttestationStateFetcher: srvConfig.AttestationStateFetcher,
StateGen: srvConfig.StateGen,
SlashingPoolInserter: srvConfig.SlashingsPool,
SyncChecker: srvConfig.SyncChecker,
})
if err != nil {
return nil, err
@@ -132,8 +136,8 @@ func (s *Simulator) Start() {
time.Sleep(time.Second)
s.genesisTime = time.Now()
s.srvConfig.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.ChainStarted,
Data: &statefeed.ChainStartedData{StartTime: s.genesisTime},
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{StartTime: s.genesisTime},
})
// We simulate blocks and attestations for N epochs.
@@ -256,10 +260,12 @@ func (s *Simulator) verifySlashingsWereDetected(ctx context.Context) {
for slashingRoot, slashing := range s.sentAttesterSlashings {
if _, ok := detectedAttesterSlashings[slashingRoot]; !ok {
log.WithFields(logrus.Fields{
"targetEpoch": slashing.Attestation_1.Data.Target.Epoch,
"prevTargetEpoch": slashing.Attestation_2.Data.Target.Epoch,
"sourceEpoch": slashing.Attestation_1.Data.Source.Epoch,
"prevSourceEpoch": slashing.Attestation_2.Data.Source.Epoch,
"targetEpoch": slashing.Attestation_1.Data.Target.Epoch,
"prevTargetEpoch": slashing.Attestation_2.Data.Target.Epoch,
"sourceEpoch": slashing.Attestation_1.Data.Source.Epoch,
"prevSourceEpoch": slashing.Attestation_2.Data.Source.Epoch,
"prevBeaconBlockRoot": fmt.Sprintf("%#x", slashing.Attestation_1.Data.BeaconBlockRoot),
"newBeaconBlockRoot": fmt.Sprintf("%#x", slashing.Attestation_2.Data.BeaconBlockRoot),
}).Errorf("Did not detect simulated attester slashing")
continue
}