diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go index b25a684b47..5fdd9d0a23 100644 --- a/beacon-chain/db/iface/interface.go +++ b/beacon-chain/db/iface/interface.go @@ -97,8 +97,8 @@ type HeadAccessDatabase interface { // SlasherDatabase interface for persisting data related to detecting slashable offenses on Ethereum. type SlasherDatabase interface { io.Closer - SaveLastEpochWrittenForValidators( - ctx context.Context, validatorIndices []types.ValidatorIndex, epoch types.Epoch, + SaveLastEpochsWrittenForValidators( + ctx context.Context, epochByValidator map[types.ValidatorIndex]types.Epoch, ) error SaveAttestationRecordsForValidators( ctx context.Context, diff --git a/beacon-chain/db/slasherkv/pruning.go b/beacon-chain/db/slasherkv/pruning.go index 1a6be7ffcc..5445e4e4cd 100644 --- a/beacon-chain/db/slasherkv/pruning.go +++ b/beacon-chain/db/slasherkv/pruning.go @@ -85,7 +85,7 @@ func (s *Store) PruneAttestationsAtEpoch( // PruneProposalsAtEpoch deletes all proposals from the slasher DB with epoch // less than or equal to the specified epoch. func (s *Store) PruneProposalsAtEpoch( - _ context.Context, maxEpoch types.Epoch, + ctx context.Context, maxEpoch types.Epoch, ) (numPruned uint, err error) { var endPruneSlot types.Slot endPruneSlot, err = slots.EpochEnd(maxEpoch) @@ -128,6 +128,9 @@ func (s *Store) PruneProposalsAtEpoch( c := proposalBkt.Cursor() // We begin a pruning iteration starting from the first item in the bucket. for k, _ := c.First(); k != nil; k, _ = c.Next() { + if ctx.Err() != nil { + return ctx.Err() + } // We check the slot from the current key in the database. // If we have hit a slot that is greater than the end slot of the pruning process, // we then completely exit the process as we are done. diff --git a/beacon-chain/db/slasherkv/slasher.go b/beacon-chain/db/slasherkv/slasher.go index 5a48823dc5..3b1f8d2ad2 100644 --- a/beacon-chain/db/slasherkv/slasher.go +++ b/beacon-chain/db/slasherkv/slasher.go @@ -42,47 +42,73 @@ func (s *Store) LastEpochWrittenForValidators( err := s.db.View(func(tx *bolt.Tx) error { bkt := tx.Bucket(attestedEpochsByValidator) for i, encodedIndex := range encodedIndices { + var epoch types.Epoch epochBytes := bkt.Get(encodedIndex) if epochBytes != nil { - var epoch types.Epoch if err := epoch.UnmarshalSSZ(epochBytes); err != nil { return err } - attestedEpochs = append(attestedEpochs, &slashertypes.AttestedEpochForValidator{ - ValidatorIndex: validatorIndices[i], - Epoch: epoch, - }) } + attestedEpochs = append(attestedEpochs, &slashertypes.AttestedEpochForValidator{ + ValidatorIndex: validatorIndices[i], + Epoch: epoch, + }) } return nil }) return attestedEpochs, err } -// SaveLastEpochWrittenForValidators updates the latest epoch a slice +// SaveLastEpochsWrittenForValidators updates the latest epoch a slice // of validator indices has attested to. -func (s *Store) SaveLastEpochWrittenForValidators( - ctx context.Context, validatorIndices []types.ValidatorIndex, epoch types.Epoch, +func (s *Store) SaveLastEpochsWrittenForValidators( + ctx context.Context, epochByValidator map[types.ValidatorIndex]types.Epoch, ) error { - ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastEpochWrittenForValidators") + ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastEpochsWrittenForValidators") defer span.End() - encodedIndices := make([][]byte, len(validatorIndices)) - for i, valIdx := range validatorIndices { - encodedIndices[i] = encodeValidatorIndex(valIdx) - } - encodedEpoch, err := epoch.MarshalSSZ() - if err != nil { - return err - } - return s.db.Update(func(tx *bolt.Tx) error { - bkt := tx.Bucket(attestedEpochsByValidator) - for _, encodedIndex := range encodedIndices { - if err = bkt.Put(encodedIndex, encodedEpoch); err != nil { - return err - } + encodedIndices := make([][]byte, 0, len(epochByValidator)) + encodedEpochs := make([][]byte, 0, len(epochByValidator)) + for valIdx, epoch := range epochByValidator { + if ctx.Err() != nil { + return ctx.Err() } - return nil - }) + encodedEpoch, err := epoch.MarshalSSZ() + if err != nil { + return err + } + encodedIndices = append(encodedIndices, encodeValidatorIndex(valIdx)) + encodedEpochs = append(encodedEpochs, encodedEpoch) + } + // The list of validators might be too massive for boltdb to handle in a single transaction, + // so instead we split it into batches and write each batch. + batchSize := 10000 + for i := 0; i < len(encodedIndices); i += batchSize { + if ctx.Err() != nil { + return ctx.Err() + } + if err := s.db.Update(func(tx *bolt.Tx) error { + if ctx.Err() != nil { + return ctx.Err() + } + bkt := tx.Bucket(attestedEpochsByValidator) + min := i + batchSize + if min > len(encodedIndices) { + min = len(encodedIndices) + } + for j, encodedIndex := range encodedIndices[i:min] { + if ctx.Err() != nil { + return ctx.Err() + } + if err := bkt.Put(encodedIndex, encodedEpochs[j]); err != nil { + return err + } + } + return nil + }); err != nil { + return err + } + } + return nil } // CheckAttesterDoubleVotes retries any slashable double votes that exist diff --git a/beacon-chain/db/slasherkv/slasher_test.go b/beacon-chain/db/slasherkv/slasher_test.go index f599d70a3e..d712d9060d 100644 --- a/beacon-chain/db/slasherkv/slasher_test.go +++ b/beacon-chain/db/slasherkv/slasher_test.go @@ -52,9 +52,17 @@ func TestStore_LastEpochWrittenForValidators(t *testing.T) { attestedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices) require.NoError(t, err) - require.Equal(t, true, len(attestedEpochs) == 0) + require.Equal(t, true, len(attestedEpochs) == len(indices)) + for _, item := range attestedEpochs { + require.Equal(t, types.Epoch(0), item.Epoch) + } - err = beaconDB.SaveLastEpochWrittenForValidators(ctx, indices, epoch) + epochsByValidator := map[types.ValidatorIndex]types.Epoch{ + 1: epoch, + 2: epoch, + 3: epoch, + } + err = beaconDB.SaveLastEpochsWrittenForValidators(ctx, epochsByValidator) require.NoError(t, err) retrievedEpochs, err := beaconDB.LastEpochWrittenForValidators(ctx, indices) diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go index 128affdf9d..f7b20b8d90 100644 --- a/beacon-chain/slasher/detect_attestations.go +++ b/beacon-chain/slasher/detect_attestations.go @@ -3,40 +3,67 @@ package slasher import ( "context" "fmt" + "time" "github.com/pkg/errors" types "github.com/prysmaticlabs/eth2-types" slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/time/slots" + "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) // Takes in a list of indexed attestation wrappers and returns any // found attester slashings to the caller. func (s *Service) checkSlashableAttestations( - ctx context.Context, atts []*slashertypes.IndexedAttestationWrapper, + ctx context.Context, currentEpoch types.Epoch, atts []*slashertypes.IndexedAttestationWrapper, ) ([]*ethpb.AttesterSlashing, error) { - currentEpoch := slots.EpochsSinceGenesis(s.genesisTime) slashings := make([]*ethpb.AttesterSlashing, 0) - indices := make([]types.ValidatorIndex, 0) - // TODO(#8331): Consider using goroutines and wait groups here. + log.Debug("Checking for double votes") + start := time.Now() + doubleVoteSlashings, err := s.checkDoubleVotes(ctx, atts) + if err != nil { + return nil, errors.Wrap(err, "could not check slashable double votes") + } + log.WithField("elapsed", time.Since(start)).Debug("Done checking double votes") + slashings = append(slashings, doubleVoteSlashings...) + groupedAtts := s.groupByValidatorChunkIndex(atts) + log.WithField("numBatches", len(groupedAtts)).Debug("Batching attestations by validator chunk index") + start = time.Now() + batchTimes := make([]time.Duration, 0, len(groupedAtts)) for validatorChunkIdx, batch := range groupedAtts { + innerStart := time.Now() attSlashings, err := s.detectAllAttesterSlashings(ctx, &chunkUpdateArgs{ validatorChunkIndex: validatorChunkIdx, currentEpoch: currentEpoch, }, batch) - if err != nil { - return nil, errors.Wrap(err, "Could not detect slashable attestations") + return nil, err } slashings = append(slashings, attSlashings...) - indices = append(indices, s.params.validatorIndicesInChunk(validatorChunkIdx)...) + indices := s.params.validatorIndicesInChunk(validatorChunkIdx) + for _, idx := range indices { + s.latestEpochWrittenForValidator[idx] = currentEpoch + } + batchTimes = append(batchTimes, time.Since(innerStart)) } - if err := s.serviceCfg.Database.SaveLastEpochWrittenForValidators(ctx, indices, currentEpoch); err != nil { - return nil, err + var avgProcessingTimePerBatch time.Duration + for _, dur := range batchTimes { + avgProcessingTimePerBatch += dur + } + if avgProcessingTimePerBatch != time.Duration(0) { + avgProcessingTimePerBatch = avgProcessingTimePerBatch / time.Duration(len(batchTimes)) + } + log.WithFields(logrus.Fields{ + "numAttestations": len(atts), + "numBatchesByValidatorChunkIndex": len(groupedAtts), + "elapsed": time.Since(start), + "avgBatchProcessingTime": avgProcessingTimePerBatch, + }).Info("Done checking slashable attestations") + if len(slashings) > 0 { + log.WithField("numSlashings", len(slashings)).Warn("Slashable attestation offenses found") } return slashings, nil } @@ -57,17 +84,25 @@ func (s *Service) detectAllAttesterSlashings( args *chunkUpdateArgs, attestations []*slashertypes.IndexedAttestationWrapper, ) ([]*ethpb.AttesterSlashing, error) { - // Check for double votes. - doubleVoteSlashings, err := s.checkDoubleVotes(ctx, attestations) - if err != nil { - return nil, errors.Wrap(err, "could not check slashable double votes") + + // Map of updated chunks by chunk index, which will be saved at the end. + updatedChunks := make(map[uint64]Chunker) + groupedAtts := s.groupByChunkIndex(attestations) + validatorIndices := s.params.validatorIndicesInChunk(args.validatorChunkIndex) + + // Update the min/max span chunks for the change of current epoch. + for _, validatorIndex := range validatorIndices { + if err := s.epochUpdateForValidator(ctx, args, updatedChunks, validatorIndex); err != nil { + return nil, errors.Wrapf( + err, + "could not update validator index chunks %d", + validatorIndex, + ) + } } - // Group attestations by chunk index. - groupedAtts := s.groupByChunkIndex(attestations) - // Update min and max spans and retrieve any detected slashable offenses. - surroundingSlashings, err := s.updateSpans(ctx, &chunkUpdateArgs{ + surroundingSlashings, err := s.updateSpans(ctx, updatedChunks, &chunkUpdateArgs{ kind: slashertypes.MinSpan, validatorChunkIndex: args.validatorChunkIndex, currentEpoch: args.currentEpoch, @@ -80,7 +115,7 @@ func (s *Service) detectAllAttesterSlashings( ) } - surroundedSlashings, err := s.updateSpans(ctx, &chunkUpdateArgs{ + surroundedSlashings, err := s.updateSpans(ctx, updatedChunks, &chunkUpdateArgs{ kind: slashertypes.MaxSpan, validatorChunkIndex: args.validatorChunkIndex, currentEpoch: args.currentEpoch, @@ -93,13 +128,11 @@ func (s *Service) detectAllAttesterSlashings( ) } - // Consolidate all slashings into a slice. - slashings := make([]*ethpb.AttesterSlashing, 0, len(doubleVoteSlashings)+len(surroundingSlashings)+len(surroundedSlashings)) - slashings = append(slashings, doubleVoteSlashings...) + slashings := make([]*ethpb.AttesterSlashing, 0, len(surroundingSlashings)+len(surroundedSlashings)) slashings = append(slashings, surroundingSlashings...) slashings = append(slashings, surroundedSlashings...) - if len(slashings) > 0 { - log.WithField("numSlashings", len(slashings)).Info("Slashable attestation offenses found") + if err := s.saveUpdatedChunks(ctx, args, updatedChunks); err != nil { + return nil, err } return slashings, nil } @@ -167,6 +200,44 @@ func (s *Service) checkDoubleVotesOnDisk( return doubleVoteSlashings, nil } +// This function updates the slashing spans for a given validator for a change in epoch +// since the last epoch we have recorded for the validator. For example, if the last epoch a validator +// has written is N, and the current epoch is N+5, we update entries in the slashing spans +// with their neutral element for epochs N+1 to N+4. This also puts any loaded chunks in a +// map used as a cache for further processing and minimizing database reads later on. +func (s *Service) epochUpdateForValidator( + ctx context.Context, + args *chunkUpdateArgs, + updatedChunks map[uint64]Chunker, + validatorIndex types.ValidatorIndex, +) error { + epoch := s.latestEpochWrittenForValidator[validatorIndex] + if epoch == 0 { + return nil + } + for epoch <= args.currentEpoch { + chunkIdx := s.params.chunkIndex(epoch) + currentChunk, err := s.getChunk(ctx, args, updatedChunks, chunkIdx) + if err != nil { + return err + } + for s.params.chunkIndex(epoch) == chunkIdx && epoch <= args.currentEpoch { + if err := setChunkRawDistance( + s.params, + currentChunk.Chunk(), + validatorIndex, + epoch, + currentChunk.NeutralElement(), + ); err != nil { + return err + } + updatedChunks[chunkIdx] = currentChunk + epoch++ + } + } + return nil +} + // Updates spans and detects any slashable attester offenses along the way. // 1. Determine the chunks we need to use for updating for the validator indices // in a validator chunk index, then retrieve those chunks from the database. @@ -177,30 +248,12 @@ func (s *Service) checkDoubleVotesOnDisk( // 3. Save the updated chunks to disk. func (s *Service) updateSpans( ctx context.Context, + updatedChunks map[uint64]Chunker, args *chunkUpdateArgs, attestationsByChunkIdx map[uint64][]*slashertypes.IndexedAttestationWrapper, ) ([]*ethpb.AttesterSlashing, error) { ctx, span := trace.StartSpan(ctx, "Slasher.updateSpans") defer span.End() - // Determine the chunk indices we need to use for slashing detection. - validatorIndices := s.params.validatorIndicesInChunk(args.validatorChunkIndex) - chunkIndices, err := s.determineChunksToUpdateForValidators(ctx, args, validatorIndices) - if err != nil { - return nil, errors.Wrapf( - err, - "could not determine chunks to update for validator indices %v", - validatorIndices, - ) - } - // Load the required chunks from disk. - chunksByChunkIdx, err := s.loadChunks(ctx, args, chunkIndices) - if err != nil { - return nil, errors.Wrapf( - err, - "could not load chunks for chunk indices %v", - chunkIndices, - ) - } // Apply the attestations to the related chunks and find any // slashings along the way. @@ -227,7 +280,7 @@ func (s *Service) updateSpans( ctx, args, validatorIndex, - chunksByChunkIdx, + updatedChunks, att, ) if err != nil { @@ -245,52 +298,7 @@ func (s *Service) updateSpans( } // Write the updated chunks to disk. - return slashings, s.saveUpdatedChunks(ctx, args, chunksByChunkIdx) -} - -// For a list of validator indices, we retrieve their latest written epoch. Then, for each -// (validator, latest epoch written) pair, we determine the chunks we need to update and -// perform slashing detection with. -func (s *Service) determineChunksToUpdateForValidators( - ctx context.Context, - args *chunkUpdateArgs, - validatorIndices []types.ValidatorIndex, -) (chunkIndices []uint64, err error) { - ctx, span := trace.StartSpan(ctx, "Slasher.determineChunksToUpdateForValidators") - defer span.End() - lastCurrentEpochs, err := s.serviceCfg.Database.LastEpochWrittenForValidators(ctx, validatorIndices) - if err != nil { - err = errors.Wrap(err, "could not get latest epoch attested for validators") - return - } - - // Initialize the last epoch written for each validator to 0. - lastCurrentEpochByValidator := make(map[types.ValidatorIndex]types.Epoch, len(validatorIndices)) - for _, valIdx := range validatorIndices { - lastCurrentEpochByValidator[valIdx] = 0 - } - for _, lastEpoch := range lastCurrentEpochs { - lastCurrentEpochByValidator[lastEpoch.ValidatorIndex] = lastEpoch.Epoch - } - - // For every single validator and their last written current epoch, we determine - // the chunk indices we need to update based on all the chunks between the last - // epoch written and the current epoch, inclusive. - chunkIndicesToUpdate := make(map[uint64]bool) - - for _, epoch := range lastCurrentEpochByValidator { - latestEpochWritten := epoch - for latestEpochWritten <= args.currentEpoch { - chunkIdx := s.params.chunkIndex(latestEpochWritten) - chunkIndicesToUpdate[chunkIdx] = true - latestEpochWritten++ - } - } - chunkIndices = make([]uint64, 0, len(chunkIndicesToUpdate)) - for chunkIdx := range chunkIndicesToUpdate { - chunkIndices = append(chunkIndices, chunkIdx) - } - return + return slashings, nil } // Checks if an incoming attestation is slashable based on the validator chunk it diff --git a/beacon-chain/slasher/detect_attestations_test.go b/beacon-chain/slasher/detect_attestations_test.go index 7d59ed36bc..3f76fa930b 100644 --- a/beacon-chain/slasher/detect_attestations_test.go +++ b/beacon-chain/slasher/detect_attestations_test.go @@ -3,7 +3,6 @@ package slasher import ( "context" "fmt" - "sort" "testing" "time" @@ -241,9 +240,10 @@ func Test_processQueuedAttestations(t *testing.T) { AttestationStateFetcher: mockChain, SlashingPoolInserter: &slashings.PoolMock{}, }, - params: DefaultParams(), - attsQueue: newAttestationsQueue(), - genesisTime: genesisTime, + params: DefaultParams(), + attsQueue: newAttestationsQueue(), + genesisTime: genesisTime, + latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{}, } currentSlotChan := make(chan types.Slot) exitChan := make(chan struct{}) @@ -299,9 +299,10 @@ func Test_processQueuedAttestations_MultipleChunkIndices(t *testing.T) { AttestationStateFetcher: mockChain, SlashingPoolInserter: &slashings.PoolMock{}, }, - params: slasherParams, - attsQueue: newAttestationsQueue(), - genesisTime: genesisTime, + params: slasherParams, + attsQueue: newAttestationsQueue(), + genesisTime: genesisTime, + latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{}, } currentSlotChan := make(chan types.Slot) exitChan := make(chan struct{}) @@ -365,9 +366,10 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) { AttestationStateFetcher: mockChain, SlashingPoolInserter: &slashings.PoolMock{}, }, - params: slasherParams, - attsQueue: newAttestationsQueue(), - genesisTime: genesisTime, + params: slasherParams, + attsQueue: newAttestationsQueue(), + genesisTime: genesisTime, + latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{}, } currentSlotChan := make(chan types.Slot) exitChan := make(chan struct{}) @@ -397,9 +399,9 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) { require.LogsDoNotContain(t, hook, "Could not detect") } -func Test_determineChunksToUpdateForValidators_FromLatestWrittenEpoch(t *testing.T) { - slasherDB := dbtest.SetupSlasherDB(t) +func Test_epochUpdateForValidators(t *testing.T) { ctx := context.Background() + slasherDB := dbtest.SetupSlasherDB(t) // Check if the chunk at chunk index already exists in-memory. s := &Service{ @@ -408,71 +410,67 @@ func Test_determineChunksToUpdateForValidators_FromLatestWrittenEpoch(t *testing validatorChunkSize: 2, // 2 validators in a chunk. historyLength: 4, }, - serviceCfg: &ServiceConfig{ - Database: slasherDB, - StateNotifier: &mock.MockStateNotifier{}, - }, + serviceCfg: &ServiceConfig{Database: slasherDB}, + latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{}, } - validators := []types.ValidatorIndex{ - 1, 2, - } - currentEpoch := types.Epoch(3) - // Set the latest written epoch for validators to current epoch - 1. - latestWrittenEpoch := currentEpoch - 1 - err := slasherDB.SaveLastEpochWrittenForValidators(ctx, validators, latestWrittenEpoch) - require.NoError(t, err) + t.Run("no update if no latest written epoch", func(t *testing.T) { + validators := []types.ValidatorIndex{ + 1, 2, + } + currentEpoch := types.Epoch(3) + // No last written epoch for both validators. + s.latestEpochWrittenForValidator = map[types.ValidatorIndex]types.Epoch{} - // Because the validators have no recorded latest epoch written in the database, - // Because the latest written epoch for the input validators is == 2, we expect - // that we will update all epochs from 2 up to 3 (the current epoch). This is all - // safe contained in chunk index 1. - chunkIndices, err := s.determineChunksToUpdateForValidators( - ctx, - &chunkUpdateArgs{ - currentEpoch: currentEpoch, - }, - validators, - ) - require.NoError(t, err) - require.DeepEqual(t, []uint64{1}, chunkIndices) -} - -func Test_determineChunksToUpdateForValidators_FromGenesis(t *testing.T) { - slasherDB := dbtest.SetupSlasherDB(t) - ctx := context.Background() - - // Check if the chunk at chunk index already exists in-memory. - s := &Service{ - params: &Parameters{ - chunkSize: 2, // 2 epochs in a chunk. - validatorChunkSize: 2, // 2 validators in a chunk. - historyLength: 4, - }, - serviceCfg: &ServiceConfig{ - Database: slasherDB, - StateNotifier: &mock.MockStateNotifier{}, - }, - } - validators := []types.ValidatorIndex{ - 1, 2, - } - // Because the validators have no recorded latest epoch written in the database, - // we expect that we will update all epochs from genesis up to the current epoch. - // Given the chunk size is 2 epochs per chunk, updating with current epoch == 3 - // will mean that we should be updating from epoch 0 to 3, meaning chunk indices 0 and 1. - chunkIndices, err := s.determineChunksToUpdateForValidators( - ctx, - &chunkUpdateArgs{ - currentEpoch: 3, - }, - validators, - ) - require.NoError(t, err) - sort.Slice(chunkIndices, func(i, j int) bool { - return chunkIndices[i] < chunkIndices[j] + // Because the validators have no recorded latest epoch written, we expect + // no chunks to be loaded nor updated to. + updatedChunks := make(map[uint64]Chunker) + for _, valIdx := range validators { + err := s.epochUpdateForValidator( + ctx, + &chunkUpdateArgs{ + currentEpoch: currentEpoch, + }, + updatedChunks, + valIdx, + ) + require.NoError(t, err) + } + require.Equal(t, 0, len(updatedChunks)) + }) + + t.Run("update from latest written epoch", func(t *testing.T) { + validators := []types.ValidatorIndex{ + 1, 2, + } + currentEpoch := types.Epoch(3) + + // Set the latest written epoch for validators to current epoch - 1. + latestWrittenEpoch := currentEpoch - 1 + s.latestEpochWrittenForValidator = map[types.ValidatorIndex]types.Epoch{ + 1: latestWrittenEpoch, + 2: latestWrittenEpoch, + } + + // Because the latest written epoch for the input validators is == 2, we expect + // that we will update all epochs from 2 up to 3 (the current epoch). This is all + // safe contained in chunk index 1. + updatedChunks := make(map[uint64]Chunker) + for _, valIdx := range validators { + err := s.epochUpdateForValidator( + ctx, + &chunkUpdateArgs{ + currentEpoch: currentEpoch, + }, + updatedChunks, + valIdx, + ) + require.NoError(t, err) + } + require.Equal(t, 1, len(updatedChunks)) + _, ok := updatedChunks[1] + require.Equal(t, true, ok) }) - require.DeepEqual(t, []uint64{0, 1}, chunkIndices) } func Test_applyAttestationForValidator_MinSpanChunk(t *testing.T) { @@ -485,6 +483,7 @@ func Test_applyAttestationForValidator_MinSpanChunk(t *testing.T) { Database: slasherDB, StateNotifier: &mock.MockStateNotifier{}, }, + latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{}, } // We initialize an empty chunks slice. chunk := EmptyMinSpanChunksSlice(params) @@ -545,6 +544,7 @@ func Test_applyAttestationForValidator_MaxSpanChunk(t *testing.T) { Database: slasherDB, StateNotifier: &mock.MockStateNotifier{}, }, + latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{}, } // We initialize an empty chunks slice. chunk := EmptyMaxSpanChunksSlice(params) @@ -796,7 +796,7 @@ func TestService_processQueuedAttestations(t *testing.T) { tickerChan <- 1 cancel() <-exitChan - assert.LogsContain(t, hook, "New slot, processing queued") + assert.LogsContain(t, hook, "Processing queued") } func BenchmarkCheckSlashableAttestations(b *testing.B) { @@ -885,7 +885,8 @@ func runAttestationsBenchmark(b *testing.B, s *Service, numAtts, numValidators u genesisTime := time.Now().Add(-time.Second * time.Duration(totalSeconds)) s.genesisTime = genesisTime - _, err := s.checkSlashableAttestations(context.Background(), atts) + epoch := slots.EpochsSinceGenesis(genesisTime) + _, err := s.checkSlashableAttestations(context.Background(), epoch, atts) require.NoError(b, err) } } diff --git a/beacon-chain/slasher/receive.go b/beacon-chain/slasher/receive.go index b73ef2f5fd..8c1fb969b7 100644 --- a/beacon-chain/slasher/receive.go +++ b/beacon-chain/slasher/receive.go @@ -72,7 +72,7 @@ func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan } } -// Process queued attestations every time an epoch ticker fires. We retrieve +// Process queued attestations every time a slot ticker fires. We retrieve // these attestations from a queue, then group them all by validator chunk index. // This grouping will allow us to perform detection on batches of attestations // per validator chunk index which can be done concurrently. @@ -98,9 +98,8 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch "numValidAtts": len(validAtts), "numDeferredAtts": len(validInFuture), "numDroppedAtts": numDropped, - }).Info("New slot, processing queued atts for slashing detection") + }).Info("Processing queued attestations for slashing detection") - start := time.Now() // Save the attestation records to our database. if err := s.serviceCfg.Database.SaveAttestationRecordsForValidators( ctx, validAtts, @@ -110,7 +109,7 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch } // Check for slashings. - slashings, err := s.checkSlashableAttestations(ctx, validAtts) + slashings, err := s.checkSlashableAttestations(ctx, currentEpoch, validAtts) if err != nil { log.WithError(err).Error("Could not check slashable attestations") continue @@ -123,8 +122,6 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch continue } - log.WithField("elapsed", time.Since(start)).Debug("Done checking slashable attestations") - processedAttestationsTotal.Add(float64(len(validAtts))) case <-ctx.Done(): return @@ -147,7 +144,7 @@ func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan typ "currentSlot": currentSlot, "currentEpoch": currentEpoch, "numBlocks": len(blocks), - }).Info("New slot, processing queued blocks for slashing detection") + }).Info("Processing queued blocks for slashing detection") start := time.Now() // Check for slashings. @@ -202,6 +199,7 @@ func (s *Service) pruneSlasherDataWithinSlidingWindow(ctx context.Context, curre // attempt to prune at all. return nil } + start := time.Now() log.WithFields(logrus.Fields{ "currentEpoch": currentEpoch, "pruningAllBeforeEpoch": maxPruningEpoch, @@ -221,6 +219,7 @@ func (s *Service) pruneSlasherDataWithinSlidingWindow(ctx context.Context, curre log.WithFields(logrus.Fields{ "prunedAttestationRecords": numPrunedAtts, "prunedProposalRecords": numPrunedProposals, - }).Info("Successfully pruned slasher data") + "elapsed": time.Since(start), + }).Info("Successfully ran slasher data pruning") return nil } diff --git a/beacon-chain/slasher/receive_test.go b/beacon-chain/slasher/receive_test.go index c871d33d90..06c2df6af3 100644 --- a/beacon-chain/slasher/receive_test.go +++ b/beacon-chain/slasher/receive_test.go @@ -306,5 +306,5 @@ func TestService_processQueuedBlocks(t *testing.T) { tickerChan <- 0 cancel() <-exitChan - assert.LogsContain(t, hook, "New slot, processing queued") + assert.LogsContain(t, hook, "Processing queued") } diff --git a/beacon-chain/slasher/rpc.go b/beacon-chain/slasher/rpc.go index ee5a1b0a32..120d39e56b 100644 --- a/beacon-chain/slasher/rpc.go +++ b/beacon-chain/slasher/rpc.go @@ -7,6 +7,7 @@ import ( types "github.com/prysmaticlabs/eth2-types" slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/time/slots" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -59,7 +60,8 @@ func (s *Service) IsSlashableAttestation( SigningRoot: dataRoot, } - attesterSlashings, err := s.checkSlashableAttestations(ctx, []*slashertypes.IndexedAttestationWrapper{indexedAttWrapper}) + currentEpoch := slots.EpochsSinceGenesis(s.genesisTime) + attesterSlashings, err := s.checkSlashableAttestations(ctx, currentEpoch, []*slashertypes.IndexedAttestationWrapper{indexedAttWrapper}) if err != nil { return nil, status.Errorf(codes.Internal, "Could not check if attestation is slashable: %v", err) } diff --git a/beacon-chain/slasher/rpc_test.go b/beacon-chain/slasher/rpc_test.go index 03ece4459d..0af55f6381 100644 --- a/beacon-chain/slasher/rpc_test.go +++ b/beacon-chain/slasher/rpc_test.go @@ -83,9 +83,10 @@ func TestIsSlashableAttestation(t *testing.T) { serviceCfg: &ServiceConfig{ Database: slasherDB, }, - params: DefaultParams(), - blksQueue: newBlocksQueue(), - genesisTime: genesisTime, + params: DefaultParams(), + blksQueue: newBlocksQueue(), + genesisTime: genesisTime, + latestEpochWrittenForValidator: map[types.ValidatorIndex]types.Epoch{}, } prevAtts := []*slashertypes.IndexedAttestationWrapper{ createAttestationWrapper(t, 2, 3, []uint64{0}, []byte{1}), @@ -93,7 +94,7 @@ func TestIsSlashableAttestation(t *testing.T) { } err := slasherDB.SaveAttestationRecordsForValidators(ctx, prevAtts) require.NoError(t, err) - attesterSlashings, err := s.checkSlashableAttestations(ctx, prevAtts) + attesterSlashings, err := s.checkSlashableAttestations(ctx, currentEpoch, prevAtts) require.NoError(t, err) require.Equal(t, 0, len(attesterSlashings)) @@ -125,7 +126,7 @@ func TestIsSlashableAttestation(t *testing.T) { { name: "should detect multiple surround if multiple same indices", attToCheck: createAttestationWrapper(t, 1, 4, []uint64{0, 1}, []byte{2}), - amtSlashable: 2, + amtSlashable: 4, }, } for _, tt := range tests { diff --git a/beacon-chain/slasher/service.go b/beacon-chain/slasher/service.go index c1eae5908b..baa8ea3561 100644 --- a/beacon-chain/slasher/service.go +++ b/beacon-chain/slasher/service.go @@ -22,6 +22,10 @@ import ( "github.com/prysmaticlabs/prysm/time/slots" ) +const ( + shutdownTimeout = time.Minute * 5 +) + // ServiceConfig for the slasher service in the beacon node. // This struct allows us to specify required dependencies and // parameters for slasher to function as needed. @@ -49,77 +53,78 @@ type SlashingChecker interface { // Service defining a slasher implementation as part of // the beacon node, able to detect eth2 slashable offenses. type Service struct { - params *Parameters - serviceCfg *ServiceConfig - indexedAttsChan chan *ethpb.IndexedAttestation - beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader - attsQueue *attestationsQueue - blksQueue *blocksQueue - ctx context.Context - cancel context.CancelFunc - genesisTime time.Time - attsSlotTicker *slots.SlotTicker - blocksSlotTicker *slots.SlotTicker - pruningSlotTicker *slots.SlotTicker + params *Parameters + serviceCfg *ServiceConfig + indexedAttsChan chan *ethpb.IndexedAttestation + beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader + attsQueue *attestationsQueue + blksQueue *blocksQueue + ctx context.Context + cancel context.CancelFunc + genesisTime time.Time + attsSlotTicker *slots.SlotTicker + blocksSlotTicker *slots.SlotTicker + pruningSlotTicker *slots.SlotTicker + latestEpochWrittenForValidator map[types.ValidatorIndex]types.Epoch } // New instantiates a new slasher from configuration values. func New(ctx context.Context, srvCfg *ServiceConfig) (*Service, error) { ctx, cancel := context.WithCancel(ctx) return &Service{ - params: DefaultParams(), - serviceCfg: srvCfg, - indexedAttsChan: make(chan *ethpb.IndexedAttestation, 1), - beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 1), - attsQueue: newAttestationsQueue(), - blksQueue: newBlocksQueue(), - ctx: ctx, - cancel: cancel, + params: DefaultParams(), + serviceCfg: srvCfg, + indexedAttsChan: make(chan *ethpb.IndexedAttestation, 1), + beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 1), + attsQueue: newAttestationsQueue(), + blksQueue: newBlocksQueue(), + ctx: ctx, + cancel: cancel, + latestEpochWrittenForValidator: make(map[types.ValidatorIndex]types.Epoch), }, nil } // Start listening for received indexed attestations and blocks // and perform slashing detection on them. func (s *Service) Start() { - go s.run() + go s.run() // Start functions must be non-blocking. } func (s *Service) run() { - stateChannel := make(chan *feed.Event, 1) - stateSub := s.serviceCfg.StateNotifier.StateFeed().Subscribe(stateChannel) - stateEvent := <-stateChannel + s.waitForChainInitialization() + s.waitForSync(s.genesisTime) - // Wait for us to receive the genesis time via a chain started notification. - if stateEvent.Type == statefeed.ChainStarted { - data, ok := stateEvent.Data.(*statefeed.ChainStartedData) - if !ok { - log.Error("Could not receive chain start notification, want *statefeed.ChainStartedData") - return - } - s.genesisTime = data.StartTime - log.WithField("genesisTime", s.genesisTime).Info("Starting slasher, received chain start event") - } else if stateEvent.Type == statefeed.Initialized { - // Alternatively, if the chain has already started, we then read the genesis - // time value from this data. - data, ok := stateEvent.Data.(*statefeed.InitializedData) - if !ok { - log.Error("Could not receive chain start notification, want *statefeed.ChainStartedData") - return - } - s.genesisTime = data.StartTime - log.WithField("genesisTime", s.genesisTime).Info("Starting slasher, chain already initialized") - } else { - // This should not happen. - log.Error("Could start slasher, could not receive chain start event") + log.Info("Completed chain sync, starting slashing detection") + + // Get the latest eopch written for each validator from disk on startup. + headState, err := s.serviceCfg.HeadStateFetcher.HeadState(s.ctx) + if err != nil { + log.WithError(err).Error("Failed to fetch head state") return } - - stateSub.Unsubscribe() - s.waitForSync(s.genesisTime) + numVals := headState.NumValidators() + validatorIndices := make([]types.ValidatorIndex, numVals) + for i := 0; i < numVals; i++ { + validatorIndices[i] = types.ValidatorIndex(i) + } + start := time.Now() + log.Info("Reading last epoch written for each validator...") + epochsByValidator, err := s.serviceCfg.Database.LastEpochWrittenForValidators( + s.ctx, validatorIndices, + ) + if err != nil { + log.Error(err) + return + } + for _, item := range epochsByValidator { + s.latestEpochWrittenForValidator[item.ValidatorIndex] = item.Epoch + } + log.WithField("elapsed", time.Since(start)).Info( + "Finished retrieving last epoch written per validator", + ) indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1) beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1) - log.Info("Completed chain sync, starting slashing detection") go s.receiveAttestations(s.ctx, indexedAttsChan) go s.receiveBlocks(s.ctx, beaconBlockHeadersChan) @@ -144,6 +149,20 @@ func (s *Service) Stop() error { if s.pruningSlotTicker != nil { s.pruningSlotTicker.Done() } + // Flush the latest epoch written map to disk. + start := time.Now() + // New context as the service context has already been canceled. + ctx, innerCancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer innerCancel() + log.Info("Flushing last epoch written for each validator to disk, please wait") + if err := s.serviceCfg.Database.SaveLastEpochsWrittenForValidators( + ctx, s.latestEpochWrittenForValidator, + ); err != nil { + log.Error(err) + } + log.WithField("elapsed", time.Since(start)).Debug( + "Finished saving last epoch written per validator", + ) return nil } @@ -152,8 +171,45 @@ func (s *Service) Status() error { return nil } +func (s *Service) waitForChainInitialization() { + stateChannel := make(chan *feed.Event, 1) + stateSub := s.serviceCfg.StateNotifier.StateFeed().Subscribe(stateChannel) + defer stateSub.Unsubscribe() + defer close(stateChannel) + for { + select { + case stateEvent := <-stateChannel: + // Wait for us to receive the genesis time via a chain started notification. + if stateEvent.Type == statefeed.Initialized { + // Alternatively, if the chain has already started, we then read the genesis + // time value from this data. + data, ok := stateEvent.Data.(*statefeed.InitializedData) + if !ok { + log.Error( + "Could not receive chain start notification, want *statefeed.ChainStartedData", + ) + return + } + s.genesisTime = data.StartTime + log.WithField("genesisTime", s.genesisTime).Info( + "Slasher received chain initialization event", + ) + return + } + case err := <-stateSub.Err(): + log.WithError(err).Error( + "Slasher could not subscribe to state events", + ) + return + case <-s.ctx.Done(): + return + } + } + +} + func (s *Service) waitForSync(genesisTime time.Time) { - if slots.SinceGenesis(genesisTime) == 0 || !s.serviceCfg.SyncChecker.Syncing() { + if slots.SinceGenesis(genesisTime) < params.BeaconConfig().SlotsPerEpoch || !s.serviceCfg.SyncChecker.Syncing() { return } slotTicker := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot) diff --git a/beacon-chain/slasher/service_test.go b/beacon-chain/slasher/service_test.go index c775006d01..5704145ead 100644 --- a/beacon-chain/slasher/service_test.go +++ b/beacon-chain/slasher/service_test.go @@ -30,44 +30,7 @@ func TestMain(m *testing.M) { m.Run() } -func TestService_StartStop_ChainStartEvent(t *testing.T) { - slasherDB := dbtest.SetupSlasherDB(t) - hook := logTest.NewGlobal() - - beaconState, err := util.NewBeaconState() - require.NoError(t, err) - currentSlot := types.Slot(4) - require.NoError(t, beaconState.SetSlot(currentSlot)) - mockChain := &mock.ChainService{ - State: beaconState, - Slot: ¤tSlot, - } - - srv, err := New(context.Background(), &ServiceConfig{ - IndexedAttestationsFeed: new(event.Feed), - BeaconBlockHeadersFeed: new(event.Feed), - StateNotifier: &mock.MockStateNotifier{}, - Database: slasherDB, - HeadStateFetcher: mockChain, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - }) - require.NoError(t, err) - go srv.Start() - time.Sleep(time.Millisecond * 100) - srv.serviceCfg.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.ChainStarted, - Data: &statefeed.ChainStartedData{StartTime: time.Now()}, - }) - time.Sleep(time.Millisecond * 100) - srv.attsSlotTicker = &slots.SlotTicker{} - srv.blocksSlotTicker = &slots.SlotTicker{} - srv.pruningSlotTicker = &slots.SlotTicker{} - require.NoError(t, srv.Stop()) - require.NoError(t, srv.Status()) - require.LogsContain(t, hook, "received chain start event") -} - -func TestService_StartStop_ChainAlreadyInitialized(t *testing.T) { +func TestService_StartStop_ChainInitialized(t *testing.T) { slasherDB := dbtest.SetupSlasherDB(t) hook := logTest.NewGlobal() beaconState, err := util.NewBeaconState() @@ -99,5 +62,5 @@ func TestService_StartStop_ChainAlreadyInitialized(t *testing.T) { srv.pruningSlotTicker = &slots.SlotTicker{} require.NoError(t, srv.Stop()) require.NoError(t, srv.Status()) - require.LogsContain(t, hook, "chain already initialized") + require.LogsContain(t, hook, "received chain initialization") } diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index ee963c0655..f0f6dc2639 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -48,7 +48,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync", visibility = [ "//beacon-chain:__subpackages__", - "//testing/fuzz:__pkg__", + "//testing:__subpackages__", ], deps = [ "//async:go_default_library", diff --git a/testing/endtoend/slasher_simulator_e2e_test.go b/testing/endtoend/slasher_simulator_e2e_test.go index 6ea5f6d0c8..62deb8106d 100644 --- a/testing/endtoend/slasher_simulator_e2e_test.go +++ b/testing/endtoend/slasher_simulator_e2e_test.go @@ -19,6 +19,32 @@ import ( logTest "github.com/sirupsen/logrus/hooks/test" ) +type mockSyncChecker struct{} + +func (c mockSyncChecker) Initialized() bool { + return true +} + +func (c mockSyncChecker) Syncing() bool { + return false +} + +func (c mockSyncChecker) Synced() bool { + return true +} + +func (c mockSyncChecker) Status() error { + return nil +} + +func (c mockSyncChecker) Resync() error { + return nil +} + +func (mockSyncChecker) IsSynced(_ context.Context) (bool, error) { + return true, nil +} + func TestEndToEnd_SlasherSimulator(t *testing.T) { hook := logTest.NewGlobal() ctx := context.Background() @@ -66,6 +92,7 @@ func TestEndToEnd_SlasherSimulator(t *testing.T) { StateGen: gen, PrivateKeysByValidatorIndex: privKeys, SlashingsPool: &slashings.PoolMock{}, + SyncChecker: mockSyncChecker{}, }) require.NoError(t, err) sim.Start() diff --git a/testing/slasher/simulator/BUILD.bazel b/testing/slasher/simulator/BUILD.bazel index 7837b9229f..3565c27463 100644 --- a/testing/slasher/simulator/BUILD.bazel +++ b/testing/slasher/simulator/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//beacon-chain/slasher:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/stategen:go_default_library", + "//beacon-chain/sync:go_default_library", "//config/params:go_default_library", "//crypto/bls:go_default_library", "//crypto/rand:go_default_library", diff --git a/testing/slasher/simulator/attestation_generator.go b/testing/slasher/simulator/attestation_generator.go index bfc6e00b9d..5ea4b6b81c 100644 --- a/testing/slasher/simulator/attestation_generator.go +++ b/testing/slasher/simulator/attestation_generator.go @@ -29,10 +29,10 @@ func (s *Simulator) generateAttestationsForSlot( (committeesPerSlot * uint64(s.srvConfig.Params.SlotsPerEpoch)) valsPerSlot := committeesPerSlot * valsPerCommittee - var sourceEpoch types.Epoch = 0 - if currentEpoch != 0 { - sourceEpoch = currentEpoch - 1 + if currentEpoch < 2 { + return nil, nil, nil } + sourceEpoch := currentEpoch - 1 var slashedIndices []uint64 startIdx := valsPerSlot * uint64(slot%s.srvConfig.Params.SlotsPerEpoch) diff --git a/testing/slasher/simulator/simulator.go b/testing/slasher/simulator/simulator.go index a94152e0a1..85c217e4ff 100644 --- a/testing/slasher/simulator/simulator.go +++ b/testing/slasher/simulator/simulator.go @@ -2,6 +2,7 @@ package simulator import ( "context" + "fmt" "time" types "github.com/prysmaticlabs/eth2-types" @@ -13,6 +14,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings" "github.com/prysmaticlabs/prysm/beacon-chain/slasher" "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" + "github.com/prysmaticlabs/prysm/beacon-chain/sync" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/crypto/bls" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" @@ -32,6 +34,7 @@ type ServiceConfig struct { StateGen stategen.StateManager SlashingsPool slashings.PoolManager PrivateKeysByValidatorIndex map[types.ValidatorIndex]bls.SecretKey + SyncChecker sync.Checker } // Parameters for a slasher simulator. @@ -90,6 +93,7 @@ func New(ctx context.Context, srvConfig *ServiceConfig) (*Simulator, error) { AttestationStateFetcher: srvConfig.AttestationStateFetcher, StateGen: srvConfig.StateGen, SlashingPoolInserter: srvConfig.SlashingsPool, + SyncChecker: srvConfig.SyncChecker, }) if err != nil { return nil, err @@ -132,8 +136,8 @@ func (s *Simulator) Start() { time.Sleep(time.Second) s.genesisTime = time.Now() s.srvConfig.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.ChainStarted, - Data: &statefeed.ChainStartedData{StartTime: s.genesisTime}, + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{StartTime: s.genesisTime}, }) // We simulate blocks and attestations for N epochs. @@ -256,10 +260,12 @@ func (s *Simulator) verifySlashingsWereDetected(ctx context.Context) { for slashingRoot, slashing := range s.sentAttesterSlashings { if _, ok := detectedAttesterSlashings[slashingRoot]; !ok { log.WithFields(logrus.Fields{ - "targetEpoch": slashing.Attestation_1.Data.Target.Epoch, - "prevTargetEpoch": slashing.Attestation_2.Data.Target.Epoch, - "sourceEpoch": slashing.Attestation_1.Data.Source.Epoch, - "prevSourceEpoch": slashing.Attestation_2.Data.Source.Epoch, + "targetEpoch": slashing.Attestation_1.Data.Target.Epoch, + "prevTargetEpoch": slashing.Attestation_2.Data.Target.Epoch, + "sourceEpoch": slashing.Attestation_1.Data.Source.Epoch, + "prevSourceEpoch": slashing.Attestation_2.Data.Source.Epoch, + "prevBeaconBlockRoot": fmt.Sprintf("%#x", slashing.Attestation_1.Data.BeaconBlockRoot), + "newBeaconBlockRoot": fmt.Sprintf("%#x", slashing.Attestation_2.Data.BeaconBlockRoot), }).Errorf("Did not detect simulated attester slashing") continue }