mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-08 23:18:15 -05:00
Beacon node slasher improvement (#13549)
* Slasher: Ensure all gorouting are stopped before running `Stop` actions. Fixes #13550. In tests, `exitChan` are now useless since waitgroup are used to wait for all goroutines to be stopped. * `slasher.go`: Add comments and rename some variables. - NFC * `detect_blocks.go`: Improve. - NFC - Rename some variables. - Add comments. - Use second element of `range` when possible. * `chunks.go`: Remove `_`receivers. - NFC * `validateAttestationIntegrity`: Improve documentation. - NFC * `filterAttestations`: Avoid `else`and rename variable. - NFC * `slasher.go`: Fix and add comments. * `SaveAttestationRecordsForValidators`: Remove unused code. * `LastEpochWrittenForValidators`: Name variables consistently. - NFC Avoid mixes between `indice(s)`and `index(es)`. * `SaveLastEpochsWrittenForValidators`: Name variables consistently. - NFC * `CheckAttesterDoubleVotes`: Rename variables and add comments. - NFC * `schema.go`: Add comments. - NFC * `processQueuedAttestations`: Add comments. - NFC * `checkDoubleVotes`: Rename variable. - NFC * `Test_processQueuedAttestations`: Ensure there is no error log. * `shouldNotBeSlashable` => `shouldBeSlashable` * `Test_processQueuedAttestations`: Add 2 test cases: - Same target with different signing roots - Same target with same signing roots * `checkDoubleVotesOnDisk` ==> `checkDoubleVotes`. Before this commit, `checkDoubleVotes` did two tasks: - Checking if there are any slashable double votes in the input list of attestations with respect to each other. - Checking if there are any slashable double votes in the input list of attestations with respect to our database. However, `checkDoubleVotes` is called only in `checkSlashableAttestations`. And `checkSlashableAttestations` is called only in: - `processQueuedAttestations`, and in - `IsSlashableAttestation` Study of case `processQueuedAttestations`: --------------------------------------------- In `processQueuedAttestations`, `checkSlashableAttestations` is ALWAYS called after `Database.SaveAttestationRecordsForValidators`. It means that, when calling `checkSlashableAttestations`, `validAtts` are ALREADY stored in the DB. Each attestation of `validAtts` will be checked twice: - Against the other attestations of `validAtts` (the portion of deleted code) - Against the content of the database. One of those two checks is redundent. ==> We can remove the check against other attestations in `validAtts`. Study of case `Database.SaveAttestationRecordsForValidators`: ---------------------------------------------------------------- In `Database.SaveAttestationRecordsForValidators`, `checkSlashableAttestations` is ALWAYS called with a list of attestations containing only ONE attestation. This only attestaion will be checked twice: - Against itself, and an attestation cannot conflict with itself. - Against the content of the database. ==> We can remove the check against other attestations in `validAtts`. ========================= In both cases, we showed that we can remove the check of attestation against the content of `validAtts`, and the corresponding test `Test_checkDoubleVotes_SlashableInputAttestations`. * `Test_processQueuedBlocks_DetectsDoubleProposals`: Wrap proposals. So we can add new proposals later. * Fix slasher multiple proposals false negative. If a first batch of blocks is sent with: - validator 1 - slot 4 - signing root 1 - validator 1 - slot 5 - signing root 1 Then, if a second batch of blocks is sent with: - validator 1 - slot 4 - signing root 2 Because we have two blocks proposed by the same validator (1) and for the same slot (4), but with two different signing roots (1 and 2), the validator 1 should be slashed. This is not the case before this commit. A new test case has been added as well to check this. Fixes #13551 * `params.go`: Change comments. - NFC * `CheckSlashable`: Keep the happy path without indentation. * `detectAllAttesterSlashings` => `checkSurrounds`. * Update beacon-chain/db/slasherkv/slasher.go Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com> * Update beacon-chain/db/slasherkv/slasher.go Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com> * `CheckAttesterDoubleVotes`: Keep happy path without indentation. Well, even if, in our case, "happy path" mean slashing. * 'SaveAttestationRecordsForValidators': Save the first attestation. In case of multiple votes, arbitrarily save the first attestation. Saving the first one in particular has no functional impact, since in any case all attestations will be tested against the content of the database. So all but the first one will be detected as slashable. However, saving the first one and not an other one let us not to modify the end to end tests, since they expect the first one to be saved in the database. * Rename `min` => `minimum`. Not to conflict with the new `min` built-in function. * `couldNotSaveSlashableAtt` ==> `couldNotCheckSlashableAtt` --------- Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
This commit is contained in:
@@ -7,10 +7,21 @@ package slasherkv
|
||||
// it easy to scan for keys that have a certain shard number as a prefix and return those
|
||||
// corresponding attestations.
|
||||
var (
|
||||
// Slasher buckets.
|
||||
attestedEpochsByValidator = []byte("attested-epochs-by-validator")
|
||||
attestationRecordsBucket = []byte("attestation-records")
|
||||
|
||||
// key: (encoded) ValidatorIndex
|
||||
// value: (encoded) Epoch
|
||||
attestedEpochsByValidator = []byte("attested-epochs-by-validator")
|
||||
|
||||
// key: attestation SigningRoot
|
||||
// value: (encoded + compressed) IndexedAttestation
|
||||
attestationRecordsBucket = []byte("attestation-records")
|
||||
|
||||
// key: (encoded) Target Epoch + (encoded) ValidatorIndex
|
||||
// value: attestation SigningRoot
|
||||
attestationDataRootsBucket = []byte("attestation-data-roots")
|
||||
proposalRecordsBucket = []byte("proposal-records")
|
||||
slasherChunksBucket = []byte("slasher-chunks")
|
||||
|
||||
// key: Slot+ValidatorIndex
|
||||
// value: (encoded) SignedBlockHeaderWrapper
|
||||
proposalRecordsBucket = []byte("proposal-records")
|
||||
slasherChunksBucket = []byte("slasher-chunks")
|
||||
)
|
||||
|
||||
@@ -29,72 +29,90 @@ const (
|
||||
// LastEpochWrittenForValidators given a list of validator indices returns the latest
|
||||
// epoch we have recorded the validators writing data for.
|
||||
func (s *Store) LastEpochWrittenForValidators(
|
||||
ctx context.Context, validatorIndices []primitives.ValidatorIndex,
|
||||
ctx context.Context, validatorIndexes []primitives.ValidatorIndex,
|
||||
) ([]*slashertypes.AttestedEpochForValidator, error) {
|
||||
_, span := trace.StartSpan(ctx, "BeaconDB.LastEpochWrittenForValidators")
|
||||
defer span.End()
|
||||
|
||||
attestedEpochs := make([]*slashertypes.AttestedEpochForValidator, 0)
|
||||
encodedIndices := make([][]byte, len(validatorIndices))
|
||||
for i, valIdx := range validatorIndices {
|
||||
encodedIndices[i] = encodeValidatorIndex(valIdx)
|
||||
encodedIndexes := make([][]byte, len(validatorIndexes))
|
||||
|
||||
for i, validatorIndex := range validatorIndexes {
|
||||
encodedIndexes[i] = encodeValidatorIndex(validatorIndex)
|
||||
}
|
||||
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(attestedEpochsByValidator)
|
||||
for i, encodedIndex := range encodedIndices {
|
||||
|
||||
for i, encodedIndex := range encodedIndexes {
|
||||
var epoch primitives.Epoch
|
||||
|
||||
epochBytes := bkt.Get(encodedIndex)
|
||||
if epochBytes != nil {
|
||||
if err := epoch.UnmarshalSSZ(epochBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
attestedEpochs = append(attestedEpochs, &slashertypes.AttestedEpochForValidator{
|
||||
ValidatorIndex: validatorIndices[i],
|
||||
ValidatorIndex: validatorIndexes[i],
|
||||
Epoch: epoch,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return attestedEpochs, err
|
||||
}
|
||||
|
||||
// SaveLastEpochsWrittenForValidators updates the latest epoch a slice
|
||||
// of validator indices has attested to.
|
||||
func (s *Store) SaveLastEpochsWrittenForValidators(
|
||||
ctx context.Context, epochByValidator map[primitives.ValidatorIndex]primitives.Epoch,
|
||||
ctx context.Context, epochByValIndex map[primitives.ValidatorIndex]primitives.Epoch,
|
||||
) error {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastEpochsWrittenForValidators")
|
||||
defer span.End()
|
||||
encodedIndices := make([][]byte, 0, len(epochByValidator))
|
||||
encodedEpochs := make([][]byte, 0, len(epochByValidator))
|
||||
for valIdx, epoch := range epochByValidator {
|
||||
|
||||
const batchSize = 10000
|
||||
|
||||
encodedIndexes := make([][]byte, 0, len(epochByValIndex))
|
||||
encodedEpochs := make([][]byte, 0, len(epochByValIndex))
|
||||
|
||||
for valIndex, epoch := range epochByValIndex {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
encodedEpoch, err := epoch.MarshalSSZ()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
encodedIndices = append(encodedIndices, encodeValidatorIndex(valIdx))
|
||||
|
||||
encodedIndexes = append(encodedIndexes, encodeValidatorIndex(valIndex))
|
||||
encodedEpochs = append(encodedEpochs, encodedEpoch)
|
||||
}
|
||||
|
||||
// The list of validators might be too massive for boltdb to handle in a single transaction,
|
||||
// so instead we split it into batches and write each batch.
|
||||
batchSize := 10000
|
||||
for i := 0; i < len(encodedIndices); i += batchSize {
|
||||
for i := 0; i < len(encodedIndexes); i += batchSize {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if err := s.db.Update(func(tx *bolt.Tx) error {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
bkt := tx.Bucket(attestedEpochsByValidator)
|
||||
min := i + batchSize
|
||||
if min > len(encodedIndices) {
|
||||
min = len(encodedIndices)
|
||||
|
||||
minimum := i + batchSize
|
||||
if minimum > len(encodedIndexes) {
|
||||
minimum = len(encodedIndexes)
|
||||
}
|
||||
for j, encodedIndex := range encodedIndices[i:min] {
|
||||
|
||||
for j, encodedIndex := range encodedIndexes[i:minimum] {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
@@ -102,79 +120,106 @@ func (s *Store) SaveLastEpochsWrittenForValidators(
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CheckAttesterDoubleVotes retries any slashable double votes that exist
|
||||
// for a series of input attestations.
|
||||
// CheckAttesterDoubleVotes retrieves any slashable double votes that exist
|
||||
// for a series of input attestations with respect to the database.
|
||||
func (s *Store) CheckAttesterDoubleVotes(
|
||||
ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper,
|
||||
) ([]*slashertypes.AttesterDoubleVote, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.CheckAttesterDoubleVotes")
|
||||
defer span.End()
|
||||
|
||||
doubleVotes := make([]*slashertypes.AttesterDoubleVote, 0)
|
||||
doubleVotesMu := sync.Mutex{}
|
||||
mu := sync.Mutex{}
|
||||
eg, egctx := errgroup.WithContext(ctx)
|
||||
for _, att := range attestations {
|
||||
|
||||
for _, attestation := range attestations {
|
||||
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
|
||||
// See https://golang.org/doc/faq#closures_and_goroutines for more details.
|
||||
attToProcess := att
|
||||
// process every attestation parallelly.
|
||||
attToProcess := attestation
|
||||
|
||||
// Process each attestation in parallel.
|
||||
eg.Go(func() error {
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
|
||||
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
|
||||
|
||||
encEpoch := encodeTargetEpoch(attToProcess.IndexedAttestation.Data.Target.Epoch)
|
||||
localDoubleVotes := make([]*slashertypes.AttesterDoubleVote, 0)
|
||||
localDoubleVotes := []*slashertypes.AttesterDoubleVote{}
|
||||
|
||||
for _, valIdx := range attToProcess.IndexedAttestation.AttestingIndices {
|
||||
// Check if there is signing root in the database for this combination
|
||||
// of validator index and target epoch.
|
||||
encIdx := encodeValidatorIndex(primitives.ValidatorIndex(valIdx))
|
||||
validatorEpochKey := append(encEpoch, encIdx...)
|
||||
attRecordsKey := signingRootsBkt.Get(validatorEpochKey)
|
||||
|
||||
// An attestation record key is comprised of a signing root (32 bytes).
|
||||
if len(attRecordsKey) < attestationRecordKeySize {
|
||||
// If there is no signing root for this combination,
|
||||
// then there is no double vote. We can continue to the next validator.
|
||||
continue
|
||||
}
|
||||
|
||||
// Retrieve the attestation record corresponding to the signing root
|
||||
// from the database.
|
||||
encExistingAttRecord := attRecordsBkt.Get(attRecordsKey)
|
||||
if encExistingAttRecord == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
existingSigningRoot := bytesutil.ToBytes32(attRecordsKey[:signingRootSize])
|
||||
if existingSigningRoot != attToProcess.SigningRoot {
|
||||
existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
slashAtt := &slashertypes.AttesterDoubleVote{
|
||||
ValidatorIndex: primitives.ValidatorIndex(valIdx),
|
||||
Target: attToProcess.IndexedAttestation.Data.Target.Epoch,
|
||||
PrevAttestationWrapper: existingAttRecord,
|
||||
AttestationWrapper: attToProcess,
|
||||
}
|
||||
localDoubleVotes = append(localDoubleVotes, slashAtt)
|
||||
if existingSigningRoot == attToProcess.SigningRoot {
|
||||
continue
|
||||
}
|
||||
|
||||
// There is a double vote.
|
||||
existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build the proof of double vote.
|
||||
slashAtt := &slashertypes.AttesterDoubleVote{
|
||||
ValidatorIndex: primitives.ValidatorIndex(valIdx),
|
||||
Target: attToProcess.IndexedAttestation.Data.Target.Epoch,
|
||||
PrevAttestationWrapper: existingAttRecord,
|
||||
AttestationWrapper: attToProcess,
|
||||
}
|
||||
|
||||
localDoubleVotes = append(localDoubleVotes, slashAtt)
|
||||
}
|
||||
// if any routine is cancelled, then cancel this routine too
|
||||
|
||||
// If any routine is cancelled, then cancel this routine too.
|
||||
select {
|
||||
case <-egctx.Done():
|
||||
return egctx.Err()
|
||||
default:
|
||||
}
|
||||
// if there are any doible votes in this attestation, add it to the global double votes
|
||||
|
||||
// If there are any double votes in this attestation, add it to the global double votes.
|
||||
if len(localDoubleVotes) > 0 {
|
||||
doubleVotesMu.Lock()
|
||||
defer doubleVotesMu.Unlock()
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
doubleVotes = append(doubleVotes, localDoubleVotes...)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
return doubleVotes, eg.Wait()
|
||||
}
|
||||
|
||||
@@ -211,6 +256,8 @@ func (s *Store) AttestationRecordForValidator(
|
||||
}
|
||||
|
||||
// SaveAttestationRecordsForValidators saves attestation records for the specified indices.
|
||||
// If multiple attestations are provided for the same validator index + target epoch combination,
|
||||
// then only the first one is (arbitrarily) saved in the `attestationDataRootsBucket` bucket.
|
||||
func (s *Store) SaveAttestationRecordsForValidators(
|
||||
ctx context.Context,
|
||||
attestations []*slashertypes.IndexedAttestationWrapper,
|
||||
@@ -219,37 +266,40 @@ func (s *Store) SaveAttestationRecordsForValidators(
|
||||
defer span.End()
|
||||
encodedTargetEpoch := make([][]byte, len(attestations))
|
||||
encodedRecords := make([][]byte, len(attestations))
|
||||
encodedIndices := make([][]byte, len(attestations))
|
||||
for i, att := range attestations {
|
||||
encEpoch := encodeTargetEpoch(att.IndexedAttestation.Data.Target.Epoch)
|
||||
value, err := encodeAttestationRecord(att)
|
||||
|
||||
for i, attestation := range attestations {
|
||||
encEpoch := encodeTargetEpoch(attestation.IndexedAttestation.Data.Target.Epoch)
|
||||
|
||||
value, err := encodeAttestationRecord(attestation)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
indicesBytes := make([]byte, len(att.IndexedAttestation.AttestingIndices)*8)
|
||||
for _, idx := range att.IndexedAttestation.AttestingIndices {
|
||||
encodedIdx := encodeValidatorIndex(primitives.ValidatorIndex(idx))
|
||||
indicesBytes = append(indicesBytes, encodedIdx...)
|
||||
}
|
||||
encodedIndices[i] = indicesBytes
|
||||
|
||||
encodedTargetEpoch[i] = encEpoch
|
||||
encodedRecords[i] = value
|
||||
}
|
||||
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
|
||||
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
|
||||
for i, att := range attestations {
|
||||
if err := attRecordsBkt.Put(att.SigningRoot[:], encodedRecords[i]); err != nil {
|
||||
|
||||
for i := len(attestations) - 1; i >= 0; i-- {
|
||||
attestation := attestations[i]
|
||||
|
||||
if err := attRecordsBkt.Put(attestation.SigningRoot[:], encodedRecords[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, valIdx := range att.IndexedAttestation.AttestingIndices {
|
||||
|
||||
for _, valIdx := range attestation.IndexedAttestation.AttestingIndices {
|
||||
encIdx := encodeValidatorIndex(primitives.ValidatorIndex(valIdx))
|
||||
|
||||
key := append(encodedTargetEpoch[i], encIdx...)
|
||||
if err := signingRootsBkt.Put(key, att.SigningRoot[:]); err != nil {
|
||||
if err := signingRootsBkt.Put(key, attestation.SigningRoot[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@@ -314,43 +364,60 @@ func (s *Store) SaveSlasherChunks(
|
||||
}
|
||||
|
||||
// CheckDoubleBlockProposals takes in a list of proposals and for each,
|
||||
// checks if there already exists a proposal at the same slot+validatorIndex combination. If so,
|
||||
// We check if the existing signing root is not-empty and is different than the incoming
|
||||
// proposal signing root. If so, we return a double block proposal object.
|
||||
// checks if there already exists a proposal at the same slot+validatorIndex combination.
|
||||
// If so, it checks if the existing signing root is not-empty and is different than
|
||||
// the incoming proposal signing root.
|
||||
// If so, it returns a double block proposal object.
|
||||
func (s *Store) CheckDoubleBlockProposals(
|
||||
ctx context.Context, proposals []*slashertypes.SignedBlockHeaderWrapper,
|
||||
ctx context.Context, incomingProposals []*slashertypes.SignedBlockHeaderWrapper,
|
||||
) ([]*ethpb.ProposerSlashing, error) {
|
||||
_, span := trace.StartSpan(ctx, "BeaconDB.CheckDoubleBlockProposals")
|
||||
defer span.End()
|
||||
proposerSlashings := make([]*ethpb.ProposerSlashing, 0, len(proposals))
|
||||
|
||||
proposerSlashings := make([]*ethpb.ProposerSlashing, 0, len(incomingProposals))
|
||||
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
// Retrieve the proposal records bucket
|
||||
bkt := tx.Bucket(proposalRecordsBucket)
|
||||
for _, proposal := range proposals {
|
||||
|
||||
for _, incomingProposal := range incomingProposals {
|
||||
// Build the key corresponding to this slot + validator index combination
|
||||
key, err := keyForValidatorProposal(
|
||||
proposal.SignedBeaconBlockHeader.Header.Slot,
|
||||
proposal.SignedBeaconBlockHeader.Header.ProposerIndex,
|
||||
incomingProposal.SignedBeaconBlockHeader.Header.Slot,
|
||||
incomingProposal.SignedBeaconBlockHeader.Header.ProposerIndex,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Retrieve the existing proposal record from the database
|
||||
encExistingProposalWrapper := bkt.Get(key)
|
||||
|
||||
// If there is no existing proposal record (empty result), then there is no double proposal.
|
||||
// We can continue to the next proposal.
|
||||
if len(encExistingProposalWrapper) < signingRootSize {
|
||||
continue
|
||||
}
|
||||
|
||||
// Compare the proposal signing root in the DB with the incoming proposal signing root.
|
||||
// If they differ, we have a double proposal.
|
||||
existingSigningRoot := bytesutil.ToBytes32(encExistingProposalWrapper[:signingRootSize])
|
||||
if existingSigningRoot != proposal.SigningRoot {
|
||||
if existingSigningRoot != incomingProposal.SigningRoot {
|
||||
existingProposalWrapper, err := decodeProposalRecord(encExistingProposalWrapper)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
proposerSlashings = append(proposerSlashings, ðpb.ProposerSlashing{
|
||||
Header_1: existingProposalWrapper.SignedBeaconBlockHeader,
|
||||
Header_2: proposal.SignedBeaconBlockHeader,
|
||||
Header_2: incomingProposal.SignedBeaconBlockHeader,
|
||||
})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return proposerSlashings, err
|
||||
}
|
||||
|
||||
@@ -384,14 +451,20 @@ func (s *Store) BlockProposalForValidator(
|
||||
|
||||
// SaveBlockProposals takes in a list of block proposals and saves them to our
|
||||
// proposal records bucket in the database.
|
||||
// If multiple proposals are provided for the same slot + validatorIndex combination,
|
||||
// then only the last one is saved in the database.
|
||||
func (s *Store) SaveBlockProposals(
|
||||
ctx context.Context, proposals []*slashertypes.SignedBlockHeaderWrapper,
|
||||
) error {
|
||||
_, span := trace.StartSpan(ctx, "BeaconDB.SaveBlockProposals")
|
||||
defer span.End()
|
||||
|
||||
encodedKeys := make([][]byte, len(proposals))
|
||||
encodedProposals := make([][]byte, len(proposals))
|
||||
|
||||
// Loop over all proposals to encode keys and proposals themselves.
|
||||
for i, proposal := range proposals {
|
||||
// Encode the key for this proposal.
|
||||
key, err := keyForValidatorProposal(
|
||||
proposal.SignedBeaconBlockHeader.Header.Slot,
|
||||
proposal.SignedBeaconBlockHeader.Header.ProposerIndex,
|
||||
@@ -399,20 +472,29 @@ func (s *Store) SaveBlockProposals(
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Encode the proposal itself.
|
||||
enc, err := encodeProposalRecord(proposal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
encodedKeys[i] = key
|
||||
encodedProposals[i] = enc
|
||||
}
|
||||
|
||||
// All proposals are saved into the DB in a single transaction.
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
// Retrieve the proposal records bucket.
|
||||
bkt := tx.Bucket(proposalRecordsBucket)
|
||||
|
||||
// Save all proposals.
|
||||
for i := range proposals {
|
||||
if err := bkt.Put(encodedKeys[i], encodedProposals[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@@ -472,7 +554,7 @@ func suffixForAttestationRecordsKey(key, encodedValidatorIndex []byte) bool {
|
||||
return bytes.Equal(encIdx, encodedValidatorIndex)
|
||||
}
|
||||
|
||||
// Disk key for a validator proposal, including a slot+validatorIndex as a byte slice.
|
||||
// keyForValidatorProposal returns a disk key for a validator proposal, including a slot+validatorIndex as a byte slice.
|
||||
func keyForValidatorProposal(slot primitives.Slot, proposerIndex primitives.ValidatorIndex) ([]byte, error) {
|
||||
encSlot, err := slot.MarshalSSZ()
|
||||
if err != nil {
|
||||
@@ -512,37 +594,55 @@ func decodeSlasherChunk(enc []byte) ([]uint16, error) {
|
||||
return chunk, nil
|
||||
}
|
||||
|
||||
// Decode attestation record from bytes.
|
||||
// Encode attestation record to bytes.
|
||||
// The output encoded attestation record consists in the signing root concatened with the compressed attestation record.
|
||||
func encodeAttestationRecord(att *slashertypes.IndexedAttestationWrapper) ([]byte, error) {
|
||||
if att == nil || att.IndexedAttestation == nil {
|
||||
return []byte{}, errors.New("nil proposal record")
|
||||
}
|
||||
|
||||
// Encode attestation.
|
||||
encodedAtt, err := att.IndexedAttestation.MarshalSSZ()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Compress attestation.
|
||||
compressedAtt := snappy.Encode(nil, encodedAtt)
|
||||
|
||||
return append(att.SigningRoot[:], compressedAtt...), nil
|
||||
}
|
||||
|
||||
// Decode attestation record from bytes.
|
||||
// The input encoded attestation record consists in the signing root concatened with the compressed attestation record.
|
||||
func decodeAttestationRecord(encoded []byte) (*slashertypes.IndexedAttestationWrapper, error) {
|
||||
if len(encoded) < signingRootSize {
|
||||
return nil, fmt.Errorf("wrong length for encoded attestation record, want 32, got %d", len(encoded))
|
||||
return nil, fmt.Errorf("wrong length for encoded attestation record, want minimum %d, got %d", signingRootSize, len(encoded))
|
||||
}
|
||||
signingRoot := encoded[:signingRootSize]
|
||||
decodedAtt := ðpb.IndexedAttestation{}
|
||||
|
||||
// Decompress attestation.
|
||||
decodedAttBytes, err := snappy.Decode(nil, encoded[signingRootSize:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Decode attestation.
|
||||
decodedAtt := ðpb.IndexedAttestation{}
|
||||
if err := decodedAtt.UnmarshalSSZ(decodedAttBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &slashertypes.IndexedAttestationWrapper{
|
||||
|
||||
// Decode signing root.
|
||||
signingRootBytes := encoded[:signingRootSize]
|
||||
signingRoot := bytesutil.ToBytes32(signingRootBytes)
|
||||
|
||||
// Return decoded attestation.
|
||||
attestation := &slashertypes.IndexedAttestationWrapper{
|
||||
IndexedAttestation: decodedAtt,
|
||||
SigningRoot: bytesutil.ToBytes32(signingRoot),
|
||||
}, nil
|
||||
SigningRoot: signingRoot,
|
||||
}
|
||||
|
||||
return attestation, nil
|
||||
}
|
||||
|
||||
func encodeProposalRecord(blkHdr *slashertypes.SignedBlockHeaderWrapper) ([]byte, error) {
|
||||
|
||||
@@ -23,7 +23,7 @@ type chunkUpdateArgs struct {
|
||||
}
|
||||
|
||||
// Chunker defines a struct which represents a slice containing a chunk for K different validator's
|
||||
// min spans used for surround vote detection in slasher. The interface defines methods used to check
|
||||
// min/max spans used for surround vote detection in slasher. The interface defines methods used to check
|
||||
// if an attestation is slashable for a validator index based on the contents of
|
||||
// the chunk as well as the ability to update the data in the chunk with incoming information.
|
||||
type Chunker interface {
|
||||
@@ -153,12 +153,12 @@ func MaxChunkSpansSliceFrom(params *Parameters, chunk []uint16) (*MaxSpanChunksS
|
||||
|
||||
// NeutralElement for a min span chunks slice is undefined, in this case
|
||||
// using MaxUint16 as a sane value given it is impossible we reach it.
|
||||
func (_ *MinSpanChunksSlice) NeutralElement() uint16 {
|
||||
func (*MinSpanChunksSlice) NeutralElement() uint16 {
|
||||
return math.MaxUint16
|
||||
}
|
||||
|
||||
// NeutralElement for a max span chunks slice is 0.
|
||||
func (_ *MaxSpanChunksSlice) NeutralElement() uint16 {
|
||||
func (*MaxSpanChunksSlice) NeutralElement() uint16 {
|
||||
return 0
|
||||
}
|
||||
|
||||
@@ -191,12 +191,14 @@ func (m *MinSpanChunksSlice) CheckSlashable(
|
||||
) (*ethpb.AttesterSlashing, error) {
|
||||
sourceEpoch := attestation.IndexedAttestation.Data.Source.Epoch
|
||||
targetEpoch := attestation.IndexedAttestation.Data.Target.Epoch
|
||||
|
||||
minTarget, err := chunkDataAtEpoch(m.params, m.data, validatorIdx, sourceEpoch)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(
|
||||
err, "could not get min target for validator %d at epoch %d", validatorIdx, sourceEpoch,
|
||||
)
|
||||
}
|
||||
|
||||
if targetEpoch > minTarget {
|
||||
existingAttRecord, err := slasherDB.AttestationRecordForValidator(
|
||||
ctx, validatorIdx, minTarget,
|
||||
@@ -206,16 +208,20 @@ func (m *MinSpanChunksSlice) CheckSlashable(
|
||||
err, "could not get existing attestation record at target %d", minTarget,
|
||||
)
|
||||
}
|
||||
if existingAttRecord != nil {
|
||||
if sourceEpoch < existingAttRecord.IndexedAttestation.Data.Source.Epoch {
|
||||
surroundingVotesTotal.Inc()
|
||||
return ðpb.AttesterSlashing{
|
||||
Attestation_1: attestation.IndexedAttestation,
|
||||
Attestation_2: existingAttRecord.IndexedAttestation,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if existingAttRecord == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if sourceEpoch < existingAttRecord.IndexedAttestation.Data.Source.Epoch {
|
||||
surroundingVotesTotal.Inc()
|
||||
return ðpb.AttesterSlashing{
|
||||
Attestation_1: attestation.IndexedAttestation,
|
||||
Attestation_2: existingAttRecord.IndexedAttestation,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -224,7 +230,7 @@ func (m *MinSpanChunksSlice) CheckSlashable(
|
||||
// within the max span chunks slice. Recall that for an incoming attestation, B, and an
|
||||
// existing attestation, A:
|
||||
//
|
||||
// B surrounds A if and only if B.target < max_spans[B.source]
|
||||
// B is surrounded by A if and only if B.target < max_spans[B.source]
|
||||
//
|
||||
// That is, this condition is sufficient to check if an incoming attestation
|
||||
// is surrounded by a previous one. We also check if we indeed have an existing
|
||||
@@ -238,12 +244,14 @@ func (m *MaxSpanChunksSlice) CheckSlashable(
|
||||
) (*ethpb.AttesterSlashing, error) {
|
||||
sourceEpoch := attestation.IndexedAttestation.Data.Source.Epoch
|
||||
targetEpoch := attestation.IndexedAttestation.Data.Target.Epoch
|
||||
|
||||
maxTarget, err := chunkDataAtEpoch(m.params, m.data, validatorIdx, sourceEpoch)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(
|
||||
err, "could not get max target for validator %d at epoch %d", validatorIdx, sourceEpoch,
|
||||
)
|
||||
}
|
||||
|
||||
if targetEpoch < maxTarget {
|
||||
existingAttRecord, err := slasherDB.AttestationRecordForValidator(
|
||||
ctx, validatorIdx, maxTarget,
|
||||
@@ -253,14 +261,17 @@ func (m *MaxSpanChunksSlice) CheckSlashable(
|
||||
err, "could not get existing attestation record at target %d", maxTarget,
|
||||
)
|
||||
}
|
||||
if existingAttRecord != nil {
|
||||
if existingAttRecord.IndexedAttestation.Data.Source.Epoch < sourceEpoch {
|
||||
surroundedVotesTotal.Inc()
|
||||
return ðpb.AttesterSlashing{
|
||||
Attestation_1: existingAttRecord.IndexedAttestation,
|
||||
Attestation_2: attestation.IndexedAttestation,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if existingAttRecord == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if existingAttRecord.IndexedAttestation.Data.Source.Epoch < sourceEpoch {
|
||||
surroundedVotesTotal.Inc()
|
||||
return ðpb.AttesterSlashing{
|
||||
Attestation_1: existingAttRecord.IndexedAttestation,
|
||||
Attestation_2: attestation.IndexedAttestation,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
@@ -331,8 +342,8 @@ func (m *MinSpanChunksSlice) Update(
|
||||
minEpoch = args.currentEpoch - (m.params.historyLength - 1)
|
||||
}
|
||||
epochInChunk := startEpoch
|
||||
// We go down the chunk for the validator, updating every value starting at start_epoch down to min_epoch.
|
||||
// As long as the epoch, e, in the same chunk index and e >= min_epoch, we proceed with
|
||||
// We go down the chunk for the validator, updating every value starting at startEpoch down to minEpoch.
|
||||
// As long as the epoch, e, in the same chunk index and e >= minEpoch, we proceed with
|
||||
// a for loop.
|
||||
for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk >= minEpoch {
|
||||
var chunkTarget primitives.Epoch
|
||||
@@ -375,7 +386,7 @@ func (m *MaxSpanChunksSlice) Update(
|
||||
newTargetEpoch primitives.Epoch,
|
||||
) (keepGoing bool, err error) {
|
||||
epochInChunk := startEpoch
|
||||
// We go down the chunk for the validator, updating every value starting at start_epoch up to
|
||||
// We go down the chunk for the validator, updating every value starting at startEpoch up to
|
||||
// and including the current epoch. As long as the epoch, e, is in the same chunk index and e <= currentEpoch,
|
||||
// we proceed with a for loop.
|
||||
for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk <= args.currentEpoch {
|
||||
@@ -436,7 +447,7 @@ func (m *MinSpanChunksSlice) StartEpoch(
|
||||
|
||||
// StartEpoch given a source epoch and current epoch, determines the start epoch of
|
||||
// a max span chunk for use in chunk updates. The source epoch cannot be >= the current epoch.
|
||||
func (_ *MaxSpanChunksSlice) StartEpoch(
|
||||
func (*MaxSpanChunksSlice) StartEpoch(
|
||||
sourceEpoch, currentEpoch primitives.Epoch,
|
||||
) (epoch primitives.Epoch, exists bool) {
|
||||
if sourceEpoch >= currentEpoch {
|
||||
|
||||
@@ -20,6 +20,7 @@ func (s *Service) checkSlashableAttestations(
|
||||
) ([]*ethpb.AttesterSlashing, error) {
|
||||
slashings := make([]*ethpb.AttesterSlashing, 0)
|
||||
|
||||
// Double votes
|
||||
log.Debug("Checking for double votes")
|
||||
start := time.Now()
|
||||
doubleVoteSlashings, err := s.checkDoubleVotes(ctx, atts)
|
||||
@@ -29,42 +30,53 @@ func (s *Service) checkSlashableAttestations(
|
||||
log.WithField("elapsed", time.Since(start)).Debug("Done checking double votes")
|
||||
slashings = append(slashings, doubleVoteSlashings...)
|
||||
|
||||
// Surrounding / surrounded votes
|
||||
groupedAtts := s.groupByValidatorChunkIndex(atts)
|
||||
log.WithField("numBatches", len(groupedAtts)).Debug("Batching attestations by validator chunk index")
|
||||
start = time.Now()
|
||||
batchTimes := make([]time.Duration, 0, len(groupedAtts))
|
||||
|
||||
for validatorChunkIdx, batch := range groupedAtts {
|
||||
innerStart := time.Now()
|
||||
attSlashings, err := s.detectAllAttesterSlashings(ctx, &chunkUpdateArgs{
|
||||
|
||||
attSlashings, err := s.checkSurrounds(ctx, &chunkUpdateArgs{
|
||||
validatorChunkIndex: validatorChunkIdx,
|
||||
currentEpoch: currentEpoch,
|
||||
}, batch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
slashings = append(slashings, attSlashings...)
|
||||
|
||||
indices := s.params.validatorIndicesInChunk(validatorChunkIdx)
|
||||
for _, idx := range indices {
|
||||
s.latestEpochWrittenForValidator[idx] = currentEpoch
|
||||
}
|
||||
|
||||
batchTimes = append(batchTimes, time.Since(innerStart))
|
||||
}
|
||||
var avgProcessingTimePerBatch time.Duration
|
||||
|
||||
avgProcessingTimePerBatch := time.Duration(0)
|
||||
for _, dur := range batchTimes {
|
||||
avgProcessingTimePerBatch += dur
|
||||
}
|
||||
|
||||
if avgProcessingTimePerBatch != time.Duration(0) {
|
||||
avgProcessingTimePerBatch = avgProcessingTimePerBatch / time.Duration(len(batchTimes))
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"numAttestations": len(atts),
|
||||
"numBatchesByValidatorChunkIndex": len(groupedAtts),
|
||||
"elapsed": time.Since(start),
|
||||
"avgBatchProcessingTime": avgProcessingTimePerBatch,
|
||||
}).Info("Done checking slashable attestations")
|
||||
|
||||
if len(slashings) > 0 {
|
||||
log.WithField("numSlashings", len(slashings)).Warn("Slashable attestation offenses found")
|
||||
}
|
||||
|
||||
return slashings, nil
|
||||
}
|
||||
|
||||
@@ -72,14 +84,13 @@ func (s *Service) checkSlashableAttestations(
|
||||
// as the current epoch in time, we perform slashing detection.
|
||||
// The process is as follows given a list of attestations:
|
||||
//
|
||||
// 1. Check for attester double votes using the list of attestations.
|
||||
// 2. Group the attestations by chunk index.
|
||||
// 3. Update the min and max spans for those grouped attestations, check if any slashings are
|
||||
// 1. Group the attestations by chunk index.
|
||||
// 2. Update the min and max spans for those grouped attestations, check if any slashings are
|
||||
// found in the process
|
||||
// 4. Update the latest written epoch for all validators involved to the current epoch.
|
||||
// 3. Update the latest written epoch for all validators involved to the current epoch.
|
||||
//
|
||||
// This function performs a lot of critical actions and is split into smaller helpers for cleanliness.
|
||||
func (s *Service) detectAllAttesterSlashings(
|
||||
func (s *Service) checkSurrounds(
|
||||
ctx context.Context,
|
||||
args *chunkUpdateArgs,
|
||||
attestations []*slashertypes.IndexedAttestationWrapper,
|
||||
@@ -136,48 +147,8 @@ func (s *Service) detectAllAttesterSlashings(
|
||||
return slashings, nil
|
||||
}
|
||||
|
||||
// Check for attester slashing double votes by looking at every single validator index
|
||||
// in each attestation's attesting indices and checking if there already exist records for such
|
||||
// attestation's target epoch. If so, we append a double vote slashing object to a list of slashings
|
||||
// we return to the caller.
|
||||
func (s *Service) checkDoubleVotes(
|
||||
ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper,
|
||||
) ([]*ethpb.AttesterSlashing, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "Slasher.checkDoubleVotes")
|
||||
defer span.End()
|
||||
// We check if there are any slashable double votes in the input list
|
||||
// of attestations with respect to each other.
|
||||
slashings := make([]*ethpb.AttesterSlashing, 0)
|
||||
existingAtts := make(map[string]*slashertypes.IndexedAttestationWrapper)
|
||||
for _, att := range attestations {
|
||||
for _, valIdx := range att.IndexedAttestation.AttestingIndices {
|
||||
key := uintToString(uint64(att.IndexedAttestation.Data.Target.Epoch)) + ":" + uintToString(valIdx)
|
||||
existingAtt, ok := existingAtts[key]
|
||||
if !ok {
|
||||
existingAtts[key] = att
|
||||
continue
|
||||
}
|
||||
if att.SigningRoot != existingAtt.SigningRoot {
|
||||
doubleVotesTotal.Inc()
|
||||
slashings = append(slashings, ðpb.AttesterSlashing{
|
||||
Attestation_1: existingAtt.IndexedAttestation,
|
||||
Attestation_2: att.IndexedAttestation,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We check if there are any slashable double votes in the input list
|
||||
// of attestations with respect to our database.
|
||||
moreSlashings, err := s.checkDoubleVotesOnDisk(ctx, attestations)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not check attestation double votes on disk")
|
||||
}
|
||||
return append(slashings, moreSlashings...), nil
|
||||
}
|
||||
|
||||
// Check for double votes in our database given a list of incoming attestations.
|
||||
func (s *Service) checkDoubleVotesOnDisk(
|
||||
func (s *Service) checkDoubleVotes(
|
||||
ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper,
|
||||
) ([]*ethpb.AttesterSlashing, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "Slasher.checkDoubleVotesOnDisk")
|
||||
|
||||
@@ -30,10 +30,32 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
currentEpoch primitives.Epoch
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
shouldNotBeSlashable bool
|
||||
name string
|
||||
args args
|
||||
shouldBeSlashable bool
|
||||
}{
|
||||
{
|
||||
name: "Same target with different signing roots",
|
||||
args: args{
|
||||
attestationQueue: []*slashertypes.IndexedAttestationWrapper{
|
||||
createAttestationWrapper(t, 1, 2, []uint64{0, 1}, []byte{1}),
|
||||
createAttestationWrapper(t, 1, 2, []uint64{0, 1}, []byte{2}),
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldBeSlashable: true,
|
||||
},
|
||||
{
|
||||
name: "Same target with same signing roots",
|
||||
args: args{
|
||||
attestationQueue: []*slashertypes.IndexedAttestationWrapper{
|
||||
createAttestationWrapper(t, 1, 2, []uint64{0, 1}, []byte{1}),
|
||||
createAttestationWrapper(t, 1, 2, []uint64{0, 1}, []byte{1}),
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
{
|
||||
name: "Detects surrounding vote (source 1, target 2), (source 0, target 3)",
|
||||
args: args{
|
||||
@@ -43,6 +65,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldBeSlashable: true,
|
||||
},
|
||||
{
|
||||
name: "Detects surrounding vote (source 50, target 51), (source 0, target 1000)",
|
||||
@@ -53,6 +76,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 1000,
|
||||
},
|
||||
shouldBeSlashable: true,
|
||||
},
|
||||
{
|
||||
name: "Detects surrounded vote (source 0, target 3), (source 1, target 2)",
|
||||
@@ -63,6 +87,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldBeSlashable: true,
|
||||
},
|
||||
{
|
||||
name: "Detects double vote, (source 1, target 2), (source 0, target 2)",
|
||||
@@ -73,6 +98,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldBeSlashable: true,
|
||||
},
|
||||
{
|
||||
name: "Not slashable, surrounding but non-overlapping attesting indices within same validator chunk index",
|
||||
@@ -83,7 +109,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldNotBeSlashable: true,
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
{
|
||||
name: "Not slashable, surrounded but non-overlapping attesting indices within same validator chunk index",
|
||||
@@ -94,7 +120,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldNotBeSlashable: true,
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
{
|
||||
name: "Not slashable, surrounding but non-overlapping attesting indices in different validator chunk index",
|
||||
@@ -111,7 +137,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldNotBeSlashable: true,
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
{
|
||||
name: "Not slashable, surrounded but non-overlapping attesting indices in different validator chunk index",
|
||||
@@ -128,7 +154,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldNotBeSlashable: true,
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
{
|
||||
name: "Not slashable, (source 1, target 2), (source 2, target 3)",
|
||||
@@ -139,7 +165,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldNotBeSlashable: true,
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
{
|
||||
name: "Not slashable, (source 0, target 3), (source 2, target 4)",
|
||||
@@ -150,7 +176,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldNotBeSlashable: true,
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
{
|
||||
name: "Not slashable, (source 0, target 2), (source 0, target 3)",
|
||||
@@ -161,7 +187,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldNotBeSlashable: true,
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
{
|
||||
name: "Not slashable, (source 0, target 3), (source 0, target 2)",
|
||||
@@ -172,7 +198,7 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
},
|
||||
currentEpoch: 4,
|
||||
},
|
||||
shouldNotBeSlashable: true,
|
||||
shouldBeSlashable: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
@@ -246,21 +272,24 @@ func Test_processQueuedAttestations(t *testing.T) {
|
||||
s.genesisTime = genesisTime
|
||||
|
||||
currentSlotChan := make(chan primitives.Slot)
|
||||
exitChan := make(chan struct{})
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.processQueuedAttestations(ctx, currentSlotChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
s.attsQueue.extend(tt.args.attestationQueue)
|
||||
currentSlotChan <- slot
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
cancel()
|
||||
<-exitChan
|
||||
if tt.shouldNotBeSlashable {
|
||||
require.LogsDoNotContain(t, hook, "Attester slashing detected")
|
||||
} else {
|
||||
s.wg.Wait()
|
||||
if tt.shouldBeSlashable {
|
||||
require.LogsContain(t, hook, "Attester slashing detected")
|
||||
} else {
|
||||
require.LogsDoNotContain(t, hook, "Attester slashing detected")
|
||||
}
|
||||
|
||||
require.LogsDoNotContain(t, hook, couldNotSaveAttRecord)
|
||||
require.LogsDoNotContain(t, hook, couldNotCheckSlashableAtt)
|
||||
require.LogsDoNotContain(t, hook, couldNotProcessAttesterSlashings)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -304,10 +333,9 @@ func Test_processQueuedAttestations_MultipleChunkIndices(t *testing.T) {
|
||||
s.genesisTime = genesisTime
|
||||
|
||||
currentSlotChan := make(chan primitives.Slot)
|
||||
exitChan := make(chan struct{})
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.processQueuedAttestations(ctx, currentSlotChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
|
||||
for i := startEpoch; i <= endEpoch; i++ {
|
||||
@@ -331,7 +359,7 @@ func Test_processQueuedAttestations_MultipleChunkIndices(t *testing.T) {
|
||||
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
cancel()
|
||||
<-exitChan
|
||||
s.wg.Wait()
|
||||
require.LogsDoNotContain(t, hook, "Slashable offenses found")
|
||||
require.LogsDoNotContain(t, hook, "Could not detect")
|
||||
}
|
||||
@@ -370,10 +398,9 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) {
|
||||
s.genesisTime = genesisTime
|
||||
|
||||
currentSlotChan := make(chan primitives.Slot)
|
||||
exitChan := make(chan struct{})
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.processQueuedAttestations(ctx, currentSlotChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
|
||||
// We create two attestations fully spanning chunk indices 0 and chunk 1
|
||||
@@ -392,7 +419,7 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) {
|
||||
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
cancel()
|
||||
<-exitChan
|
||||
s.wg.Wait()
|
||||
require.LogsDoNotContain(t, hook, "Slashable offenses found")
|
||||
require.LogsDoNotContain(t, hook, "Could not detect")
|
||||
}
|
||||
@@ -593,44 +620,6 @@ func Test_applyAttestationForValidator_MaxSpanChunk(t *testing.T) {
|
||||
require.NotNil(t, slashing)
|
||||
}
|
||||
|
||||
func Test_checkDoubleVotes_SlashableInputAttestations(t *testing.T) {
|
||||
slasherDB := dbtest.SetupSlasherDB(t)
|
||||
ctx := context.Background()
|
||||
// For a list of input attestations, check that we can
|
||||
// indeed check there could exist a double vote offense
|
||||
// within the list with respect to other entries in the list.
|
||||
atts := []*slashertypes.IndexedAttestationWrapper{
|
||||
createAttestationWrapper(t, 0, 1, []uint64{1, 2}, []byte{1}),
|
||||
createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1}),
|
||||
createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2}), // Different signing root.
|
||||
}
|
||||
srv, err := New(context.Background(),
|
||||
&ServiceConfig{
|
||||
Database: slasherDB,
|
||||
StateNotifier: &mock.MockStateNotifier{},
|
||||
ClockWaiter: startup.NewClockSynchronizer(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
prev1 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1})
|
||||
cur1 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2})
|
||||
prev2 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1})
|
||||
cur2 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2})
|
||||
wanted := []*ethpb.AttesterSlashing{
|
||||
{
|
||||
Attestation_1: prev1.IndexedAttestation,
|
||||
Attestation_2: cur1.IndexedAttestation,
|
||||
},
|
||||
{
|
||||
Attestation_1: prev2.IndexedAttestation,
|
||||
Attestation_2: cur2.IndexedAttestation,
|
||||
},
|
||||
}
|
||||
slashings, err := srv.checkDoubleVotes(ctx, atts)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, wanted, slashings)
|
||||
}
|
||||
|
||||
func Test_checkDoubleVotes_SlashableAttestationsOnDisk(t *testing.T) {
|
||||
slasherDB := dbtest.SetupSlasherDB(t)
|
||||
ctx := context.Background()
|
||||
@@ -787,16 +776,15 @@ func TestService_processQueuedAttestations(t *testing.T) {
|
||||
})
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
tickerChan := make(chan primitives.Slot)
|
||||
exitChan := make(chan struct{})
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.processQueuedAttestations(ctx, tickerChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
|
||||
// Send a value over the ticker.
|
||||
tickerChan <- 1
|
||||
cancel()
|
||||
<-exitChan
|
||||
s.wg.Wait()
|
||||
assert.LogsContain(t, hook, "Processing queued")
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
slashertypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/slasher/types"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
@@ -13,80 +12,70 @@ import (
|
||||
// detectProposerSlashings takes in signed block header wrappers and returns a list of proposer slashings detected.
|
||||
func (s *Service) detectProposerSlashings(
|
||||
ctx context.Context,
|
||||
proposedBlocks []*slashertypes.SignedBlockHeaderWrapper,
|
||||
incomingProposals []*slashertypes.SignedBlockHeaderWrapper,
|
||||
) ([]*ethpb.ProposerSlashing, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "slasher.detectProposerSlashings")
|
||||
defer span.End()
|
||||
|
||||
// internalSlashings will contain any slashable double proposals in the input list
|
||||
// of proposals with respect to each other.
|
||||
internalSlashings := []*ethpb.ProposerSlashing{}
|
||||
|
||||
existingProposals := make(map[string]*slashertypes.SignedBlockHeaderWrapper)
|
||||
|
||||
// We check if there are any slashable double proposals in the input list
|
||||
// of proposals with respect to each other.
|
||||
slashings := make([]*ethpb.ProposerSlashing, 0)
|
||||
existingProposals := make(map[string]*slashertypes.SignedBlockHeaderWrapper)
|
||||
for i, proposal := range proposedBlocks {
|
||||
key := proposalKey(proposal)
|
||||
for _, incomingProposal := range incomingProposals {
|
||||
key := proposalKey(incomingProposal)
|
||||
existingProposal, ok := existingProposals[key]
|
||||
|
||||
// If we have not seen this proposal before, we add it to our map of existing proposals
|
||||
// and we continue to the next proposal.
|
||||
if !ok {
|
||||
existingProposals[key] = proposal
|
||||
existingProposals[key] = incomingProposal
|
||||
continue
|
||||
}
|
||||
if isDoubleProposal(proposedBlocks[i].SigningRoot, existingProposal.SigningRoot) {
|
||||
|
||||
// If we have seen this proposal before, we check if it is a double proposal.
|
||||
if isDoubleProposal(incomingProposal.SigningRoot, existingProposal.SigningRoot) {
|
||||
doubleProposalsTotal.Inc()
|
||||
|
||||
slashing := ðpb.ProposerSlashing{
|
||||
Header_1: existingProposal.SignedBeaconBlockHeader,
|
||||
Header_2: proposedBlocks[i].SignedBeaconBlockHeader,
|
||||
Header_2: incomingProposal.SignedBeaconBlockHeader,
|
||||
}
|
||||
slashings = append(slashings, slashing)
|
||||
|
||||
internalSlashings = append(internalSlashings, slashing)
|
||||
}
|
||||
}
|
||||
|
||||
proposerSlashings, err := s.serviceCfg.Database.CheckDoubleBlockProposals(ctx, proposedBlocks)
|
||||
// We check if there are any slashable double proposals in the input list
|
||||
// of proposals with respect to the slasher database.
|
||||
databaseSlashings, err := s.serviceCfg.Database.CheckDoubleBlockProposals(ctx, incomingProposals)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not check for double proposals on disk")
|
||||
}
|
||||
if err := s.saveSafeProposals(ctx, proposedBlocks, proposerSlashings); err != nil {
|
||||
|
||||
// We save the safe proposals (with respect to the database) to our database.
|
||||
// If some proposals in incomingProposals are slashable with respect to each other,
|
||||
// we (arbitrarily) save the last one to the database.
|
||||
if err := s.serviceCfg.Database.SaveBlockProposals(ctx, incomingProposals); err != nil {
|
||||
return nil, errors.Wrap(err, "could not save safe proposals")
|
||||
}
|
||||
slashings = append(slashings, proposerSlashings...)
|
||||
return slashings, nil
|
||||
}
|
||||
|
||||
// Check for double proposals in our database given a list of incoming block proposals.
|
||||
// For the proposals that were not slashable, we save them to the database.
|
||||
func (s *Service) saveSafeProposals(
|
||||
ctx context.Context,
|
||||
proposedBlocks []*slashertypes.SignedBlockHeaderWrapper,
|
||||
proposerSlashings []*ethpb.ProposerSlashing,
|
||||
) error {
|
||||
ctx, span := trace.StartSpan(ctx, "slasher.saveSafeProposals")
|
||||
defer span.End()
|
||||
return s.serviceCfg.Database.SaveBlockProposals(
|
||||
ctx,
|
||||
filterSafeProposals(proposedBlocks, proposerSlashings),
|
||||
)
|
||||
}
|
||||
|
||||
func filterSafeProposals(
|
||||
proposedBlocks []*slashertypes.SignedBlockHeaderWrapper,
|
||||
proposerSlashings []*ethpb.ProposerSlashing,
|
||||
) []*slashertypes.SignedBlockHeaderWrapper {
|
||||
// We initialize a map of proposers that are safe from slashing.
|
||||
safeProposers := make(map[primitives.ValidatorIndex]*slashertypes.SignedBlockHeaderWrapper, len(proposedBlocks))
|
||||
for _, proposal := range proposedBlocks {
|
||||
safeProposers[proposal.SignedBeaconBlockHeader.Header.ProposerIndex] = proposal
|
||||
}
|
||||
for _, doubleProposal := range proposerSlashings {
|
||||
// If a proposer is found to have committed a slashable offense, we delete
|
||||
// them from the safe proposers map.
|
||||
delete(safeProposers, doubleProposal.Header_1.Header.ProposerIndex)
|
||||
}
|
||||
// We save all the proposals that are determined "safe" and not-slashable to our database.
|
||||
safeProposals := make([]*slashertypes.SignedBlockHeaderWrapper, 0, len(safeProposers))
|
||||
for _, proposal := range safeProposers {
|
||||
safeProposals = append(safeProposals, proposal)
|
||||
}
|
||||
return safeProposals
|
||||
|
||||
// totalSlashings contain all slashings we have detected.
|
||||
totalSlashings := append(internalSlashings, databaseSlashings...)
|
||||
return totalSlashings, nil
|
||||
}
|
||||
|
||||
// proposalKey build a key which is a combination of the slot and the proposer index.
|
||||
// If a validator proposes several blocks for the same slot, then several (potentially slashable)
|
||||
// proposals will correspond to the same key.
|
||||
func proposalKey(proposal *slashertypes.SignedBlockHeaderWrapper) string {
|
||||
header := proposal.SignedBeaconBlockHeader.Header
|
||||
return uintToString(uint64(header.Slot)) + ":" + uintToString(uint64(header.ProposerIndex))
|
||||
|
||||
slotKey := uintToString(uint64(header.Slot))
|
||||
proposerIndexKey := uintToString(uint64(header.ProposerIndex))
|
||||
|
||||
return slotKey + ":" + proposerIndexKey
|
||||
}
|
||||
|
||||
@@ -22,94 +22,139 @@ import (
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
type wrapped struct {
|
||||
slot primitives.Slot
|
||||
signedBlkHeaders []*slashertypes.SignedBlockHeaderWrapper
|
||||
}
|
||||
|
||||
func Test_processQueuedBlocks_DetectsDoubleProposals(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
slasherDB := dbtest.SetupSlasherDB(t)
|
||||
beaconDB := dbtest.SetupDB(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Initialize validators in the state.
|
||||
numVals := params.BeaconConfig().MinGenesisActiveValidatorCount
|
||||
validators := make([]*ethpb.Validator, numVals)
|
||||
privKeys := make([]bls.SecretKey, numVals)
|
||||
for i := range validators {
|
||||
privKey, err := bls.RandKey()
|
||||
require.NoError(t, err)
|
||||
privKeys[i] = privKey
|
||||
validators[i] = ðpb.Validator{
|
||||
PublicKey: privKey.PublicKey().Marshal(),
|
||||
WithdrawalCredentials: make([]byte, 32),
|
||||
}
|
||||
}
|
||||
err = beaconState.SetValidators(validators)
|
||||
require.NoError(t, err)
|
||||
domain, err := signing.Domain(
|
||||
beaconState.Fork(),
|
||||
0,
|
||||
params.BeaconConfig().DomainBeaconProposer,
|
||||
beaconState.GenesisValidatorsRoot(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
mockChain := &mock.ChainService{
|
||||
State: beaconState,
|
||||
}
|
||||
s := &Service{
|
||||
serviceCfg: &ServiceConfig{
|
||||
Database: slasherDB,
|
||||
StateNotifier: &mock.MockStateNotifier{},
|
||||
HeadStateFetcher: mockChain,
|
||||
StateGen: stategen.New(beaconDB, doublylinkedtree.New()),
|
||||
SlashingPoolInserter: &slashingsmock.PoolMock{},
|
||||
ClockWaiter: startup.NewClockSynchronizer(),
|
||||
testCases := []struct {
|
||||
name string
|
||||
wraps []wrapped
|
||||
}{
|
||||
{
|
||||
name: "detects double proposals in the same batch",
|
||||
wraps: []wrapped{
|
||||
{
|
||||
4,
|
||||
[]*slashertypes.SignedBlockHeaderWrapper{
|
||||
createProposalWrapper(t, 4, 1, []byte{1}),
|
||||
createProposalWrapper(t, 4, 1, []byte{1}),
|
||||
createProposalWrapper(t, 4, 1, []byte{1}),
|
||||
createProposalWrapper(t, 4, 1, []byte{2}),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "detects double proposals in the different batches",
|
||||
wraps: []wrapped{
|
||||
{
|
||||
5,
|
||||
[]*slashertypes.SignedBlockHeaderWrapper{
|
||||
createProposalWrapper(t, 4, 1, []byte{1}),
|
||||
createProposalWrapper(t, 5, 1, []byte{1}),
|
||||
},
|
||||
},
|
||||
{
|
||||
6,
|
||||
[]*slashertypes.SignedBlockHeaderWrapper{
|
||||
createProposalWrapper(t, 4, 1, []byte{2}),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
params: DefaultParams(),
|
||||
blksQueue: newBlocksQueue(),
|
||||
}
|
||||
|
||||
parentRoot := bytesutil.ToBytes32([]byte("parent"))
|
||||
err = s.serviceCfg.StateGen.SaveState(ctx, parentRoot, beaconState)
|
||||
require.NoError(t, err)
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
beaconDB := dbtest.SetupDB(t)
|
||||
slasherDB := dbtest.SetupSlasherDB(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
currentSlotChan := make(chan primitives.Slot)
|
||||
exitChan := make(chan struct{})
|
||||
go func() {
|
||||
s.processQueuedBlocks(ctx, currentSlotChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
beaconState, err := util.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
|
||||
signedBlkHeaders := []*slashertypes.SignedBlockHeaderWrapper{
|
||||
createProposalWrapper(t, 4, 1, []byte{1}),
|
||||
createProposalWrapper(t, 4, 1, []byte{1}),
|
||||
createProposalWrapper(t, 4, 1, []byte{1}),
|
||||
createProposalWrapper(t, 4, 1, []byte{2}),
|
||||
// Initialize validators in the state.
|
||||
numVals := params.BeaconConfig().MinGenesisActiveValidatorCount
|
||||
validators := make([]*ethpb.Validator, numVals)
|
||||
privKeys := make([]bls.SecretKey, numVals)
|
||||
for i := range validators {
|
||||
privKey, err := bls.RandKey()
|
||||
require.NoError(t, err)
|
||||
privKeys[i] = privKey
|
||||
validators[i] = ðpb.Validator{
|
||||
PublicKey: privKey.PublicKey().Marshal(),
|
||||
WithdrawalCredentials: make([]byte, 32),
|
||||
}
|
||||
}
|
||||
err = beaconState.SetValidators(validators)
|
||||
require.NoError(t, err)
|
||||
domain, err := signing.Domain(
|
||||
beaconState.Fork(),
|
||||
0,
|
||||
params.BeaconConfig().DomainBeaconProposer,
|
||||
beaconState.GenesisValidatorsRoot(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
mockChain := &mock.ChainService{
|
||||
State: beaconState,
|
||||
}
|
||||
s := &Service{
|
||||
serviceCfg: &ServiceConfig{
|
||||
Database: slasherDB,
|
||||
StateNotifier: &mock.MockStateNotifier{},
|
||||
HeadStateFetcher: mockChain,
|
||||
StateGen: stategen.New(beaconDB, doublylinkedtree.New()),
|
||||
SlashingPoolInserter: &slashingsmock.PoolMock{},
|
||||
ClockWaiter: startup.NewClockSynchronizer(),
|
||||
},
|
||||
params: DefaultParams(),
|
||||
blksQueue: newBlocksQueue(),
|
||||
}
|
||||
|
||||
parentRoot := bytesutil.ToBytes32([]byte("parent"))
|
||||
err = s.serviceCfg.StateGen.SaveState(ctx, parentRoot, beaconState)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentSlotChan := make(chan primitives.Slot)
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.processQueuedBlocks(ctx, currentSlotChan)
|
||||
}()
|
||||
|
||||
for _, wrap := range tt.wraps {
|
||||
// Add valid signatures to the block headers we are testing.
|
||||
for _, proposalWrapper := range wrap.signedBlkHeaders {
|
||||
proposalWrapper.SignedBeaconBlockHeader.Header.ParentRoot = parentRoot[:]
|
||||
headerHtr, err := proposalWrapper.SignedBeaconBlockHeader.Header.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
container := ðpb.SigningData{
|
||||
ObjectRoot: headerHtr[:],
|
||||
Domain: domain,
|
||||
}
|
||||
|
||||
signingRoot, err := container.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
privKey := privKeys[proposalWrapper.SignedBeaconBlockHeader.Header.ProposerIndex]
|
||||
proposalWrapper.SignedBeaconBlockHeader.Signature = privKey.Sign(signingRoot[:]).Marshal()
|
||||
}
|
||||
|
||||
s.blksQueue.extend(wrap.signedBlkHeaders)
|
||||
|
||||
currentSlot := primitives.Slot(4)
|
||||
currentSlotChan <- currentSlot
|
||||
}
|
||||
|
||||
cancel()
|
||||
s.wg.Wait()
|
||||
require.LogsContain(t, hook, "Proposer slashing detected")
|
||||
})
|
||||
}
|
||||
|
||||
// Add valid signatures to the block headers we are testing.
|
||||
for _, proposalWrapper := range signedBlkHeaders {
|
||||
proposalWrapper.SignedBeaconBlockHeader.Header.ParentRoot = parentRoot[:]
|
||||
headerHtr, err := proposalWrapper.SignedBeaconBlockHeader.Header.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
container := ðpb.SigningData{
|
||||
ObjectRoot: headerHtr[:],
|
||||
Domain: domain,
|
||||
}
|
||||
signingRoot, err := container.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
privKey := privKeys[proposalWrapper.SignedBeaconBlockHeader.Header.ProposerIndex]
|
||||
proposalWrapper.SignedBeaconBlockHeader.Signature = privKey.Sign(signingRoot[:]).Marshal()
|
||||
}
|
||||
|
||||
s.blksQueue.extend(signedBlkHeaders)
|
||||
|
||||
currentSlot := primitives.Slot(4)
|
||||
currentSlotChan <- currentSlot
|
||||
cancel()
|
||||
<-exitChan
|
||||
require.LogsContain(t, hook, "Proposer slashing detected")
|
||||
}
|
||||
|
||||
func Test_processQueuedBlocks_NotSlashable(t *testing.T) {
|
||||
@@ -137,10 +182,9 @@ func Test_processQueuedBlocks_NotSlashable(t *testing.T) {
|
||||
blksQueue: newBlocksQueue(),
|
||||
}
|
||||
currentSlotChan := make(chan primitives.Slot)
|
||||
exitChan := make(chan struct{})
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.processQueuedBlocks(ctx, currentSlotChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
s.blksQueue.extend([]*slashertypes.SignedBlockHeaderWrapper{
|
||||
createProposalWrapper(t, 4, 1, []byte{1}),
|
||||
@@ -148,7 +192,7 @@ func Test_processQueuedBlocks_NotSlashable(t *testing.T) {
|
||||
})
|
||||
currentSlotChan <- currentSlot
|
||||
cancel()
|
||||
<-exitChan
|
||||
s.wg.Wait()
|
||||
require.LogsDoNotContain(t, hook, "Proposer slashing detected")
|
||||
}
|
||||
|
||||
|
||||
@@ -52,12 +52,12 @@ func (s *Service) groupByChunkIndex(
|
||||
// This function returns a list of valid attestations, a list of attestations that are
|
||||
// valid in the future, and the number of attestations dropped.
|
||||
func (s *Service) filterAttestations(
|
||||
atts []*slashertypes.IndexedAttestationWrapper, currentEpoch primitives.Epoch,
|
||||
attWrappers []*slashertypes.IndexedAttestationWrapper, currentEpoch primitives.Epoch,
|
||||
) (valid, validInFuture []*slashertypes.IndexedAttestationWrapper, numDropped int) {
|
||||
valid = make([]*slashertypes.IndexedAttestationWrapper, 0, len(atts))
|
||||
validInFuture = make([]*slashertypes.IndexedAttestationWrapper, 0, len(atts))
|
||||
valid = make([]*slashertypes.IndexedAttestationWrapper, 0, len(attWrappers))
|
||||
validInFuture = make([]*slashertypes.IndexedAttestationWrapper, 0, len(attWrappers))
|
||||
|
||||
for _, attWrapper := range atts {
|
||||
for _, attWrapper := range attWrappers {
|
||||
if attWrapper == nil || !validateAttestationIntegrity(attWrapper.IndexedAttestation) {
|
||||
numDropped++
|
||||
continue
|
||||
@@ -73,18 +73,19 @@ func (s *Service) filterAttestations(
|
||||
// If an attestations's target epoch is in the future, we defer processing for later.
|
||||
if attWrapper.IndexedAttestation.Data.Target.Epoch > currentEpoch {
|
||||
validInFuture = append(validInFuture, attWrapper)
|
||||
} else {
|
||||
valid = append(valid, attWrapper)
|
||||
continue
|
||||
}
|
||||
|
||||
// The attestation is valid.
|
||||
valid = append(valid, attWrapper)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Validates the attestation data integrity, ensuring we have no nil values for
|
||||
// source, epoch, and that the source epoch of the attestation must be less than
|
||||
// the target epoch, which is a precondition for performing slashing detection.
|
||||
// This function also checks the attestation source epoch is within the history size
|
||||
// we keep track of for slashing detection.
|
||||
// source and target epochs, and that the source epoch of the attestation must
|
||||
// be less than the target epoch, which is a precondition for performing slashing
|
||||
// detection (except for the genesis epoch).
|
||||
func validateAttestationIntegrity(att *ethpb.IndexedAttestation) bool {
|
||||
// If an attestation is malformed, we drop it.
|
||||
if att == nil ||
|
||||
|
||||
@@ -10,16 +10,10 @@ import (
|
||||
// To properly access the element at epoch `e` for a validator index `i`, we leverage helper
|
||||
// functions from these parameter values as nice abstractions. the following parameters are
|
||||
// required for the helper functions defined in this file.
|
||||
//
|
||||
// (C) chunkSize defines how many elements are in a chunk for a validator
|
||||
// min or max span slice.
|
||||
// (K) validatorChunkSize defines how many validators' chunks we store in a single
|
||||
// flat byte slice on disk.
|
||||
// (H) historyLength defines how many epochs we keep of min or max spans.
|
||||
type Parameters struct {
|
||||
chunkSize uint64
|
||||
validatorChunkSize uint64
|
||||
historyLength primitives.Epoch
|
||||
chunkSize uint64 // C - defines how many elements are in a chunk for a validator min or max span slice.
|
||||
validatorChunkSize uint64 // K - defines how many validators' chunks we store in a single flat byte slice on disk.
|
||||
historyLength primitives.Epoch // H - defines how many epochs we keep of min or max spans.
|
||||
}
|
||||
|
||||
// DefaultParams defines default values for slasher's important parameters, defined
|
||||
@@ -98,8 +92,8 @@ func (p *Parameters) lastEpoch(chunkIndex uint64) primitives.Epoch {
|
||||
// with (validatorIndex % K)*C + (epoch % C), which gives us:
|
||||
//
|
||||
// (2 % 3)*3 + (1 % 3) =
|
||||
// (2*3) + (1) =
|
||||
// 7
|
||||
// 2*3 + 1 =
|
||||
// 7
|
||||
//
|
||||
// val0 val1 val2
|
||||
// | | |
|
||||
|
||||
@@ -12,10 +12,18 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
couldNotSaveAttRecord = "Could not save attestation records to DB"
|
||||
couldNotCheckSlashableAtt = "Could not check slashable attestations"
|
||||
couldNotProcessAttesterSlashings = "Could not process attester slashings"
|
||||
)
|
||||
|
||||
// Receive indexed attestations from some source event feed,
|
||||
// validating their integrity before appending them to an attestation queue
|
||||
// for batch processing in a separate routine.
|
||||
func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan *ethpb.IndexedAttestation) {
|
||||
defer s.wg.Done()
|
||||
|
||||
sub := s.serviceCfg.IndexedAttestationsFeed.Subscribe(indexedAttsChan)
|
||||
defer sub.Unsubscribe()
|
||||
for {
|
||||
@@ -45,6 +53,8 @@ func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan
|
||||
|
||||
// Receive beacon blocks from some source event feed,
|
||||
func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader) {
|
||||
defer s.wg.Done()
|
||||
|
||||
sub := s.serviceCfg.BeaconBlockHeadersFeed.Subscribe(beaconBlockHeadersChan)
|
||||
defer sub.Unsubscribe()
|
||||
for {
|
||||
@@ -77,6 +87,8 @@ func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan
|
||||
// This grouping will allow us to perform detection on batches of attestations
|
||||
// per validator chunk index which can be done concurrently.
|
||||
func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-chan primitives.Slot) {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case currentSlot := <-slotTicker:
|
||||
@@ -101,24 +113,26 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch
|
||||
}).Info("Processing queued attestations for slashing detection")
|
||||
|
||||
// Save the attestation records to our database.
|
||||
// If multiple attestations are provided for the same validator index + target epoch combination,
|
||||
// then last (validator index + target epoch) => signing root) link is kept into the database.
|
||||
if err := s.serviceCfg.Database.SaveAttestationRecordsForValidators(
|
||||
ctx, validAtts,
|
||||
); err != nil {
|
||||
log.WithError(err).Error("Could not save attestation records to DB")
|
||||
log.WithError(err).Error(couldNotSaveAttRecord)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check for slashings.
|
||||
slashings, err := s.checkSlashableAttestations(ctx, currentEpoch, validAtts)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not check slashable attestations")
|
||||
log.WithError(err).Error(couldNotCheckSlashableAtt)
|
||||
continue
|
||||
}
|
||||
|
||||
// Process attester slashings by verifying their signatures, submitting
|
||||
// to the beacon node's operations pool, and logging them.
|
||||
if err := s.processAttesterSlashings(ctx, slashings); err != nil {
|
||||
log.WithError(err).Error("Could not process attester slashings")
|
||||
log.WithError(err).Error(couldNotProcessAttesterSlashings)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -132,6 +146,8 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch
|
||||
// Process queued blocks every time an epoch ticker fires. We retrieve
|
||||
// these blocks from a queue, then perform double proposal detection.
|
||||
func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan primitives.Slot) {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case currentSlot := <-slotTicker:
|
||||
@@ -172,6 +188,8 @@ func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan pri
|
||||
|
||||
// Prunes slasher data on each slot tick to prevent unnecessary build-up of disk space usage.
|
||||
func (s *Service) pruneSlasherData(ctx context.Context, slotTicker <-chan primitives.Slot) {
|
||||
defer s.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-slotTicker:
|
||||
|
||||
@@ -32,10 +32,9 @@ func TestSlasher_receiveAttestations_OK(t *testing.T) {
|
||||
indexedAttsChan := make(chan *ethpb.IndexedAttestation)
|
||||
defer close(indexedAttsChan)
|
||||
|
||||
exitChan := make(chan struct{})
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.receiveAttestations(ctx, indexedAttsChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
firstIndices := []uint64{1, 2, 3}
|
||||
secondIndices := []uint64{4, 5, 6}
|
||||
@@ -44,7 +43,7 @@ func TestSlasher_receiveAttestations_OK(t *testing.T) {
|
||||
indexedAttsChan <- att1.IndexedAttestation
|
||||
indexedAttsChan <- att2.IndexedAttestation
|
||||
cancel()
|
||||
<-exitChan
|
||||
s.wg.Wait()
|
||||
wanted := []*slashertypes.IndexedAttestationWrapper{
|
||||
att1,
|
||||
att2,
|
||||
@@ -216,11 +215,9 @@ func TestSlasher_receiveAttestations_OnlyValidAttestations(t *testing.T) {
|
||||
indexedAttsChan := make(chan *ethpb.IndexedAttestation)
|
||||
defer close(indexedAttsChan)
|
||||
|
||||
exitChan := make(chan struct{})
|
||||
defer close(exitChan)
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.receiveAttestations(ctx, indexedAttsChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
firstIndices := []uint64{1, 2, 3}
|
||||
secondIndices := []uint64{4, 5, 6}
|
||||
@@ -233,7 +230,7 @@ func TestSlasher_receiveAttestations_OnlyValidAttestations(t *testing.T) {
|
||||
AttestingIndices: secondIndices,
|
||||
}
|
||||
cancel()
|
||||
<-exitChan
|
||||
s.wg.Wait()
|
||||
// Expect only a single, valid attestation was added to the queue.
|
||||
require.Equal(t, 1, s.attsQueue.size())
|
||||
wanted := []*slashertypes.IndexedAttestationWrapper{
|
||||
@@ -254,10 +251,9 @@ func TestSlasher_receiveBlocks_OK(t *testing.T) {
|
||||
}
|
||||
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader)
|
||||
defer close(beaconBlockHeadersChan)
|
||||
exitChan := make(chan struct{})
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.receiveBlocks(ctx, beaconBlockHeadersChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
|
||||
block1 := createProposalWrapper(t, 0, 1, nil).SignedBeaconBlockHeader
|
||||
@@ -265,7 +261,7 @@ func TestSlasher_receiveBlocks_OK(t *testing.T) {
|
||||
beaconBlockHeadersChan <- block1
|
||||
beaconBlockHeadersChan <- block2
|
||||
cancel()
|
||||
<-exitChan
|
||||
s.wg.Wait()
|
||||
wanted := []*slashertypes.SignedBlockHeaderWrapper{
|
||||
createProposalWrapper(t, 0, block1.Header.ProposerIndex, nil),
|
||||
createProposalWrapper(t, 0, block2.Header.ProposerIndex, nil),
|
||||
@@ -301,15 +297,14 @@ func TestService_processQueuedBlocks(t *testing.T) {
|
||||
})
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
tickerChan := make(chan primitives.Slot)
|
||||
exitChan := make(chan struct{})
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
s.processQueuedBlocks(ctx, tickerChan)
|
||||
exitChan <- struct{}{}
|
||||
}()
|
||||
|
||||
// Send a value over the ticker.
|
||||
tickerChan <- 0
|
||||
cancel()
|
||||
<-exitChan
|
||||
s.wg.Wait()
|
||||
assert.LogsContain(t, hook, "Processing queued")
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ package slasher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/async/event"
|
||||
@@ -15,7 +16,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/slashings"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
|
||||
beaconChainSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
@@ -38,7 +39,7 @@ type ServiceConfig struct {
|
||||
StateGen stategen.StateManager
|
||||
SlashingPoolInserter slashings.PoolInserter
|
||||
HeadStateFetcher blockchain.HeadFetcher
|
||||
SyncChecker sync.Checker
|
||||
SyncChecker beaconChainSync.Checker
|
||||
ClockWaiter startup.ClockWaiter
|
||||
}
|
||||
|
||||
@@ -67,6 +68,7 @@ type Service struct {
|
||||
blocksSlotTicker *slots.SlotTicker
|
||||
pruningSlotTicker *slots.SlotTicker
|
||||
latestEpochWrittenForValidator map[primitives.ValidatorIndex]primitives.Epoch
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// New instantiates a new slasher from configuration values.
|
||||
@@ -126,21 +128,33 @@ func (s *Service) run() {
|
||||
|
||||
indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1)
|
||||
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.receiveAttestations(s.ctx, indexedAttsChan)
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.receiveBlocks(s.ctx, beaconBlockHeadersChan)
|
||||
|
||||
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
|
||||
s.attsSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
|
||||
s.blocksSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
|
||||
s.pruningSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.processQueuedAttestations(s.ctx, s.attsSlotTicker.C())
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.processQueuedBlocks(s.ctx, s.blocksSlotTicker.C())
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.pruneSlasherData(s.ctx, s.pruningSlotTicker.C())
|
||||
}
|
||||
|
||||
// Stop the slasher service.
|
||||
func (s *Service) Stop() error {
|
||||
s.cancel()
|
||||
s.wg.Wait()
|
||||
|
||||
if s.attsSlotTicker != nil {
|
||||
s.attsSlotTicker.Done()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user