diff --git a/beacon-chain/slasher/BUILD.bazel b/beacon-chain/slasher/BUILD.bazel index e6a3bc7739..faee32660d 100644 --- a/beacon-chain/slasher/BUILD.bazel +++ b/beacon-chain/slasher/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "chunks.go", + "detect_attestations.go", "doc.go", "helpers.go", "metrics.go", @@ -30,12 +31,14 @@ go_library( "//container/slice:go_default_library", "//encoding/bytesutil:go_default_library", "//proto/prysm/v1alpha1:go_default_library", + "//time/slots:go_default_library", "@com_github_ferranbt_fastssz//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_prysmaticlabs_eth2_types//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", + "@io_opencensus_go//trace:go_default_library", ], ) @@ -43,6 +46,7 @@ go_test( name = "go_default_test", srcs = [ "chunks_test.go", + "detect_attestations_test.go", "helpers_test.go", "params_test.go", "process_slashings_test.go", @@ -53,6 +57,7 @@ go_test( deps = [ "//async/event:go_default_library", "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/core:go_default_library", "//beacon-chain/core/signing:go_default_library", "//beacon-chain/db/testing:go_default_library", "//beacon-chain/operations/slashings:go_default_library", diff --git a/beacon-chain/slasher/detect_attestations.go b/beacon-chain/slasher/detect_attestations.go new file mode 100644 index 0000000000..9746b32bee --- /dev/null +++ b/beacon-chain/slasher/detect_attestations.go @@ -0,0 +1,474 @@ +package slasher + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + types "github.com/prysmaticlabs/eth2-types" + slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types" + ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/time/slots" + "go.opencensus.io/trace" +) + +// Takes in a list of indexed attestation wrappers and returns any +// found attester slashings to the caller. +func (s *Service) checkSlashableAttestations( + ctx context.Context, atts []*slashertypes.IndexedAttestationWrapper, +) ([]*ethpb.AttesterSlashing, error) { + currentEpoch := slots.EpochsSinceGenesis(s.genesisTime) + slashings := make([]*ethpb.AttesterSlashing, 0) + indices := make([]types.ValidatorIndex, 0) + + // TODO(#8331): Consider using goroutines and wait groups here. + groupedAtts := s.groupByValidatorChunkIndex(atts) + for validatorChunkIdx, batch := range groupedAtts { + attSlashings, err := s.detectAllAttesterSlashings(ctx, &chunkUpdateArgs{ + validatorChunkIndex: validatorChunkIdx, + currentEpoch: currentEpoch, + }, batch) + + slashings = append(slashings, attSlashings...) + if err != nil { + return nil, errors.Wrap(err, "Could not detect slashable attestations") + } + indices = append(indices, s.params.validatorIndicesInChunk(validatorChunkIdx)...) + } + if err := s.serviceCfg.Database.SaveLastEpochWrittenForValidators(ctx, indices, currentEpoch); err != nil { + return nil, err + } + return slashings, nil +} + +// Given a list of attestations all corresponding to a validator chunk index as well +// as the current epoch in time, we perform slashing detection. +// The process is as follows given a list of attestations: +// +// 1. Check for attester double votes using the list of attestations. +// 2. Group the attestations by chunk index. +// 3. Update the min and max spans for those grouped attestations, check if any slashings are +// found in the process +// 4. Update the latest written epoch for all validators involved to the current epoch. +// +// This function performs a lot of critical actions and is split into smaller helpers for cleanliness. +func (s *Service) detectAllAttesterSlashings( + ctx context.Context, + args *chunkUpdateArgs, + attestations []*slashertypes.IndexedAttestationWrapper, +) ([]*ethpb.AttesterSlashing, error) { + // Check for double votes. + doubleVoteSlashings, err := s.checkDoubleVotes(ctx, attestations) + if err != nil { + return nil, errors.Wrap(err, "could not check slashable double votes") + } + + // Group attestations by chunk index. + groupedAtts := s.groupByChunkIndex(attestations) + + // Update min and max spans and retrieve any detected slashable offenses. + surroundingSlashings, err := s.updateSpans(ctx, &chunkUpdateArgs{ + kind: slashertypes.MinSpan, + validatorChunkIndex: args.validatorChunkIndex, + currentEpoch: args.currentEpoch, + }, groupedAtts) + if err != nil { + return nil, errors.Wrapf( + err, + "could not update min attestation spans for validator chunk index %d", + args.validatorChunkIndex, + ) + } + + surroundedSlashings, err := s.updateSpans(ctx, &chunkUpdateArgs{ + kind: slashertypes.MaxSpan, + validatorChunkIndex: args.validatorChunkIndex, + currentEpoch: args.currentEpoch, + }, groupedAtts) + if err != nil { + return nil, errors.Wrapf( + err, + "could not update max attestation spans for validator chunk index %d", + args.validatorChunkIndex, + ) + } + + // Consolidate all slashings into a slice. + slashings := make([]*ethpb.AttesterSlashing, 0) + slashings = append(slashings, doubleVoteSlashings...) + slashings = append(slashings, surroundingSlashings...) + slashings = append(slashings, surroundedSlashings...) + if len(slashings) > 0 { + log.WithField("numSlashings", len(slashings)).Info("Slashable attestation offenses found") + } + return slashings, nil +} + +// Check for attester slashing double votes by looking at every single validator index +// in each attestation's attesting indices and checking if there already exist records for such +// attestation's target epoch. If so, we append a double vote slashing object to a list of slashings +// we return to the caller. +func (s *Service) checkDoubleVotes( + ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper, +) ([]*ethpb.AttesterSlashing, error) { + ctx, span := trace.StartSpan(ctx, "Slasher.checkDoubleVotes") + defer span.End() + // We check if there are any slashable double votes in the input list + // of attestations with respect to each other. + slashings := make([]*ethpb.AttesterSlashing, 0) + existingAtts := make(map[string]*slashertypes.IndexedAttestationWrapper) + for _, att := range attestations { + for _, valIdx := range att.IndexedAttestation.AttestingIndices { + key := uintToString(uint64(att.IndexedAttestation.Data.Target.Epoch)) + ":" + uintToString(valIdx) + existingAtt, ok := existingAtts[key] + if !ok { + existingAtts[key] = att + continue + } + if att.SigningRoot != existingAtt.SigningRoot { + doubleVotesTotal.Inc() + slashings = append(slashings, ðpb.AttesterSlashing{ + Attestation_1: existingAtt.IndexedAttestation, + Attestation_2: att.IndexedAttestation, + }) + } + } + } + + // We check if there are any slashable double votes in the input list + // of attestations with respect to our database. + moreSlashings, err := s.checkDoubleVotesOnDisk(ctx, attestations) + if err != nil { + return nil, errors.Wrap(err, "could not check attestation double votes on disk") + } + return append(slashings, moreSlashings...), nil +} + +// Check for double votes in our database given a list of incoming attestations. +func (s *Service) checkDoubleVotesOnDisk( + ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper, +) ([]*ethpb.AttesterSlashing, error) { + ctx, span := trace.StartSpan(ctx, "Slasher.checkDoubleVotesOnDisk") + defer span.End() + doubleVotes, err := s.serviceCfg.Database.CheckAttesterDoubleVotes( + ctx, attestations, + ) + if err != nil { + return nil, errors.Wrap(err, "could not retrieve potential double votes from disk") + } + doubleVoteSlashings := make([]*ethpb.AttesterSlashing, 0) + for _, doubleVote := range doubleVotes { + doubleVotesTotal.Inc() + doubleVoteSlashings = append(doubleVoteSlashings, ðpb.AttesterSlashing{ + Attestation_1: doubleVote.PrevAttestationWrapper.IndexedAttestation, + Attestation_2: doubleVote.AttestationWrapper.IndexedAttestation, + }) + } + return doubleVoteSlashings, nil +} + +// Updates spans and detects any slashable attester offenses along the way. +// 1. Determine the chunks we need to use for updating for the validator indices +// in a validator chunk index, then retrieve those chunks from the database. +// 2. Using the chunks from step (1): +// for every attestation by chunk index: +// for each validator in the attestation's attesting indices: +// - Check if the attestation is slashable, if so return a slashing object. +// 3. Save the updated chunks to disk. +func (s *Service) updateSpans( + ctx context.Context, + args *chunkUpdateArgs, + attestationsByChunkIdx map[uint64][]*slashertypes.IndexedAttestationWrapper, +) ([]*ethpb.AttesterSlashing, error) { + ctx, span := trace.StartSpan(ctx, "Slasher.updateSpans") + defer span.End() + // Determine the chunk indices we need to use for slashing detection. + validatorIndices := s.params.validatorIndicesInChunk(args.validatorChunkIndex) + chunkIndices, err := s.determineChunksToUpdateForValidators(ctx, args, validatorIndices) + if err != nil { + return nil, errors.Wrapf( + err, + "could not determine chunks to update for validator indices %v", + validatorIndices, + ) + } + // Load the required chunks from disk. + chunksByChunkIdx, err := s.loadChunks(ctx, args, chunkIndices) + if err != nil { + return nil, errors.Wrapf( + err, + "could not load chunks for chunk indices %v", + chunkIndices, + ) + } + + // Apply the attestations to the related chunks and find any + // slashings along the way. + slashings := make([]*ethpb.AttesterSlashing, 0) + for _, attestationBatch := range attestationsByChunkIdx { + for _, att := range attestationBatch { + for _, validatorIdx := range att.IndexedAttestation.AttestingIndices { + validatorIndex := types.ValidatorIndex(validatorIdx) + computedValidatorChunkIdx := s.params.validatorChunkIndex(validatorIndex) + + // Every validator chunk index represents a range of validators. + // If it possible that the validator index in this loop iteration is + // not part of the validator chunk index we are updating chunks for. + // + // For example, if there are 4 validators per validator chunk index, + // then validator chunk index 0 contains validator indices [0, 1, 2, 3] + // If we see an attestation with attesting indices [3, 4, 5] and we are updating + // chunks for validator chunk index 0, only validator index 3 should make + // it past this line. + if args.validatorChunkIndex != computedValidatorChunkIdx { + continue + } + slashing, err := s.applyAttestationForValidator( + ctx, + args, + validatorIndex, + chunksByChunkIdx, + att, + ) + if err != nil { + return nil, errors.Wrapf( + err, + "could not apply attestation for validator index %d", + validatorIndex, + ) + } + if slashing != nil { + slashings = append(slashings, slashing) + } + } + } + } + + // Write the updated chunks to disk. + return slashings, s.saveUpdatedChunks(ctx, args, chunksByChunkIdx) +} + +// For a list of validator indices, we retrieve their latest written epoch. Then, for each +// (validator, latest epoch written) pair, we determine the chunks we need to update and +// perform slashing detection with. +func (s *Service) determineChunksToUpdateForValidators( + ctx context.Context, + args *chunkUpdateArgs, + validatorIndices []types.ValidatorIndex, +) (chunkIndices []uint64, err error) { + ctx, span := trace.StartSpan(ctx, "Slasher.determineChunksToUpdateForValidators") + defer span.End() + lastCurrentEpochs, err := s.serviceCfg.Database.LastEpochWrittenForValidators(ctx, validatorIndices) + if err != nil { + err = errors.Wrap(err, "could not get latest epoch attested for validators") + return + } + + // Initialize the last epoch written for each validator to 0. + lastCurrentEpochByValidator := make(map[types.ValidatorIndex]types.Epoch, len(validatorIndices)) + for _, valIdx := range validatorIndices { + lastCurrentEpochByValidator[valIdx] = 0 + } + for _, lastEpoch := range lastCurrentEpochs { + lastCurrentEpochByValidator[lastEpoch.ValidatorIndex] = lastEpoch.Epoch + } + + // For every single validator and their last written current epoch, we determine + // the chunk indices we need to update based on all the chunks between the last + // epoch written and the current epoch, inclusive. + chunkIndicesToUpdate := make(map[uint64]bool) + + for _, epoch := range lastCurrentEpochByValidator { + latestEpochWritten := epoch + for latestEpochWritten <= args.currentEpoch { + chunkIdx := s.params.chunkIndex(latestEpochWritten) + chunkIndicesToUpdate[chunkIdx] = true + latestEpochWritten++ + } + } + chunkIndices = make([]uint64, 0, len(chunkIndicesToUpdate)) + for chunkIdx := range chunkIndicesToUpdate { + chunkIndices = append(chunkIndices, chunkIdx) + } + return +} + +// Checks if an incoming attestation is slashable based on the validator chunk it +// corresponds to. If a slashable offense is found, we return it to the caller. +// If not, then update every single chunk the attestation covers, starting from its +// source epoch up to its target. +func (s *Service) applyAttestationForValidator( + ctx context.Context, + args *chunkUpdateArgs, + validatorIndex types.ValidatorIndex, + chunksByChunkIdx map[uint64]Chunker, + attestation *slashertypes.IndexedAttestationWrapper, +) (*ethpb.AttesterSlashing, error) { + ctx, span := trace.StartSpan(ctx, "Slasher.applyAttestationForValidator") + defer span.End() + sourceEpoch := attestation.IndexedAttestation.Data.Source.Epoch + targetEpoch := attestation.IndexedAttestation.Data.Target.Epoch + + attestationDistance.Observe(float64(targetEpoch) - float64(sourceEpoch)) + + chunkIdx := s.params.chunkIndex(sourceEpoch) + chunk, err := s.getChunk(ctx, args, chunksByChunkIdx, chunkIdx) + if err != nil { + return nil, errors.Wrapf(err, "could not get chunk at index %d", chunkIdx) + } + + // Check slashable, if so, return the slashing. + slashing, err := chunk.CheckSlashable( + ctx, + s.serviceCfg.Database, + validatorIndex, + attestation, + ) + if err != nil { + return nil, errors.Wrapf( + err, + "could not check if attestation for validator index %d is slashable", + validatorIndex, + ) + } + if slashing != nil { + return slashing, nil + } + + // Get the first start epoch for the chunk. If it does not exist or + // is not possible based on the input arguments, do not continue with the update. + startEpoch, exists := chunk.StartEpoch(sourceEpoch, args.currentEpoch) + if !exists { + return nil, nil + } + + // Given a single attestation could span across multiple chunks + // for a validator min or max span, we attempt to update the current chunk + // for the source epoch of the attestation. If the update function tells + // us we need to proceed to the next chunk, we continue by determining + // the start epoch of the next chunk. We exit once no longer need to + // keep updating chunks. + for { + chunkIdx = s.params.chunkIndex(startEpoch) + chunk, err := s.getChunk(ctx, args, chunksByChunkIdx, chunkIdx) + if err != nil { + return nil, errors.Wrapf(err, "could not get chunk at index %d", chunkIdx) + } + keepGoing, err := chunk.Update( + &chunkUpdateArgs{ + chunkIndex: chunkIdx, + currentEpoch: args.currentEpoch, + }, + validatorIndex, + startEpoch, + targetEpoch, + ) + if err != nil { + return nil, errors.Wrapf( + err, + "could not update chunk at chunk index %d for validator index %d and current epoch %d", + chunkIdx, + validatorIndex, + args.currentEpoch, + ) + } + // We update the chunksByChunkIdx map with the chunk we just updated. + chunksByChunkIdx[chunkIdx] = chunk + if !keepGoing { + break + } + // Move to first epoch of next chunk if needed. + startEpoch = chunk.NextChunkStartEpoch(startEpoch) + } + return nil, nil +} + +// Retrieves a chunk at a chunk index from a map. If such chunk does not exist, which +// should be rare (occurring when we receive an attestation with source and target epochs +// that span multiple chunk indices), then we fallback to fetching from disk. +func (s *Service) getChunk( + ctx context.Context, + args *chunkUpdateArgs, + chunksByChunkIdx map[uint64]Chunker, + chunkIdx uint64, +) (Chunker, error) { + chunk, ok := chunksByChunkIdx[chunkIdx] + if ok { + return chunk, nil + } + // We can ensure we load the appropriate chunk we need by fetching from the DB. + diskChunks, err := s.loadChunks(ctx, args, []uint64{chunkIdx}) + if err != nil { + return nil, errors.Wrapf(err, "could not load chunk at index %d", chunkIdx) + } + if chunk, ok := diskChunks[chunkIdx]; ok { + return chunk, nil + } + return nil, fmt.Errorf("could not retrieve chunk at chunk index %d from disk", chunkIdx) +} + +// Load chunks for a specified list of chunk indices. We attempt to load it from the database. +// If the data exists, then we initialize a chunk of a specified kind. Otherwise, we create +// an empty chunk, add it to our map, and then return it to the caller. +func (s *Service) loadChunks( + ctx context.Context, + args *chunkUpdateArgs, + chunkIndices []uint64, +) (map[uint64]Chunker, error) { + ctx, span := trace.StartSpan(ctx, "Slasher.loadChunks") + defer span.End() + chunkKeys := make([][]byte, 0, len(chunkIndices)) + for _, chunkIdx := range chunkIndices { + chunkKeys = append(chunkKeys, s.params.flatSliceID(args.validatorChunkIndex, chunkIdx)) + } + rawChunks, chunksExist, err := s.serviceCfg.Database.LoadSlasherChunks(ctx, args.kind, chunkKeys) + if err != nil { + return nil, errors.Wrapf( + err, + "could not load slasher chunk index", + ) + } + chunksByChunkIdx := make(map[uint64]Chunker, len(rawChunks)) + for i := 0; i < len(rawChunks); i++ { + // If the chunk exists in the database, we initialize it from the raw bytes data. + // If it does not exist, we initialize an empty chunk. + var chunk Chunker + switch args.kind { + case slashertypes.MinSpan: + if chunksExist[i] { + chunk, err = MinChunkSpansSliceFrom(s.params, rawChunks[i]) + } else { + chunk = EmptyMinSpanChunksSlice(s.params) + } + case slashertypes.MaxSpan: + if chunksExist[i] { + chunk, err = MaxChunkSpansSliceFrom(s.params, rawChunks[i]) + } else { + chunk = EmptyMaxSpanChunksSlice(s.params) + } + } + if err != nil { + return nil, errors.Wrap(err, "could not initialize chunk") + } + chunksByChunkIdx[chunkIndices[i]] = chunk + } + return chunksByChunkIdx, nil +} + +// Saves updated chunks to disk given the required database schema. +func (s *Service) saveUpdatedChunks( + ctx context.Context, + args *chunkUpdateArgs, + updatedChunksByChunkIdx map[uint64]Chunker, +) error { + ctx, span := trace.StartSpan(ctx, "Slasher.saveUpdatedChunks") + defer span.End() + chunkKeys := make([][]byte, 0, len(updatedChunksByChunkIdx)) + chunks := make([][]uint16, 0, len(updatedChunksByChunkIdx)) + for chunkIdx, chunk := range updatedChunksByChunkIdx { + chunkKeys = append(chunkKeys, s.params.flatSliceID(args.validatorChunkIndex, chunkIdx)) + chunks = append(chunks, chunk.Chunk()) + } + chunksSavedTotal.Add(float64(len(chunks))) + return s.serviceCfg.Database.SaveSlasherChunks(ctx, args.kind, chunkKeys, chunks) +} diff --git a/beacon-chain/slasher/detect_attestations_test.go b/beacon-chain/slasher/detect_attestations_test.go new file mode 100644 index 0000000000..772bd247dc --- /dev/null +++ b/beacon-chain/slasher/detect_attestations_test.go @@ -0,0 +1,890 @@ +package slasher + +import ( + "context" + "fmt" + "sort" + "testing" + "time" + + types "github.com/prysmaticlabs/eth2-types" + mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/core" + "github.com/prysmaticlabs/prysm/beacon-chain/core/signing" + dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings" + slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types" + "github.com/prysmaticlabs/prysm/config/params" + "github.com/prysmaticlabs/prysm/crypto/bls" + ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/testing/assert" + "github.com/prysmaticlabs/prysm/testing/require" + "github.com/prysmaticlabs/prysm/testing/util" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func Test_processQueuedAttestations(t *testing.T) { + type args struct { + attestationQueue []*slashertypes.IndexedAttestationWrapper + currentEpoch types.Epoch + } + tests := []struct { + name string + args args + shouldNotBeSlashable bool + }{ + { + name: "Detects surrounding vote (source 1, target 2), (source 0, target 3)", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 1, 2, []uint64{0, 1}, nil), + createAttestationWrapper(t, 0, 3, []uint64{0, 1}, nil), + }, + currentEpoch: 4, + }, + }, + { + name: "Detects surrounding vote (source 50, target 51), (source 0, target 1000)", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 50, 51, []uint64{0}, nil), + createAttestationWrapper(t, 0, 1000, []uint64{0}, nil), + }, + currentEpoch: 1000, + }, + }, + { + name: "Detects surrounded vote (source 0, target 3), (source 1, target 2)", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 3, []uint64{0, 1}, nil), + createAttestationWrapper(t, 1, 2, []uint64{0, 1}, nil), + }, + currentEpoch: 4, + }, + }, + { + name: "Detects double vote, (source 1, target 2), (source 0, target 2)", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 1, 2, []uint64{0, 1}, nil), + createAttestationWrapper(t, 0, 2, []uint64{0, 1}, nil), + }, + currentEpoch: 4, + }, + }, + { + name: "Not slashable, surrounding but non-overlapping attesting indices within same validator chunk index", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 1, 2, []uint64{0}, nil), + createAttestationWrapper(t, 0, 3, []uint64{1}, nil), + }, + currentEpoch: 4, + }, + shouldNotBeSlashable: true, + }, + { + name: "Not slashable, surrounded but non-overlapping attesting indices within same validator chunk index", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 3, []uint64{0, 1}, nil), + createAttestationWrapper(t, 1, 2, []uint64{2, 3}, nil), + }, + currentEpoch: 4, + }, + shouldNotBeSlashable: true, + }, + { + name: "Not slashable, surrounding but non-overlapping attesting indices in different validator chunk index", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 3, []uint64{0}, nil), + createAttestationWrapper( + t, + 1, + 2, + []uint64{params.BeaconConfig().MinGenesisActiveValidatorCount - 1}, + nil, + ), + }, + currentEpoch: 4, + }, + shouldNotBeSlashable: true, + }, + { + name: "Not slashable, surrounded but non-overlapping attesting indices in different validator chunk index", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 3, []uint64{0}, nil), + createAttestationWrapper( + t, + 1, + 2, + []uint64{params.BeaconConfig().MinGenesisActiveValidatorCount - 1}, + nil, + ), + }, + currentEpoch: 4, + }, + shouldNotBeSlashable: true, + }, + { + name: "Not slashable, (source 1, target 2), (source 2, target 3)", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 1, 2, []uint64{0, 1}, nil), + createAttestationWrapper(t, 2, 3, []uint64{0, 1}, nil), + }, + currentEpoch: 4, + }, + shouldNotBeSlashable: true, + }, + { + name: "Not slashable, (source 0, target 3), (source 2, target 4)", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 3, []uint64{0, 1}, nil), + createAttestationWrapper(t, 2, 4, []uint64{0, 1}, nil), + }, + currentEpoch: 4, + }, + shouldNotBeSlashable: true, + }, + { + name: "Not slashable, (source 0, target 2), (source 0, target 3)", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 2, []uint64{0, 1}, nil), + createAttestationWrapper(t, 0, 3, []uint64{0, 1}, nil), + }, + currentEpoch: 4, + }, + shouldNotBeSlashable: true, + }, + { + name: "Not slashable, (source 0, target 3), (source 0, target 2)", + args: args{ + attestationQueue: []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 3, []uint64{0, 1}, nil), + createAttestationWrapper(t, 0, 2, []uint64{0, 1}, nil), + }, + currentEpoch: 4, + }, + shouldNotBeSlashable: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hook := logTest.NewGlobal() + defer hook.Reset() + slasherDB := dbtest.SetupSlasherDB(t) + ctx, cancel := context.WithCancel(context.Background()) + + currentTime := time.Now() + totalSlots := uint64(tt.args.currentEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch) + secondsSinceGenesis := time.Duration(totalSlots * params.BeaconConfig().SecondsPerSlot) + genesisTime := currentTime.Add(-secondsSinceGenesis * time.Second) + + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + slot, err := core.StartSlot(tt.args.currentEpoch) + require.NoError(t, err) + require.NoError(t, beaconState.SetSlot(slot)) + mockChain := &mock.ChainService{ + State: beaconState, + Slot: &slot, + } + + // Initialize validators in the state. + numVals := params.BeaconConfig().MinGenesisActiveValidatorCount + validators := make([]*ethpb.Validator, numVals) + privKeys := make([]bls.SecretKey, numVals) + for i := range validators { + privKey, err := bls.RandKey() + require.NoError(t, err) + privKeys[i] = privKey + validators[i] = ðpb.Validator{ + PublicKey: privKey.PublicKey().Marshal(), + WithdrawalCredentials: make([]byte, 32), + } + } + err = beaconState.SetValidators(validators) + require.NoError(t, err) + domain, err := signing.Domain( + beaconState.Fork(), + 0, + params.BeaconConfig().DomainBeaconAttester, + beaconState.GenesisValidatorRoot(), + ) + require.NoError(t, err) + + // Create valid signatures for all input attestations in the test. + for _, attestationWrapper := range tt.args.attestationQueue { + signingRoot, err := signing.ComputeSigningRoot(attestationWrapper.IndexedAttestation.Data, domain) + require.NoError(t, err) + attestingIndices := attestationWrapper.IndexedAttestation.AttestingIndices + sigs := make([]bls.Signature, len(attestingIndices)) + for i, validatorIndex := range attestingIndices { + privKey := privKeys[validatorIndex] + sigs[i] = privKey.Sign(signingRoot[:]) + } + attestationWrapper.IndexedAttestation.Signature = bls.AggregateSignatures(sigs).Marshal() + } + + s := &Service{ + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + HeadStateFetcher: mockChain, + AttestationStateFetcher: mockChain, + SlashingPoolInserter: &slashings.PoolMock{}, + }, + params: DefaultParams(), + attsQueue: newAttestationsQueue(), + genesisTime: genesisTime, + } + currentSlotChan := make(chan types.Slot) + exitChan := make(chan struct{}) + go func() { + s.processQueuedAttestations(ctx, currentSlotChan) + exitChan <- struct{}{} + }() + s.attsQueue.extend(tt.args.attestationQueue) + currentSlotChan <- slot + time.Sleep(time.Millisecond * 200) + cancel() + <-exitChan + if tt.shouldNotBeSlashable { + require.LogsDoNotContain(t, hook, "Attester slashing detected") + } else { + require.LogsContain(t, hook, "Attester slashing detected") + } + }) + } +} + +func Test_processQueuedAttestations_MultipleChunkIndices(t *testing.T) { + hook := logTest.NewGlobal() + defer hook.Reset() + + slasherDB := dbtest.SetupSlasherDB(t) + ctx, cancel := context.WithCancel(context.Background()) + slasherParams := DefaultParams() + + // We process submit attestations from chunk index 0 to chunk index 1. + // What we want to test here is if we can proceed + // with processing queued attestations once the chunk index changes. + // For example, epochs 0 - 15 are chunk 0, epochs 16 - 31 are chunk 1, etc. + startEpoch := types.Epoch(slasherParams.chunkSize) + endEpoch := types.Epoch(slasherParams.chunkSize + 1) + + currentTime := time.Now() + totalSlots := uint64(startEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch) + secondsSinceGenesis := time.Duration(totalSlots * params.BeaconConfig().SecondsPerSlot) + genesisTime := currentTime.Add(-secondsSinceGenesis * time.Second) + + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + mockChain := &mock.ChainService{ + State: beaconState, + } + + s := &Service{ + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + HeadStateFetcher: mockChain, + AttestationStateFetcher: mockChain, + SlashingPoolInserter: &slashings.PoolMock{}, + }, + params: slasherParams, + attsQueue: newAttestationsQueue(), + genesisTime: genesisTime, + } + currentSlotChan := make(chan types.Slot) + exitChan := make(chan struct{}) + go func() { + s.processQueuedAttestations(ctx, currentSlotChan) + exitChan <- struct{}{} + }() + + for i := startEpoch; i <= endEpoch; i++ { + source := types.Epoch(0) + target := types.Epoch(0) + if i != 0 { + source = i - 1 + target = i + } + var sr [32]byte + copy(sr[:], fmt.Sprintf("%d", i)) + att := createAttestationWrapper(t, source, target, []uint64{0}, sr[:]) + s.attsQueue = newAttestationsQueue() + s.attsQueue.push(att) + slot, err := core.StartSlot(i) + require.NoError(t, err) + require.NoError(t, mockChain.State.SetSlot(slot)) + s.serviceCfg.HeadStateFetcher = mockChain + currentSlotChan <- slot + } + + time.Sleep(time.Millisecond * 200) + cancel() + <-exitChan + require.LogsDoNotContain(t, hook, "Slashable offenses found") + require.LogsDoNotContain(t, hook, "Could not detect") +} + +func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) { + hook := logTest.NewGlobal() + defer hook.Reset() + + slasherDB := dbtest.SetupSlasherDB(t) + ctx, cancel := context.WithCancel(context.Background()) + slasherParams := DefaultParams() + + startEpoch := types.Epoch(slasherParams.chunkSize) + + currentTime := time.Now() + totalSlots := uint64(startEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch) + secondsSinceGenesis := time.Duration(totalSlots * params.BeaconConfig().SecondsPerSlot) + genesisTime := currentTime.Add(-secondsSinceGenesis * time.Second) + + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + mockChain := &mock.ChainService{ + State: beaconState, + } + + s := &Service{ + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + HeadStateFetcher: mockChain, + AttestationStateFetcher: mockChain, + SlashingPoolInserter: &slashings.PoolMock{}, + }, + params: slasherParams, + attsQueue: newAttestationsQueue(), + genesisTime: genesisTime, + } + currentSlotChan := make(chan types.Slot) + exitChan := make(chan struct{}) + go func() { + s.processQueuedAttestations(ctx, currentSlotChan) + exitChan <- struct{}{} + }() + + // We create two attestations fully spanning chunk indices 0 and chunk 1 + att1 := createAttestationWrapper(t, types.Epoch(slasherParams.chunkSize-2), types.Epoch(slasherParams.chunkSize), []uint64{0, 1}, nil) + att2 := createAttestationWrapper(t, types.Epoch(slasherParams.chunkSize-1), types.Epoch(slasherParams.chunkSize+1), []uint64{0, 1}, nil) + + // We attempt to process the batch. + s.attsQueue = newAttestationsQueue() + s.attsQueue.push(att1) + s.attsQueue.push(att2) + slot, err := core.StartSlot(att2.IndexedAttestation.Data.Target.Epoch) + require.NoError(t, err) + mockChain.Slot = &slot + s.serviceCfg.HeadStateFetcher = mockChain + currentSlotChan <- slot + + time.Sleep(time.Millisecond * 200) + cancel() + <-exitChan + require.LogsDoNotContain(t, hook, "Slashable offenses found") + require.LogsDoNotContain(t, hook, "Could not detect") +} + +func Test_determineChunksToUpdateForValidators_FromLatestWrittenEpoch(t *testing.T) { + slasherDB := dbtest.SetupSlasherDB(t) + ctx := context.Background() + + // Check if the chunk at chunk index already exists in-memory. + s := &Service{ + params: &Parameters{ + chunkSize: 2, // 2 epochs in a chunk. + validatorChunkSize: 2, // 2 validators in a chunk. + historyLength: 4, + }, + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + }, + } + validators := []types.ValidatorIndex{ + 1, 2, + } + currentEpoch := types.Epoch(3) + + // Set the latest written epoch for validators to current epoch - 1. + latestWrittenEpoch := currentEpoch - 1 + err := slasherDB.SaveLastEpochWrittenForValidators(ctx, validators, latestWrittenEpoch) + require.NoError(t, err) + + // Because the validators have no recorded latest epoch written in the database, + // Because the latest written epoch for the input validators is == 2, we expect + // that we will update all epochs from 2 up to 3 (the current epoch). This is all + // safe contained in chunk index 1. + chunkIndices, err := s.determineChunksToUpdateForValidators( + ctx, + &chunkUpdateArgs{ + currentEpoch: currentEpoch, + }, + validators, + ) + require.NoError(t, err) + require.DeepEqual(t, []uint64{1}, chunkIndices) +} + +func Test_determineChunksToUpdateForValidators_FromGenesis(t *testing.T) { + slasherDB := dbtest.SetupSlasherDB(t) + ctx := context.Background() + + // Check if the chunk at chunk index already exists in-memory. + s := &Service{ + params: &Parameters{ + chunkSize: 2, // 2 epochs in a chunk. + validatorChunkSize: 2, // 2 validators in a chunk. + historyLength: 4, + }, + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + }, + } + validators := []types.ValidatorIndex{ + 1, 2, + } + // Because the validators have no recorded latest epoch written in the database, + // we expect that we will update all epochs from genesis up to the current epoch. + // Given the chunk size is 2 epochs per chunk, updating with current epoch == 3 + // will mean that we should be updating from epoch 0 to 3, meaning chunk indices 0 and 1. + chunkIndices, err := s.determineChunksToUpdateForValidators( + ctx, + &chunkUpdateArgs{ + currentEpoch: 3, + }, + validators, + ) + require.NoError(t, err) + sort.Slice(chunkIndices, func(i, j int) bool { + return chunkIndices[i] < chunkIndices[j] + }) + require.DeepEqual(t, []uint64{0, 1}, chunkIndices) +} + +func Test_applyAttestationForValidator_MinSpanChunk(t *testing.T) { + ctx := context.Background() + slasherDB := dbtest.SetupSlasherDB(t) + params := DefaultParams() + srv := &Service{ + params: params, + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + }, + } + // We initialize an empty chunks slice. + chunk := EmptyMinSpanChunksSlice(params) + chunkIdx := uint64(0) + currentEpoch := types.Epoch(3) + validatorIdx := types.ValidatorIndex(0) + args := &chunkUpdateArgs{ + chunkIndex: chunkIdx, + currentEpoch: currentEpoch, + } + chunksByChunkIdx := map[uint64]Chunker{ + chunkIdx: chunk, + } + + // We apply attestation with (source 1, target 2) for our validator. + source := types.Epoch(1) + target := types.Epoch(2) + att := createAttestationWrapper(t, source, target, nil, nil) + slashing, err := srv.applyAttestationForValidator( + ctx, + args, + validatorIdx, + chunksByChunkIdx, + att, + ) + require.NoError(t, err) + require.Equal(t, true, slashing == nil) + att.IndexedAttestation.AttestingIndices = []uint64{uint64(validatorIdx)} + err = slasherDB.SaveAttestationRecordsForValidators( + ctx, + []*slashertypes.IndexedAttestationWrapper{att}, + ) + require.NoError(t, err) + + // Next, we apply an attestation with (source 0, target 3) and + // expect a slashable offense to be returned. + source = types.Epoch(0) + target = types.Epoch(3) + slashableAtt := createAttestationWrapper(t, source, target, nil, nil) + slashing, err = srv.applyAttestationForValidator( + ctx, + args, + validatorIdx, + chunksByChunkIdx, + slashableAtt, + ) + require.NoError(t, err) + require.NotNil(t, slashing) +} + +func Test_applyAttestationForValidator_MaxSpanChunk(t *testing.T) { + ctx := context.Background() + slasherDB := dbtest.SetupSlasherDB(t) + params := DefaultParams() + srv := &Service{ + params: params, + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + }, + } + // We initialize an empty chunks slice. + chunk := EmptyMaxSpanChunksSlice(params) + chunkIdx := uint64(0) + currentEpoch := types.Epoch(3) + validatorIdx := types.ValidatorIndex(0) + args := &chunkUpdateArgs{ + chunkIndex: chunkIdx, + currentEpoch: currentEpoch, + } + chunksByChunkIdx := map[uint64]Chunker{ + chunkIdx: chunk, + } + + // We apply attestation with (source 0, target 3) for our validator. + source := types.Epoch(0) + target := types.Epoch(3) + att := createAttestationWrapper(t, source, target, nil, nil) + slashing, err := srv.applyAttestationForValidator( + ctx, + args, + validatorIdx, + chunksByChunkIdx, + att, + ) + require.NoError(t, err) + require.Equal(t, true, slashing == nil) + att.IndexedAttestation.AttestingIndices = []uint64{uint64(validatorIdx)} + err = slasherDB.SaveAttestationRecordsForValidators( + ctx, + []*slashertypes.IndexedAttestationWrapper{att}, + ) + require.NoError(t, err) + + // Next, we apply an attestation with (source 1, target 2) and + // expect a slashable offense to be returned. + source = types.Epoch(1) + target = types.Epoch(2) + slashableAtt := createAttestationWrapper(t, source, target, nil, nil) + slashing, err = srv.applyAttestationForValidator( + ctx, + args, + validatorIdx, + chunksByChunkIdx, + slashableAtt, + ) + require.NoError(t, err) + require.NotNil(t, slashing) +} + +func Test_checkDoubleVotes_SlashableInputAttestations(t *testing.T) { + slasherDB := dbtest.SetupSlasherDB(t) + ctx := context.Background() + // For a list of input attestations, check that we can + // indeed check there could exist a double vote offense + // within the list with respect to other entries in the list. + atts := []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 1, []uint64{1, 2}, []byte{1}), + createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1}), + createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2}), // Different signing root. + } + srv := &Service{ + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + }, + params: DefaultParams(), + } + prev1 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1}) + cur1 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2}) + prev2 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1}) + cur2 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2}) + wanted := []*ethpb.AttesterSlashing{ + { + Attestation_1: prev1.IndexedAttestation, + Attestation_2: cur1.IndexedAttestation, + }, + { + Attestation_1: prev2.IndexedAttestation, + Attestation_2: cur2.IndexedAttestation, + }, + } + slashings, err := srv.checkDoubleVotes(ctx, atts) + require.NoError(t, err) + require.DeepEqual(t, wanted, slashings) +} + +func Test_checkDoubleVotes_SlashableAttestationsOnDisk(t *testing.T) { + slasherDB := dbtest.SetupSlasherDB(t) + ctx := context.Background() + // For a list of input attestations, check that we can + // indeed check there could exist a double vote offense + // within the list with respect to previous entries in the db. + prevAtts := []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 1, []uint64{1, 2}, []byte{1}), + createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1}), + } + srv := &Service{ + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + }, + params: DefaultParams(), + } + err := slasherDB.SaveAttestationRecordsForValidators(ctx, prevAtts) + require.NoError(t, err) + + prev1 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1}) + cur1 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2}) + prev2 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1}) + cur2 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2}) + wanted := []*ethpb.AttesterSlashing{ + { + Attestation_1: prev1.IndexedAttestation, + Attestation_2: cur1.IndexedAttestation, + }, + { + Attestation_1: prev2.IndexedAttestation, + Attestation_2: cur2.IndexedAttestation, + }, + } + newAtts := []*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2}), // Different signing root. + } + slashings, err := srv.checkDoubleVotes(ctx, newAtts) + require.NoError(t, err) + require.DeepEqual(t, wanted, slashings) +} + +func Test_loadChunks_MinSpans(t *testing.T) { + testLoadChunks(t, slashertypes.MinSpan) +} + +func Test_loadChunks_MaxSpans(t *testing.T) { + testLoadChunks(t, slashertypes.MaxSpan) +} + +func testLoadChunks(t *testing.T, kind slashertypes.ChunkKind) { + slasherDB := dbtest.SetupSlasherDB(t) + ctx := context.Background() + + // Check if the chunk at chunk index already exists in-memory. + params := DefaultParams() + s := &Service{ + params: DefaultParams(), + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + }, + } + // If a chunk at a chunk index does not exist, ensure it + // is initialized as an empty chunk. + var emptyChunk Chunker + if kind == slashertypes.MinSpan { + emptyChunk = EmptyMinSpanChunksSlice(params) + } else { + emptyChunk = EmptyMaxSpanChunksSlice(params) + } + chunkIdx := uint64(2) + received, err := s.loadChunks(ctx, &chunkUpdateArgs{ + validatorChunkIndex: 0, + kind: kind, + }, []uint64{chunkIdx}) + require.NoError(t, err) + wanted := map[uint64]Chunker{ + chunkIdx: emptyChunk, + } + require.DeepEqual(t, wanted, received) + + // Save chunks to disk, then load them properly from disk. + var existingChunk Chunker + if kind == slashertypes.MinSpan { + existingChunk = EmptyMinSpanChunksSlice(params) + } else { + existingChunk = EmptyMaxSpanChunksSlice(params) + } + validatorIdx := types.ValidatorIndex(0) + epochInChunk := types.Epoch(0) + targetEpoch := types.Epoch(2) + err = setChunkDataAtEpoch( + params, + existingChunk.Chunk(), + validatorIdx, + epochInChunk, + targetEpoch, + ) + require.NoError(t, err) + require.DeepNotEqual(t, existingChunk, emptyChunk) + + updatedChunks := map[uint64]Chunker{ + 2: existingChunk, + 4: existingChunk, + 6: existingChunk, + } + err = s.saveUpdatedChunks( + ctx, + &chunkUpdateArgs{ + validatorChunkIndex: 0, + kind: kind, + }, + updatedChunks, + ) + require.NoError(t, err) + // Check if the retrieved chunks match what we just saved to disk. + received, err = s.loadChunks(ctx, &chunkUpdateArgs{ + validatorChunkIndex: 0, + kind: kind, + }, []uint64{2, 4, 6}) + require.NoError(t, err) + require.DeepEqual(t, updatedChunks, received) +} + +func TestService_processQueuedAttestations(t *testing.T) { + hook := logTest.NewGlobal() + slasherDB := dbtest.SetupSlasherDB(t) + + beaconState, err := util.NewBeaconState() + require.NoError(t, err) + slot, err := core.StartSlot(1) + require.NoError(t, err) + require.NoError(t, beaconState.SetSlot(slot)) + mockChain := &mock.ChainService{ + State: beaconState, + Slot: &slot, + } + + s := &Service{ + params: DefaultParams(), + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + HeadStateFetcher: mockChain, + }, + attsQueue: newAttestationsQueue(), + } + + s.attsQueue.extend([]*slashertypes.IndexedAttestationWrapper{ + createAttestationWrapper(t, 0, 1, []uint64{0, 1} /* indices */, nil /* signingRoot */), + }) + ctx, cancel := context.WithCancel(context.Background()) + tickerChan := make(chan types.Slot) + exitChan := make(chan struct{}) + go func() { + s.processQueuedAttestations(ctx, tickerChan) + exitChan <- struct{}{} + }() + + // Send a value over the ticker. + tickerChan <- 1 + cancel() + <-exitChan + assert.LogsContain(t, hook, "New slot, processing queued") +} + +func BenchmarkCheckSlashableAttestations(b *testing.B) { + slasherDB := dbtest.SetupSlasherDB(b) + + beaconState, err := util.NewBeaconState() + require.NoError(b, err) + slot := types.Slot(0) + mockChain := &mock.ChainService{ + State: beaconState, + Slot: &slot, + } + + s := &Service{ + params: DefaultParams(), + serviceCfg: &ServiceConfig{ + Database: slasherDB, + StateNotifier: &mock.MockStateNotifier{}, + HeadStateFetcher: mockChain, + }, + attsQueue: newAttestationsQueue(), + } + + b.Run("1 attestation 1 validator", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 1, 1 /* validator */) + }) + b.Run("1 attestation 100 validators", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 1, 100 /* validator */) + }) + b.Run("1 attestation 1000 validators", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 1, 1000 /* validator */) + }) + + b.Run("100 attestations 1 validator", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 100, 1 /* validator */) + }) + b.Run("100 attestations 100 validators", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 100, 100 /* validator */) + }) + b.Run("100 attestations 1000 validators", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 100, 1000 /* validator */) + }) + + b.Run("1000 attestations 1 validator", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 1000, 1 /* validator */) + }) + b.Run("1000 attestations 100 validators", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 1000, 100 /* validator */) + }) + b.Run("1000 attestations 1000 validators", func(b *testing.B) { + b.ResetTimer() + runAttestationsBenchmark(b, s, 1000, 1000 /* validator */) + }) +} + +func runAttestationsBenchmark(b *testing.B, s *Service, numAtts, numValidators uint64) { + indices := make([]uint64, numValidators) + for i := uint64(0); i < numValidators; i++ { + indices[i] = i + } + atts := make([]*slashertypes.IndexedAttestationWrapper, numAtts) + for i := uint64(0); i < numAtts; i++ { + source := types.Epoch(i) + target := types.Epoch(i + 1) + signingRoot := [32]byte{} + copy(signingRoot[:], fmt.Sprintf("%d", i)) + atts[i] = createAttestationWrapper( + b, + source, + target, /* target */ + indices, /* indices */ + signingRoot[:], /* signingRoot */ + ) + } + for i := 0; i < b.N; i++ { + numEpochs := numAtts + totalSeconds := numEpochs * uint64(params.BeaconConfig().SlotsPerEpoch) * params.BeaconConfig().SecondsPerSlot + genesisTime := time.Now().Add(-time.Second * time.Duration(totalSeconds)) + s.genesisTime = genesisTime + + _, err := s.checkSlashableAttestations(context.Background(), atts) + require.NoError(b, err) + } +} diff --git a/beacon-chain/slasher/receive.go b/beacon-chain/slasher/receive.go index b6b1bcb816..def436d990 100644 --- a/beacon-chain/slasher/receive.go +++ b/beacon-chain/slasher/receive.go @@ -110,8 +110,11 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch } // Check for slashings. - // TODO(#8331): Detect slashings. - slashings := make([]*ethpb.AttesterSlashing, 0) + slashings, err := s.checkSlashableAttestations(ctx, validAtts) + if err != nil { + log.WithError(err).Error("Could not detect slashable attestations") + continue + } // Process attester slashings by verifying their signatures, submitting // to the beacon node's operations pool, and logging them. diff --git a/beacon-chain/slasher/service.go b/beacon-chain/slasher/service.go index 066fc34fdf..8b7b2953b2 100644 --- a/beacon-chain/slasher/service.go +++ b/beacon-chain/slasher/service.go @@ -1,6 +1,8 @@ package slasher import ( + "time" + "github.com/prysmaticlabs/prysm/async/event" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" @@ -26,8 +28,9 @@ type ServiceConfig struct { // Service for running slasher mode in a beacon node. type Service struct { - params *Parameters - serviceCfg *ServiceConfig - blksQueue *blocksQueue - attsQueue *attestationsQueue + params *Parameters + serviceCfg *ServiceConfig + blksQueue *blocksQueue + attsQueue *attestationsQueue + genesisTime time.Time }