Compare commits

...

17 Commits

Author SHA1 Message Date
Potuz
181890e85c Terence's review 2024-02-02 14:44:03 -03:00
Potuz
74790e9986 Change init sync heuristics 2024-02-02 14:10:10 -03:00
terence
52af63f25a Revise blob sidecar not found log (#13571)
* Update blob sidecar not found log

* Use fields
2024-02-01 20:48:59 +00:00
james-prysm
2dad245bc8 handle slice out of range (#13568)
* handle slice out of range

* adding some tests
2024-02-01 16:59:40 +00:00
Potuz
9a9990605c Update Gohashtree to v0.0.4-beta (#13569)
* Update Gohashtree to v0.0.4-beta

* go mod tidy

* go mod tidy

---------

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
2024-02-01 15:42:56 +00:00
james-prysm
2cddb5ca86 fixing jwt auth checks (#13565) 2024-02-01 15:13:52 +00:00
Nishant Das
73ce28c356 make it the default (#13556) 2024-01-31 10:27:26 +00:00
Manu NALEPA
7a294e861e Beacon node slasher improvement (#13549)
* Slasher: Ensure all goroutines are stopped before running `Stop` actions.

Fixes #13550.
In tests, `exitChan` is now useless since a waitgroup is used to wait
for all goroutines to be stopped.

* `slasher.go`: Add comments and rename some variables. - NFC

* `detect_blocks.go`: Improve. - NFC

- Rename some variables.
- Add comments.
- Use second element of `range` when possible.

* `chunks.go`: Remove `_` receivers. - NFC

* `validateAttestationIntegrity`: Improve documentation. - NFC

* `filterAttestations`: Avoid `else` and rename variable. - NFC

* `slasher.go`: Fix and add comments.

* `SaveAttestationRecordsForValidators`: Remove unused code.

* `LastEpochWrittenForValidators`: Name variables consistently. - NFC

Avoid mixing `indice(s)` and `index(es)`.

* `SaveLastEpochsWrittenForValidators`: Name variables consistently. - NFC

* `CheckAttesterDoubleVotes`: Rename variables and add comments. - NFC

* `schema.go`: Add comments. - NFC

* `processQueuedAttestations`: Add comments. - NFC

* `checkDoubleVotes`: Rename variable. - NFC

* `Test_processQueuedAttestations`: Ensure there is no error log.

* `shouldNotBeSlashable` => `shouldBeSlashable`

* `Test_processQueuedAttestations`: Add 2 test cases:
- Same target with different signing roots
- Same target with same signing roots

* `checkDoubleVotesOnDisk` ==> `checkDoubleVotes`.

Before this commit, `checkDoubleVotes` did two tasks:
- Checking if there are any slashable double votes in the input
  list of attestations with respect to each other.
- Checking if there are any slashable double votes in the input
  list of attestations with respect to our database.

However, `checkDoubleVotes` is called only in
`checkSlashableAttestations`.

And `checkSlashableAttestations` is called only in:
- `processQueuedAttestations`, and in
- `IsSlashableAttestation`

Study of case `processQueuedAttestations`:
---------------------------------------------
In `processQueuedAttestations`, `checkSlashableAttestations`
is ALWAYS called after
`Database.SaveAttestationRecordsForValidators`.

It means that, when calling `checkSlashableAttestations`,
`validAtts` are ALREADY stored in the DB.

Each attestation of `validAtts` will be checked twice:
- Against the other attestations of `validAtts` (the portion of
  deleted code)
- Against the content of the database.

One of those two checks is redundant.
==> We can remove the check against other attestations in `validAtts`.

Study of case `IsSlashableAttestation`:
----------------------------------------------------------------
In `IsSlashableAttestation`,
`checkSlashableAttestations` is ALWAYS called with a list of
attestations containing only ONE attestation.

This single attestation will be checked twice:
- Against itself, and an attestation cannot conflict with itself.
- Against the content of the database.

==> We can remove the check against other attestations in `validAtts`.

=========================

In both cases, we showed that we can remove the check of attestations
against the content of `validAtts`, and the corresponding test
`Test_checkDoubleVotes_SlashableInputAttestations`.

* `Test_processQueuedBlocks_DetectsDoubleProposals`: Wrap proposals.

So we can add new proposals later.

* Fix slasher multiple proposals false negative.

If a first batch of blocks is sent with:
- validator 1 - slot 4 - signing root 1
- validator 1 - slot 5 - signing root 1

Then, if a second batch of blocks is sent with:
- validator 1 - slot 4 - signing root 2

Because we have two blocks proposed by the same validator (1) and for
the same slot (4), but with two different signing roots (1 and 2), the
validator 1 should be slashed.

This is not the case before this commit.
A new test case has been added as well to check this.

Fixes #13551
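
The scenario above boils down to a simple rule; here is a standalone sketch
(hypothetical types, and the real check also requires the stored signing root
to be non-empty):

// A double proposal: same slot and same proposer, but different signing roots.
type proposal struct {
    slot, proposerIndex uint64
    signingRoot         [32]byte
}

func isDoubleProposal(a, b proposal) bool {
    return a.slot == b.slot &&
        a.proposerIndex == b.proposerIndex &&
        a.signingRoot != b.signingRoot
}

In the batches above, the second batch's (slot 4, root 2) proposal conflicts
with the stored (slot 4, root 1) proposal from the first batch, so validator 1
must be slashed.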

* `params.go`: Change comments. - NFC

* `CheckSlashable`: Keep the happy path without indentation.

* `detectAllAttesterSlashings` => `checkSurrounds`.

* Update beacon-chain/db/slasherkv/slasher.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/db/slasherkv/slasher.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* `CheckAttesterDoubleVotes`: Keep happy path without indentation.

Well, even if, in our case, "happy path" means slashing.

* 'SaveAttestationRecordsForValidators': Save the first attestation.

In case of multiple votes, arbitrarily save the first attestation.
Saving the first one in particular has no functional impact,
since in any case all attestations will be tested against
the content of the database. So all but the first one will be
detected as slashable.

However, saving the first one rather than another one lets us avoid
modifying the end-to-end tests, since they expect the first one
to be saved in the database.

* Rename `min` => `minimum`.

To avoid conflicting with the new `min` built-in function.

* `couldNotSaveSlashableAtt` ==> `couldNotCheckSlashableAtt`

---------

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
2024-01-31 09:49:14 +00:00
james-prysm
258123341e add a log and update size for promptui (#13542) 2024-01-30 17:19:31 +00:00
Preston Van Loon
224b136737 Revert "set limit to multiple of burst for goerli" (#13552)
Co-authored-by: Nishant Das <nishdas93@gmail.com>
2024-01-30 06:10:12 +00:00
Nishant Das
3ed4866eec Makes Our New Deposit Trie The Default (#13555)
* make 4881 the default

* fix failed build
2024-01-30 05:15:52 +00:00
kasey
373c853d17 set limit to multiple of burst for goerli (#13544)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
2024-01-27 22:12:08 +00:00
terence
23b0718b5f Add metric for data availability wait time (#13534)
* Add metric for data availability wait time

* Kasey's feedback

* Kasey's feedback
2024-01-26 18:17:25 +00:00
terence
3a9854145c Correct metrics from ns to ms (#13540) 2024-01-26 17:43:30 +00:00
Radosław Kapka
1b70d2b566 Fetch unaggregated atts in GetAggregateAttestation (#13533) 2024-01-26 17:08:58 +00:00
Nishant Das
59b310a221 make it the same (#13531) 2024-01-26 05:35:27 +00:00
Nishant Das
22b6d1751d Enable Backfill in E2E (#13524)
* enable backfill for devmode

* enable backfill

* gaz

* move to its own package

* fix panic

* fix bug

* gaz

* kasey's review
2024-01-26 04:37:41 +00:00
47 changed files with 830 additions and 523 deletions

View File

@@ -55,7 +55,7 @@ bazel build //beacon-chain --config=release
## Adding / updating dependencies
1. Add your dependency as you would with go modules. I.e. `go get ...`
1. Run `gazelle update-repos -from_file=go.mod` to update the bazel managed dependencies.
1. Run `bazel run //:gazelle -- update-repos -from_file=go.mod` to update the bazel managed dependencies.
Example:

View File

@@ -1,3 +1,7 @@
package api
const WebUrlPrefix = "/v2/validator/"
const (
WebUrlPrefix = "/v2/validator/"
WebApiUrlPrefix = "/api/v2/validator/"
KeymanagerApiPrefix = "/eth/v1"
)
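
For illustration only, a hypothetical helper showing how the new prefixes
might be combined into routes (the assembled paths in the comments are
assumptions, not part of this change):

// Hypothetical usage of the new constants from package api.
func exampleRoutes() (string, string) {
    webRoute := api.WebUrlPrefix + "accounts"         // "/v2/validator/accounts"
    kmRoute := api.KeymanagerApiPrefix + "/keystores" // "/eth/v1/keystores"
    return webRoute, kmRoute
}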

View File

@@ -559,15 +559,20 @@ func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) {
// inRegularSync applies the following heuristics to decide if the node is in
// regular sync mode vs init sync mode using only forkchoice.
// It checks that the highest received block is behind the current time by at least 2 epochs
// and that it was imported at least one epoch late. If both of these
// tests pass, then the node is in init sync. The caller of this function MUST
// have a lock on forkchoice.
// The caller of this function MUST have a lock on forkchoice
func (s *Service) inRegularSync() bool {
currentSlot := s.CurrentSlot()
fc := s.cfg.ForkChoiceStore
if currentSlot-fc.HighestReceivedBlockSlot() < 2*params.BeaconConfig().SlotsPerEpoch {
highestSlot := fc.HighestReceivedBlockSlot()
// if the highest received slot is from the same epoch, we are in regular sync
if slots.ToEpoch(currentSlot) == slots.ToEpoch(highestSlot) {
return true
}
return fc.HighestReceivedBlockDelay() < params.BeaconConfig().SlotsPerEpoch
// If the highest received block is less than 2 slots old, we are in regular sync
if currentSlot-highestSlot < primitives.Slot(2) {
return true
}
// At this stage the last block received is from the previous epoch and more than 2 slots ago.
// If the highest slot was received during its slot or the next one then we are in regular sync
return fc.HighestReceivedBlockDelay() < primitives.Slot(2)
}
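
For reference, a minimal standalone sketch of the heuristic above, using plain
uint64 slots instead of Prysm's primitives.Slot and forkchoice types (the names
and the 32-slot epoch are assumptions for illustration):

package main

import "fmt"

const slotsPerEpoch = 32

func inRegularSync(currentSlot, highestReceivedSlot, highestReceivedDelay uint64) bool {
    // Highest received block is from the current epoch: regular sync.
    if currentSlot/slotsPerEpoch == highestReceivedSlot/slotsPerEpoch {
        return true
    }
    // Highest received block is less than 2 slots old: regular sync.
    if currentSlot-highestReceivedSlot < 2 {
        return true
    }
    // Otherwise we are in regular sync only if the block arrived during its
    // own slot or the next one.
    return highestReceivedDelay < 2
}

func main() {
    // Slot 70 (epoch 2), highest block at slot 40 (epoch 1), imported 5 slots late.
    fmt.Println(inRegularSync(70, 40, 5)) // false: init sync
    // Slot 64 (epoch 2), highest block at slot 63 (epoch 1), imported on time.
    fmt.Println(inRegularSync(64, 63, 1)) // true: regular sync
}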

View File

@@ -609,7 +609,7 @@ func TestService_inRegularSync(t *testing.T) {
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-5*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-4*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
require.Equal(t, true, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))

View File

@@ -182,6 +182,10 @@ var (
Name: "chain_service_processing_milliseconds",
Help: "Total time to call a chain service in ReceiveBlock()",
})
dataAvailWaitedTime = promauto.NewSummary(prometheus.SummaryOpts{
Name: "da_waited_time_milliseconds",
Help: "Total time spent waiting for a data availability check in ReceiveBlock()",
})
processAttsElapsedTime = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "process_attestations_milliseconds",

View File

@@ -122,6 +122,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
}
}
daWaitedTime := time.Since(daStartTime)
dataAvailWaitedTime.Observe(float64(daWaitedTime.Milliseconds()))
// Defragment the state before continuing block processing.
s.defragmentState(postState)

View File

@@ -7,10 +7,21 @@ package slasherkv
// it easy to scan for keys that have a certain shard number as a prefix and return those
// corresponding attestations.
var (
// Slasher buckets.
attestedEpochsByValidator = []byte("attested-epochs-by-validator")
attestationRecordsBucket = []byte("attestation-records")
// key: (encoded) ValidatorIndex
// value: (encoded) Epoch
attestedEpochsByValidator = []byte("attested-epochs-by-validator")
// key: attestation SigningRoot
// value: (encoded + compressed) IndexedAttestation
attestationRecordsBucket = []byte("attestation-records")
// key: (encoded) Target Epoch + (encoded) ValidatorIndex
// value: attestation SigningRoot
attestationDataRootsBucket = []byte("attestation-data-roots")
proposalRecordsBucket = []byte("proposal-records")
slasherChunksBucket = []byte("slasher-chunks")
// key: Slot+ValidatorIndex
// value: (encoded) SignedBlockHeaderWrapper
proposalRecordsBucket = []byte("proposal-records")
slasherChunksBucket = []byte("slasher-chunks")
)
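
To make the key layouts above concrete, a small sketch building a composite
attestation-data-roots key, assuming plain 8-byte little-endian integers (the
actual encodeTargetEpoch and encodeValidatorIndex helpers may use a different
width or encoding):

package main

import (
    "encoding/binary"
    "fmt"
)

func encodeUint64(v uint64) []byte {
    b := make([]byte, 8)
    binary.LittleEndian.PutUint64(b, v)
    return b
}

func main() {
    targetEpoch, validatorIndex := uint64(12), uint64(7)
    // Key layout in attestation-data-roots: epoch bytes then index bytes.
    key := append(encodeUint64(targetEpoch), encodeUint64(validatorIndex)...)
    fmt.Printf("%x\n", key) // 0c000000000000000700000000000000
}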

View File

@@ -29,72 +29,90 @@ const (
// LastEpochWrittenForValidators given a list of validator indices returns the latest
// epoch we have recorded the validators writing data for.
func (s *Store) LastEpochWrittenForValidators(
ctx context.Context, validatorIndices []primitives.ValidatorIndex,
ctx context.Context, validatorIndexes []primitives.ValidatorIndex,
) ([]*slashertypes.AttestedEpochForValidator, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.LastEpochWrittenForValidators")
defer span.End()
attestedEpochs := make([]*slashertypes.AttestedEpochForValidator, 0)
encodedIndices := make([][]byte, len(validatorIndices))
for i, valIdx := range validatorIndices {
encodedIndices[i] = encodeValidatorIndex(valIdx)
encodedIndexes := make([][]byte, len(validatorIndexes))
for i, validatorIndex := range validatorIndexes {
encodedIndexes[i] = encodeValidatorIndex(validatorIndex)
}
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestedEpochsByValidator)
for i, encodedIndex := range encodedIndices {
for i, encodedIndex := range encodedIndexes {
var epoch primitives.Epoch
epochBytes := bkt.Get(encodedIndex)
if epochBytes != nil {
if err := epoch.UnmarshalSSZ(epochBytes); err != nil {
return err
}
}
attestedEpochs = append(attestedEpochs, &slashertypes.AttestedEpochForValidator{
ValidatorIndex: validatorIndices[i],
ValidatorIndex: validatorIndexes[i],
Epoch: epoch,
})
}
return nil
})
return attestedEpochs, err
}
// SaveLastEpochsWrittenForValidators updates the latest epoch a slice
// of validator indices has attested to.
func (s *Store) SaveLastEpochsWrittenForValidators(
ctx context.Context, epochByValidator map[primitives.ValidatorIndex]primitives.Epoch,
ctx context.Context, epochByValIndex map[primitives.ValidatorIndex]primitives.Epoch,
) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveLastEpochsWrittenForValidators")
defer span.End()
encodedIndices := make([][]byte, 0, len(epochByValidator))
encodedEpochs := make([][]byte, 0, len(epochByValidator))
for valIdx, epoch := range epochByValidator {
const batchSize = 10000
encodedIndexes := make([][]byte, 0, len(epochByValIndex))
encodedEpochs := make([][]byte, 0, len(epochByValIndex))
for valIndex, epoch := range epochByValIndex {
if ctx.Err() != nil {
return ctx.Err()
}
encodedEpoch, err := epoch.MarshalSSZ()
if err != nil {
return err
}
encodedIndices = append(encodedIndices, encodeValidatorIndex(valIdx))
encodedIndexes = append(encodedIndexes, encodeValidatorIndex(valIndex))
encodedEpochs = append(encodedEpochs, encodedEpoch)
}
// The list of validators might be too massive for boltdb to handle in a single transaction,
// so instead we split it into batches and write each batch.
batchSize := 10000
for i := 0; i < len(encodedIndices); i += batchSize {
for i := 0; i < len(encodedIndexes); i += batchSize {
if ctx.Err() != nil {
return ctx.Err()
}
if err := s.db.Update(func(tx *bolt.Tx) error {
if ctx.Err() != nil {
return ctx.Err()
}
bkt := tx.Bucket(attestedEpochsByValidator)
min := i + batchSize
if min > len(encodedIndices) {
min = len(encodedIndices)
minimum := i + batchSize
if minimum > len(encodedIndexes) {
minimum = len(encodedIndexes)
}
for j, encodedIndex := range encodedIndices[i:min] {
for j, encodedIndex := range encodedIndexes[i:minimum] {
if ctx.Err() != nil {
return ctx.Err()
}
@@ -102,79 +120,106 @@ func (s *Store) SaveLastEpochsWrittenForValidators(
return err
}
}
return nil
}); err != nil {
return err
}
}
return nil
}
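
The batching above is a general pattern; here is a sketch with a hypothetical
writeBatch callback standing in for one db.Update transaction:

// writeInBatches splits a large key/value set into fixed-size batches so that
// each write transaction stays small (a sketch, not Prysm's actual helper).
func writeInBatches(keys, values [][]byte, batchSize int, writeBatch func(k, v [][]byte) error) error {
    for i := 0; i < len(keys); i += batchSize {
        end := i + batchSize
        if end > len(keys) {
            end = len(keys)
        }
        if err := writeBatch(keys[i:end], values[i:end]); err != nil {
            return err
        }
    }
    return nil
}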
// CheckAttesterDoubleVotes retries any slashable double votes that exist
// for a series of input attestations.
// CheckAttesterDoubleVotes retrieves any slashable double votes that exist
// for a series of input attestations with respect to the database.
func (s *Store) CheckAttesterDoubleVotes(
ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper,
) ([]*slashertypes.AttesterDoubleVote, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.CheckAttesterDoubleVotes")
defer span.End()
doubleVotes := make([]*slashertypes.AttesterDoubleVote, 0)
doubleVotesMu := sync.Mutex{}
mu := sync.Mutex{}
eg, egctx := errgroup.WithContext(ctx)
for _, att := range attestations {
for _, attestation := range attestations {
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
// See https://golang.org/doc/faq#closures_and_goroutines for more details.
attToProcess := att
// process every attestation parallelly.
attToProcess := attestation
// Process each attestation in parallel.
eg.Go(func() error {
err := s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
encEpoch := encodeTargetEpoch(attToProcess.IndexedAttestation.Data.Target.Epoch)
localDoubleVotes := make([]*slashertypes.AttesterDoubleVote, 0)
localDoubleVotes := []*slashertypes.AttesterDoubleVote{}
for _, valIdx := range attToProcess.IndexedAttestation.AttestingIndices {
// Check if there is a signing root in the database for this combination
// of validator index and target epoch.
encIdx := encodeValidatorIndex(primitives.ValidatorIndex(valIdx))
validatorEpochKey := append(encEpoch, encIdx...)
attRecordsKey := signingRootsBkt.Get(validatorEpochKey)
// An attestation record key is comprised of a signing root (32 bytes).
if len(attRecordsKey) < attestationRecordKeySize {
// If there is no signing root for this combination,
// then there is no double vote. We can continue to the next validator.
continue
}
// Retrieve the attestation record corresponding to the signing root
// from the database.
encExistingAttRecord := attRecordsBkt.Get(attRecordsKey)
if encExistingAttRecord == nil {
continue
}
existingSigningRoot := bytesutil.ToBytes32(attRecordsKey[:signingRootSize])
if existingSigningRoot != attToProcess.SigningRoot {
existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord)
if err != nil {
return err
}
slashAtt := &slashertypes.AttesterDoubleVote{
ValidatorIndex: primitives.ValidatorIndex(valIdx),
Target: attToProcess.IndexedAttestation.Data.Target.Epoch,
PrevAttestationWrapper: existingAttRecord,
AttestationWrapper: attToProcess,
}
localDoubleVotes = append(localDoubleVotes, slashAtt)
if existingSigningRoot == attToProcess.SigningRoot {
continue
}
// There is a double vote.
existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord)
if err != nil {
return err
}
// Build the proof of double vote.
slashAtt := &slashertypes.AttesterDoubleVote{
ValidatorIndex: primitives.ValidatorIndex(valIdx),
Target: attToProcess.IndexedAttestation.Data.Target.Epoch,
PrevAttestationWrapper: existingAttRecord,
AttestationWrapper: attToProcess,
}
localDoubleVotes = append(localDoubleVotes, slashAtt)
}
// if any routine is cancelled, then cancel this routine too
// If any routine is cancelled, then cancel this routine too.
select {
case <-egctx.Done():
return egctx.Err()
default:
}
// if there are any doible votes in this attestation, add it to the global double votes
// If there are any double votes in this attestation, add it to the global double votes.
if len(localDoubleVotes) > 0 {
doubleVotesMu.Lock()
defer doubleVotesMu.Unlock()
mu.Lock()
defer mu.Unlock()
doubleVotes = append(doubleVotes, localDoubleVotes...)
}
return nil
})
return err
})
}
return doubleVotes, eg.Wait()
}
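
The concurrency shape here is a standard errgroup fan-out: one goroutine per
item, a mutex-guarded shared result, and early exit on sibling failure. A
runnable sketch of the same pattern outside slasher:

package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/errgroup"
)

func main() {
    items := []int{1, 2, 3, 4}
    var (
        mu      sync.Mutex
        results []int
    )
    eg, egctx := errgroup.WithContext(context.Background())
    for _, item := range items {
        item := item // pre-Go 1.22 loop-variable copy, as in the code above
        eg.Go(func() error {
            // Stop early if a sibling goroutine already failed.
            select {
            case <-egctx.Done():
                return egctx.Err()
            default:
            }
            mu.Lock()
            defer mu.Unlock()
            results = append(results, item*item)
            return nil
        })
    }
    if err := eg.Wait(); err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println(results) // order may vary across runs
}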
@@ -211,6 +256,8 @@ func (s *Store) AttestationRecordForValidator(
}
// SaveAttestationRecordsForValidators saves attestation records for the specified indices.
// If multiple attestations are provided for the same validator index + target epoch combination,
// then only the first one is (arbitrarily) saved in the `attestationDataRootsBucket` bucket.
func (s *Store) SaveAttestationRecordsForValidators(
ctx context.Context,
attestations []*slashertypes.IndexedAttestationWrapper,
@@ -219,37 +266,40 @@ func (s *Store) SaveAttestationRecordsForValidators(
defer span.End()
encodedTargetEpoch := make([][]byte, len(attestations))
encodedRecords := make([][]byte, len(attestations))
encodedIndices := make([][]byte, len(attestations))
for i, att := range attestations {
encEpoch := encodeTargetEpoch(att.IndexedAttestation.Data.Target.Epoch)
value, err := encodeAttestationRecord(att)
for i, attestation := range attestations {
encEpoch := encodeTargetEpoch(attestation.IndexedAttestation.Data.Target.Epoch)
value, err := encodeAttestationRecord(attestation)
if err != nil {
return err
}
indicesBytes := make([]byte, len(att.IndexedAttestation.AttestingIndices)*8)
for _, idx := range att.IndexedAttestation.AttestingIndices {
encodedIdx := encodeValidatorIndex(primitives.ValidatorIndex(idx))
indicesBytes = append(indicesBytes, encodedIdx...)
}
encodedIndices[i] = indicesBytes
encodedTargetEpoch[i] = encEpoch
encodedRecords[i] = value
}
return s.db.Update(func(tx *bolt.Tx) error {
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
for i, att := range attestations {
if err := attRecordsBkt.Put(att.SigningRoot[:], encodedRecords[i]); err != nil {
for i := len(attestations) - 1; i >= 0; i-- {
attestation := attestations[i]
if err := attRecordsBkt.Put(attestation.SigningRoot[:], encodedRecords[i]); err != nil {
return err
}
for _, valIdx := range att.IndexedAttestation.AttestingIndices {
for _, valIdx := range attestation.IndexedAttestation.AttestingIndices {
encIdx := encodeValidatorIndex(primitives.ValidatorIndex(valIdx))
key := append(encodedTargetEpoch[i], encIdx...)
if err := signingRootsBkt.Put(key, att.SigningRoot[:]); err != nil {
if err := signingRootsBkt.Put(key, attestation.SigningRoot[:]); err != nil {
return err
}
}
}
return nil
})
}
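
Why the loop runs in reverse: bolt's Put overwrites, so writing attestations
from last to first means that, for a duplicated target-epoch+validator-index
key, the first attestation's signing root is written last and is the one
retained. A tiny map-based sketch of that last-write-wins effect (hypothetical
helper):

// keepFirst([]string{"e2:v7", "e2:v7"}, []string{"rootA", "rootB"})
// returns map[e2:v7:rootA]: the first attestation wins.
func keepFirst(keys, roots []string) map[string]string {
    saved := make(map[string]string) // stands in for the bolt bucket
    for i := len(keys) - 1; i >= 0; i-- {
        saved[keys[i]] = roots[i] // a later write overwrites an earlier one
    }
    return saved
}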
@@ -314,43 +364,60 @@ func (s *Store) SaveSlasherChunks(
}
// CheckDoubleBlockProposals takes in a list of proposals and for each,
// checks if there already exists a proposal at the same slot+validatorIndex combination. If so,
// We check if the existing signing root is not-empty and is different than the incoming
// proposal signing root. If so, we return a double block proposal object.
// checks if there already exists a proposal at the same slot+validatorIndex combination.
// If so, it checks if the existing signing root is not-empty and is different than
// the incoming proposal signing root.
// If so, it returns a double block proposal object.
func (s *Store) CheckDoubleBlockProposals(
ctx context.Context, proposals []*slashertypes.SignedBlockHeaderWrapper,
ctx context.Context, incomingProposals []*slashertypes.SignedBlockHeaderWrapper,
) ([]*ethpb.ProposerSlashing, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.CheckDoubleBlockProposals")
defer span.End()
proposerSlashings := make([]*ethpb.ProposerSlashing, 0, len(proposals))
proposerSlashings := make([]*ethpb.ProposerSlashing, 0, len(incomingProposals))
err := s.db.View(func(tx *bolt.Tx) error {
// Retrieve the proposal records bucket
bkt := tx.Bucket(proposalRecordsBucket)
for _, proposal := range proposals {
for _, incomingProposal := range incomingProposals {
// Build the key corresponding to this slot + validator index combination
key, err := keyForValidatorProposal(
proposal.SignedBeaconBlockHeader.Header.Slot,
proposal.SignedBeaconBlockHeader.Header.ProposerIndex,
incomingProposal.SignedBeaconBlockHeader.Header.Slot,
incomingProposal.SignedBeaconBlockHeader.Header.ProposerIndex,
)
if err != nil {
return err
}
// Retrieve the existing proposal record from the database
encExistingProposalWrapper := bkt.Get(key)
// If there is no existing proposal record (empty result), then there is no double proposal.
// We can continue to the next proposal.
if len(encExistingProposalWrapper) < signingRootSize {
continue
}
// Compare the proposal signing root in the DB with the incoming proposal signing root.
// If they differ, we have a double proposal.
existingSigningRoot := bytesutil.ToBytes32(encExistingProposalWrapper[:signingRootSize])
if existingSigningRoot != proposal.SigningRoot {
if existingSigningRoot != incomingProposal.SigningRoot {
existingProposalWrapper, err := decodeProposalRecord(encExistingProposalWrapper)
if err != nil {
return err
}
proposerSlashings = append(proposerSlashings, &ethpb.ProposerSlashing{
Header_1: existingProposalWrapper.SignedBeaconBlockHeader,
Header_2: proposal.SignedBeaconBlockHeader,
Header_2: incomingProposal.SignedBeaconBlockHeader,
})
}
}
return nil
})
return proposerSlashings, err
}
@@ -384,14 +451,20 @@ func (s *Store) BlockProposalForValidator(
// SaveBlockProposals takes in a list of block proposals and saves them to our
// proposal records bucket in the database.
// If multiple proposals are provided for the same slot + validatorIndex combination,
// then only the last one is saved in the database.
func (s *Store) SaveBlockProposals(
ctx context.Context, proposals []*slashertypes.SignedBlockHeaderWrapper,
) error {
_, span := trace.StartSpan(ctx, "BeaconDB.SaveBlockProposals")
defer span.End()
encodedKeys := make([][]byte, len(proposals))
encodedProposals := make([][]byte, len(proposals))
// Loop over all proposals to encode keys and proposals themselves.
for i, proposal := range proposals {
// Encode the key for this proposal.
key, err := keyForValidatorProposal(
proposal.SignedBeaconBlockHeader.Header.Slot,
proposal.SignedBeaconBlockHeader.Header.ProposerIndex,
@@ -399,20 +472,29 @@ func (s *Store) SaveBlockProposals(
if err != nil {
return err
}
// Encode the proposal itself.
enc, err := encodeProposalRecord(proposal)
if err != nil {
return err
}
encodedKeys[i] = key
encodedProposals[i] = enc
}
// All proposals are saved into the DB in a single transaction.
return s.db.Update(func(tx *bolt.Tx) error {
// Retrieve the proposal records bucket.
bkt := tx.Bucket(proposalRecordsBucket)
// Save all proposals.
for i := range proposals {
if err := bkt.Put(encodedKeys[i], encodedProposals[i]); err != nil {
return err
}
}
return nil
})
}
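
Note the mirror image of the attestation case: this loop runs forward, so for a
duplicated slot+validatorIndex key the last proposal's Put overwrites the
earlier ones, which is exactly why the comment above says only the last one is
saved.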
@@ -472,7 +554,7 @@ func suffixForAttestationRecordsKey(key, encodedValidatorIndex []byte) bool {
return bytes.Equal(encIdx, encodedValidatorIndex)
}
// Disk key for a validator proposal, including a slot+validatorIndex as a byte slice.
// keyForValidatorProposal returns a disk key for a validator proposal, including a slot+validatorIndex as a byte slice.
func keyForValidatorProposal(slot primitives.Slot, proposerIndex primitives.ValidatorIndex) ([]byte, error) {
encSlot, err := slot.MarshalSSZ()
if err != nil {
@@ -512,37 +594,55 @@ func decodeSlasherChunk(enc []byte) ([]uint16, error) {
return chunk, nil
}
// Decode attestation record from bytes.
// Encode attestation record to bytes.
// The output encoded attestation record consists of the signing root concatenated with the compressed attestation record.
func encodeAttestationRecord(att *slashertypes.IndexedAttestationWrapper) ([]byte, error) {
if att == nil || att.IndexedAttestation == nil {
return []byte{}, errors.New("nil proposal record")
}
// Encode attestation.
encodedAtt, err := att.IndexedAttestation.MarshalSSZ()
if err != nil {
return nil, err
}
// Compress attestation.
compressedAtt := snappy.Encode(nil, encodedAtt)
return append(att.SigningRoot[:], compressedAtt...), nil
}
// Decode attestation record from bytes.
// The input encoded attestation record consists of the signing root concatenated with the compressed attestation record.
func decodeAttestationRecord(encoded []byte) (*slashertypes.IndexedAttestationWrapper, error) {
if len(encoded) < signingRootSize {
return nil, fmt.Errorf("wrong length for encoded attestation record, want 32, got %d", len(encoded))
return nil, fmt.Errorf("wrong length for encoded attestation record, want minimum %d, got %d", signingRootSize, len(encoded))
}
signingRoot := encoded[:signingRootSize]
decodedAtt := &ethpb.IndexedAttestation{}
// Decompress attestation.
decodedAttBytes, err := snappy.Decode(nil, encoded[signingRootSize:])
if err != nil {
return nil, err
}
// Decode attestation.
decodedAtt := &ethpb.IndexedAttestation{}
if err := decodedAtt.UnmarshalSSZ(decodedAttBytes); err != nil {
return nil, err
}
return &slashertypes.IndexedAttestationWrapper{
// Decode signing root.
signingRootBytes := encoded[:signingRootSize]
signingRoot := bytesutil.ToBytes32(signingRootBytes)
// Return decoded attestation.
attestation := &slashertypes.IndexedAttestationWrapper{
IndexedAttestation: decodedAtt,
SigningRoot: bytesutil.ToBytes32(signingRoot),
}, nil
SigningRoot: signingRoot,
}
return attestation, nil
}
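
A runnable sketch of the record layout described above: a fixed 32-byte
signing-root prefix followed by a snappy-compressed payload (a placeholder body
stands in for the SSZ-encoded attestation):

package main

import (
    "bytes"
    "fmt"

    "github.com/golang/snappy"
)

const signingRootSize = 32

func encodeRecord(signingRoot [32]byte, body []byte) []byte {
    // Prefix the compressed payload with the fixed-size signing root.
    return append(signingRoot[:], snappy.Encode(nil, body)...)
}

func decodeRecord(encoded []byte) ([32]byte, []byte, error) {
    var root [32]byte
    if len(encoded) < signingRootSize {
        return root, nil, fmt.Errorf("want at least %d bytes, got %d", signingRootSize, len(encoded))
    }
    copy(root[:], encoded[:signingRootSize])
    body, err := snappy.Decode(nil, encoded[signingRootSize:])
    return root, body, err
}

func main() {
    var root [32]byte
    copy(root[:], "example-signing-root")
    enc := encodeRecord(root, []byte("attestation bytes"))
    gotRoot, gotBody, err := decodeRecord(enc)
    fmt.Println(err, gotRoot == root, bytes.Equal(gotBody, []byte("attestation bytes"))) // <nil> true true
}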
func encodeProposalRecord(blkHdr *slashertypes.SignedBlockHeaderWrapper) ([]byte, error) {

View File

@@ -753,7 +753,7 @@ func (s *Service) initializeEth1Data(ctx context.Context, eth1DataInDB *ethpb.ET
} else {
if eth1DataInDB.Trie == nil && eth1DataInDB.DepositSnapshot != nil {
return errors.Errorf("trying to use old deposit trie after migration to the new trie. "+
"Run with the --%s flag to resume normal operations.", features.EnableEIP4881.Name)
"Remove the --%s flag to resume normal operations.", features.DisableEIP4881.Name)
}
s.depositTrie, err = trie.CreateTrieFromProto(eth1DataInDB.Trie)
}

View File

@@ -38,7 +38,7 @@ import (
// GetAggregateAttestation aggregates all attestations matching the given attestation data root and slot, returning the aggregated result.
func (s *Server) GetAggregateAttestation(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.GetAggregateAttestation")
_, span := trace.StartSpan(r.Context(), "validator.GetAggregateAttestation")
defer span.End()
_, attDataRoot, ok := shared.HexFromQuery(w, r, "attestation_data_root", fieldparams.RootLength, true)
@@ -51,53 +51,67 @@ func (s *Server) GetAggregateAttestation(w http.ResponseWriter, r *http.Request)
return
}
if err := s.AttestationsPool.AggregateUnaggregatedAttestations(ctx); err != nil {
httputil.HandleError(w, "Could not aggregate unaggregated attestations: "+err.Error(), http.StatusBadRequest)
var match *ethpbalpha.Attestation
var err error
match, err = matchingAtt(s.AttestationsPool.AggregatedAttestations(), primitives.Slot(slot), attDataRoot)
if err != nil {
httputil.HandleError(w, "Could not get matching attestation: "+err.Error(), http.StatusInternalServerError)
return
}
allAtts := s.AttestationsPool.AggregatedAttestations()
var bestMatchingAtt *ethpbalpha.Attestation
for _, att := range allAtts {
if att.Data.Slot == primitives.Slot(slot) {
root, err := att.Data.HashTreeRoot()
if err != nil {
httputil.HandleError(w, "Could not get attestation data root: "+err.Error(), http.StatusInternalServerError)
return
}
if bytes.Equal(root[:], attDataRoot) {
if bestMatchingAtt == nil || len(att.AggregationBits) > len(bestMatchingAtt.AggregationBits) {
bestMatchingAtt = att
}
}
if match == nil {
atts, err := s.AttestationsPool.UnaggregatedAttestations()
if err != nil {
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
return
}
match, err = matchingAtt(atts, primitives.Slot(slot), attDataRoot)
if err != nil {
httputil.HandleError(w, "Could not get matching attestation: "+err.Error(), http.StatusInternalServerError)
return
}
}
if bestMatchingAtt == nil {
if match == nil {
httputil.HandleError(w, "No matching attestation found", http.StatusNotFound)
return
}
response := &AggregateAttestationResponse{
Data: &shared.Attestation{
AggregationBits: hexutil.Encode(bestMatchingAtt.AggregationBits),
AggregationBits: hexutil.Encode(match.AggregationBits),
Data: &shared.AttestationData{
Slot: strconv.FormatUint(uint64(bestMatchingAtt.Data.Slot), 10),
CommitteeIndex: strconv.FormatUint(uint64(bestMatchingAtt.Data.CommitteeIndex), 10),
BeaconBlockRoot: hexutil.Encode(bestMatchingAtt.Data.BeaconBlockRoot),
Slot: strconv.FormatUint(uint64(match.Data.Slot), 10),
CommitteeIndex: strconv.FormatUint(uint64(match.Data.CommitteeIndex), 10),
BeaconBlockRoot: hexutil.Encode(match.Data.BeaconBlockRoot),
Source: &shared.Checkpoint{
Epoch: strconv.FormatUint(uint64(bestMatchingAtt.Data.Source.Epoch), 10),
Root: hexutil.Encode(bestMatchingAtt.Data.Source.Root),
Epoch: strconv.FormatUint(uint64(match.Data.Source.Epoch), 10),
Root: hexutil.Encode(match.Data.Source.Root),
},
Target: &shared.Checkpoint{
Epoch: strconv.FormatUint(uint64(bestMatchingAtt.Data.Target.Epoch), 10),
Root: hexutil.Encode(bestMatchingAtt.Data.Target.Root),
Epoch: strconv.FormatUint(uint64(match.Data.Target.Epoch), 10),
Root: hexutil.Encode(match.Data.Target.Root),
},
},
Signature: hexutil.Encode(bestMatchingAtt.Signature),
Signature: hexutil.Encode(match.Signature),
}}
httputil.WriteJson(w, response)
}
func matchingAtt(atts []*ethpbalpha.Attestation, slot primitives.Slot, attDataRoot []byte) (*ethpbalpha.Attestation, error) {
for _, att := range atts {
if att.Data.Slot == slot {
root, err := att.Data.HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not get attestation data root")
}
if bytes.Equal(root[:], attDataRoot) {
return att, nil
}
}
}
return nil, nil
}
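
For context, this handler serves the beacon API aggregate attestation route; a
hypothetical client call against a local node might look as follows (the base
URL, port, and root value are assumptions for illustration):

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Placeholder root: substitute a real attestation data root.
    url := "http://localhost:3500/eth/v1/validator/aggregate_attestation" +
        "?attestation_data_root=0x6b0d0000000000000000000000000000000000000000000000000000000000aa&slot=3"
    resp, err := http.Get(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer resp.Body.Close()
    body, _ := io.ReadAll(resp.Body)
    fmt.Println(resp.StatusCode, string(body))
}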
// SubmitContributionAndProofs publishes multiple signed sync committee contribution and proofs.
func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.SubmitContributionAndProofs")

View File

@@ -71,7 +71,7 @@ func TestGetAggregateAttestation(t *testing.T) {
AggregationBits: []byte{0, 1, 1},
Data: &ethpbalpha.AttestationData{
Slot: 2,
CommitteeIndex: 2,
CommitteeIndex: 1,
BeaconBlockRoot: root21,
Source: &ethpbalpha.Checkpoint{
Epoch: 1,
@@ -90,7 +90,7 @@ func TestGetAggregateAttestation(t *testing.T) {
AggregationBits: []byte{0, 1, 1, 1},
Data: &ethpbalpha.AttestationData{
Slot: 2,
CommitteeIndex: 3,
CommitteeIndex: 1,
BeaconBlockRoot: root22,
Source: &ethpbalpha.Checkpoint{
Epoch: 1,
@@ -103,33 +103,56 @@ func TestGetAggregateAttestation(t *testing.T) {
},
Signature: sig22,
}
root33 := bytesutil.PadTo([]byte("root3_3"), 32)
sig33 := bls.NewAggregateSignature().Marshal()
attslot33 := &ethpbalpha.Attestation{
AggregationBits: []byte{1, 0, 0, 1},
root31 := bytesutil.PadTo([]byte("root3_1"), 32)
sig31 := bls.NewAggregateSignature().Marshal()
attslot31 := &ethpbalpha.Attestation{
AggregationBits: []byte{1, 0},
Data: &ethpbalpha.AttestationData{
Slot: 2,
CommitteeIndex: 3,
BeaconBlockRoot: root33,
Slot: 3,
CommitteeIndex: 1,
BeaconBlockRoot: root31,
Source: &ethpbalpha.Checkpoint{
Epoch: 1,
Root: root33,
Root: root31,
},
Target: &ethpbalpha.Checkpoint{
Epoch: 1,
Root: root33,
Root: root31,
},
},
Signature: sig33,
Signature: sig31,
}
root32 := bytesutil.PadTo([]byte("root3_2"), 32)
sig32 := bls.NewAggregateSignature().Marshal()
attslot32 := &ethpbalpha.Attestation{
AggregationBits: []byte{0, 1},
Data: &ethpbalpha.AttestationData{
Slot: 3,
CommitteeIndex: 1,
BeaconBlockRoot: root32,
Source: &ethpbalpha.Checkpoint{
Epoch: 1,
Root: root32,
},
Target: &ethpbalpha.Checkpoint{
Epoch: 1,
Root: root32,
},
},
Signature: sig32,
}
pool := attestations.NewPool()
err := pool.SaveAggregatedAttestations([]*ethpbalpha.Attestation{attSlot1, attslot21, attslot22})
assert.NoError(t, err)
err = pool.SaveUnaggregatedAttestations([]*ethpbalpha.Attestation{attslot31, attslot32})
assert.NoError(t, err)
s := &Server{
AttestationsPool: pool,
}
t.Run("ok", func(t *testing.T) {
t.Run("matching aggregated att", func(t *testing.T) {
reqRoot, err := attslot22.Data.HashTreeRoot()
require.NoError(t, err)
attDataRoot := hexutil.Encode(reqRoot[:])
@@ -147,7 +170,7 @@ func TestGetAggregateAttestation(t *testing.T) {
assert.DeepEqual(t, "0x00010101", resp.Data.AggregationBits)
assert.DeepEqual(t, hexutil.Encode(sig22), resp.Data.Signature)
assert.Equal(t, "2", resp.Data.Data.Slot)
assert.Equal(t, "3", resp.Data.Data.CommitteeIndex)
assert.Equal(t, "1", resp.Data.Data.CommitteeIndex)
assert.DeepEqual(t, hexutil.Encode(root22), resp.Data.Data.BeaconBlockRoot)
require.NotNil(t, resp.Data.Data.Source)
assert.Equal(t, "1", resp.Data.Data.Source.Epoch)
@@ -156,19 +179,11 @@ func TestGetAggregateAttestation(t *testing.T) {
assert.Equal(t, "1", resp.Data.Data.Target.Epoch)
assert.DeepEqual(t, hexutil.Encode(root22), resp.Data.Data.Target.Root)
})
t.Run("aggregate beforehand", func(t *testing.T) {
err = s.AttestationsPool.SaveUnaggregatedAttestation(attslot33)
require.NoError(t, err)
newAtt := ethpbalpha.CopyAttestation(attslot33)
newAtt.AggregationBits = []byte{0, 1, 0, 1}
err = s.AttestationsPool.SaveUnaggregatedAttestation(newAtt)
require.NoError(t, err)
reqRoot, err := attslot33.Data.HashTreeRoot()
t.Run("matching unaggregated att", func(t *testing.T) {
reqRoot, err := attslot32.Data.HashTreeRoot()
require.NoError(t, err)
attDataRoot := hexutil.Encode(reqRoot[:])
url := "http://example.com?attestation_data_root=" + attDataRoot + "&slot=2"
url := "http://example.com?attestation_data_root=" + attDataRoot + "&slot=3"
request := httptest.NewRequest(http.MethodGet, url, nil)
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}
@@ -178,7 +193,18 @@ func TestGetAggregateAttestation(t *testing.T) {
resp := &AggregateAttestationResponse{}
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), resp))
require.NotNil(t, resp)
assert.DeepEqual(t, "0x01010001", resp.Data.AggregationBits)
require.NotNil(t, resp.Data)
assert.DeepEqual(t, "0x0001", resp.Data.AggregationBits)
assert.DeepEqual(t, hexutil.Encode(sig32), resp.Data.Signature)
assert.Equal(t, "3", resp.Data.Data.Slot)
assert.Equal(t, "1", resp.Data.Data.CommitteeIndex)
assert.DeepEqual(t, hexutil.Encode(root32), resp.Data.Data.BeaconBlockRoot)
require.NotNil(t, resp.Data.Data.Source)
assert.Equal(t, "1", resp.Data.Data.Source.Epoch)
assert.DeepEqual(t, hexutil.Encode(root32), resp.Data.Data.Source.Root)
require.NotNil(t, resp.Data.Data.Target)
assert.Equal(t, "1", resp.Data.Data.Target.Epoch)
assert.DeepEqual(t, hexutil.Encode(root32), resp.Data.Data.Target.Root)
})
t.Run("no matching attestation", func(t *testing.T) {
attDataRoot := hexutil.Encode(bytesutil.PadTo([]byte("foo"), 32))

View File

@@ -23,7 +23,7 @@ type chunkUpdateArgs struct {
}
// Chunker defines a struct which represents a slice containing a chunk for K different validator's
// min spans used for surround vote detection in slasher. The interface defines methods used to check
// min/max spans used for surround vote detection in slasher. The interface defines methods used to check
// if an attestation is slashable for a validator index based on the contents of
// the chunk as well as the ability to update the data in the chunk with incoming information.
type Chunker interface {
@@ -153,12 +153,12 @@ func MaxChunkSpansSliceFrom(params *Parameters, chunk []uint16) (*MaxSpanChunksS
// NeutralElement for a min span chunks slice is undefined; in this case we
// use MaxUint16 as a sane value, given it is impossible to reach it.
func (_ *MinSpanChunksSlice) NeutralElement() uint16 {
func (*MinSpanChunksSlice) NeutralElement() uint16 {
return math.MaxUint16
}
// NeutralElement for a max span chunks slice is 0.
func (_ *MaxSpanChunksSlice) NeutralElement() uint16 {
func (*MaxSpanChunksSlice) NeutralElement() uint16 {
return 0
}
@@ -191,12 +191,14 @@ func (m *MinSpanChunksSlice) CheckSlashable(
) (*ethpb.AttesterSlashing, error) {
sourceEpoch := attestation.IndexedAttestation.Data.Source.Epoch
targetEpoch := attestation.IndexedAttestation.Data.Target.Epoch
minTarget, err := chunkDataAtEpoch(m.params, m.data, validatorIdx, sourceEpoch)
if err != nil {
return nil, errors.Wrapf(
err, "could not get min target for validator %d at epoch %d", validatorIdx, sourceEpoch,
)
}
if targetEpoch > minTarget {
existingAttRecord, err := slasherDB.AttestationRecordForValidator(
ctx, validatorIdx, minTarget,
@@ -206,16 +208,20 @@ func (m *MinSpanChunksSlice) CheckSlashable(
err, "could not get existing attestation record at target %d", minTarget,
)
}
if existingAttRecord != nil {
if sourceEpoch < existingAttRecord.IndexedAttestation.Data.Source.Epoch {
surroundingVotesTotal.Inc()
return &ethpb.AttesterSlashing{
Attestation_1: attestation.IndexedAttestation,
Attestation_2: existingAttRecord.IndexedAttestation,
}, nil
}
if existingAttRecord == nil {
return nil, nil
}
if sourceEpoch < existingAttRecord.IndexedAttestation.Data.Source.Epoch {
surroundingVotesTotal.Inc()
return &ethpb.AttesterSlashing{
Attestation_1: attestation.IndexedAttestation,
Attestation_2: existingAttRecord.IndexedAttestation,
}, nil
}
}
return nil, nil
}
@@ -224,7 +230,7 @@ func (m *MinSpanChunksSlice) CheckSlashable(
// within the max span chunks slice. Recall that for an incoming attestation, B, and an
// existing attestation, A:
//
// B surrounds A if and only if B.target < max_spans[B.source]
// B is surrounded by A if and only if B.target < max_spans[B.source]
//
// That is, this condition is sufficient to check if an incoming attestation
// is surrounded by a previous one. We also check if we indeed have an existing
@@ -238,12 +244,14 @@ func (m *MaxSpanChunksSlice) CheckSlashable(
) (*ethpb.AttesterSlashing, error) {
sourceEpoch := attestation.IndexedAttestation.Data.Source.Epoch
targetEpoch := attestation.IndexedAttestation.Data.Target.Epoch
maxTarget, err := chunkDataAtEpoch(m.params, m.data, validatorIdx, sourceEpoch)
if err != nil {
return nil, errors.Wrapf(
err, "could not get max target for validator %d at epoch %d", validatorIdx, sourceEpoch,
)
}
if targetEpoch < maxTarget {
existingAttRecord, err := slasherDB.AttestationRecordForValidator(
ctx, validatorIdx, maxTarget,
@@ -253,14 +261,17 @@ func (m *MaxSpanChunksSlice) CheckSlashable(
err, "could not get existing attestation record at target %d", maxTarget,
)
}
if existingAttRecord != nil {
if existingAttRecord.IndexedAttestation.Data.Source.Epoch < sourceEpoch {
surroundedVotesTotal.Inc()
return &ethpb.AttesterSlashing{
Attestation_1: existingAttRecord.IndexedAttestation,
Attestation_2: attestation.IndexedAttestation,
}, nil
}
if existingAttRecord == nil {
return nil, nil
}
if existingAttRecord.IndexedAttestation.Data.Source.Epoch < sourceEpoch {
surroundedVotesTotal.Inc()
return &ethpb.AttesterSlashing{
Attestation_1: existingAttRecord.IndexedAttestation,
Attestation_2: attestation.IndexedAttestation,
}, nil
}
}
return nil, nil
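
A worked example of the surrounded-vote condition above (standalone sketch; in
the real code the max target comes from the chunk data):

// maxTarget plays the role of max_spans[incoming.source]: the highest target
// of an existing attestation whose source is lower than the incoming one's.
func isSurrounded(incomingTarget, maxTarget uint64) bool {
    return incomingTarget < maxTarget
}

// Existing attestation A: (source 0, target 3). Incoming B: (source 1, target 2).
// The max span at B's source is 3, and 2 < 3, so B is surrounded by A and the
// pair forms an attester slashing (A as Attestation_1, B as Attestation_2).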
@@ -331,8 +342,8 @@ func (m *MinSpanChunksSlice) Update(
minEpoch = args.currentEpoch - (m.params.historyLength - 1)
}
epochInChunk := startEpoch
// We go down the chunk for the validator, updating every value starting at start_epoch down to min_epoch.
// As long as the epoch, e, in the same chunk index and e >= min_epoch, we proceed with
// We go down the chunk for the validator, updating every value starting at startEpoch down to minEpoch.
// As long as the epoch, e, in the same chunk index and e >= minEpoch, we proceed with
// a for loop.
for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk >= minEpoch {
var chunkTarget primitives.Epoch
@@ -375,7 +386,7 @@ func (m *MaxSpanChunksSlice) Update(
newTargetEpoch primitives.Epoch,
) (keepGoing bool, err error) {
epochInChunk := startEpoch
// We go down the chunk for the validator, updating every value starting at start_epoch up to
// We go down the chunk for the validator, updating every value starting at startEpoch up to
// and including the current epoch. As long as the epoch, e, is in the same chunk index and e <= currentEpoch,
// we proceed with a for loop.
for m.params.chunkIndex(epochInChunk) == args.chunkIndex && epochInChunk <= args.currentEpoch {
@@ -436,7 +447,7 @@ func (m *MinSpanChunksSlice) StartEpoch(
// StartEpoch given a source epoch and current epoch, determines the start epoch of
// a max span chunk for use in chunk updates. The source epoch cannot be >= the current epoch.
func (_ *MaxSpanChunksSlice) StartEpoch(
func (*MaxSpanChunksSlice) StartEpoch(
sourceEpoch, currentEpoch primitives.Epoch,
) (epoch primitives.Epoch, exists bool) {
if sourceEpoch >= currentEpoch {

View File

@@ -20,6 +20,7 @@ func (s *Service) checkSlashableAttestations(
) ([]*ethpb.AttesterSlashing, error) {
slashings := make([]*ethpb.AttesterSlashing, 0)
// Double votes
log.Debug("Checking for double votes")
start := time.Now()
doubleVoteSlashings, err := s.checkDoubleVotes(ctx, atts)
@@ -29,42 +30,53 @@ func (s *Service) checkSlashableAttestations(
log.WithField("elapsed", time.Since(start)).Debug("Done checking double votes")
slashings = append(slashings, doubleVoteSlashings...)
// Surrounding / surrounded votes
groupedAtts := s.groupByValidatorChunkIndex(atts)
log.WithField("numBatches", len(groupedAtts)).Debug("Batching attestations by validator chunk index")
start = time.Now()
batchTimes := make([]time.Duration, 0, len(groupedAtts))
for validatorChunkIdx, batch := range groupedAtts {
innerStart := time.Now()
attSlashings, err := s.detectAllAttesterSlashings(ctx, &chunkUpdateArgs{
attSlashings, err := s.checkSurrounds(ctx, &chunkUpdateArgs{
validatorChunkIndex: validatorChunkIdx,
currentEpoch: currentEpoch,
}, batch)
if err != nil {
return nil, err
}
slashings = append(slashings, attSlashings...)
indices := s.params.validatorIndicesInChunk(validatorChunkIdx)
for _, idx := range indices {
s.latestEpochWrittenForValidator[idx] = currentEpoch
}
batchTimes = append(batchTimes, time.Since(innerStart))
}
var avgProcessingTimePerBatch time.Duration
avgProcessingTimePerBatch := time.Duration(0)
for _, dur := range batchTimes {
avgProcessingTimePerBatch += dur
}
if avgProcessingTimePerBatch != time.Duration(0) {
avgProcessingTimePerBatch = avgProcessingTimePerBatch / time.Duration(len(batchTimes))
}
log.WithFields(logrus.Fields{
"numAttestations": len(atts),
"numBatchesByValidatorChunkIndex": len(groupedAtts),
"elapsed": time.Since(start),
"avgBatchProcessingTime": avgProcessingTimePerBatch,
}).Info("Done checking slashable attestations")
if len(slashings) > 0 {
log.WithField("numSlashings", len(slashings)).Warn("Slashable attestation offenses found")
}
return slashings, nil
}
@@ -72,14 +84,13 @@ func (s *Service) checkSlashableAttestations(
// as the current epoch in time, we perform slashing detection.
// The process is as follows given a list of attestations:
//
// 1. Check for attester double votes using the list of attestations.
// 2. Group the attestations by chunk index.
// 3. Update the min and max spans for those grouped attestations, check if any slashings are
// 1. Group the attestations by chunk index.
// 2. Update the min and max spans for those grouped attestations, check if any slashings are
// found in the process
// 4. Update the latest written epoch for all validators involved to the current epoch.
// 3. Update the latest written epoch for all validators involved to the current epoch.
//
// This function performs a lot of critical actions and is split into smaller helpers for cleanliness.
func (s *Service) detectAllAttesterSlashings(
func (s *Service) checkSurrounds(
ctx context.Context,
args *chunkUpdateArgs,
attestations []*slashertypes.IndexedAttestationWrapper,
@@ -136,48 +147,8 @@ func (s *Service) detectAllAttesterSlashings(
return slashings, nil
}
// Check for attester slashing double votes by looking at every single validator index
// in each attestation's attesting indices and checking if there already exist records for such
// attestation's target epoch. If so, we append a double vote slashing object to a list of slashings
// we return to the caller.
func (s *Service) checkDoubleVotes(
ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper,
) ([]*ethpb.AttesterSlashing, error) {
ctx, span := trace.StartSpan(ctx, "Slasher.checkDoubleVotes")
defer span.End()
// We check if there are any slashable double votes in the input list
// of attestations with respect to each other.
slashings := make([]*ethpb.AttesterSlashing, 0)
existingAtts := make(map[string]*slashertypes.IndexedAttestationWrapper)
for _, att := range attestations {
for _, valIdx := range att.IndexedAttestation.AttestingIndices {
key := uintToString(uint64(att.IndexedAttestation.Data.Target.Epoch)) + ":" + uintToString(valIdx)
existingAtt, ok := existingAtts[key]
if !ok {
existingAtts[key] = att
continue
}
if att.SigningRoot != existingAtt.SigningRoot {
doubleVotesTotal.Inc()
slashings = append(slashings, &ethpb.AttesterSlashing{
Attestation_1: existingAtt.IndexedAttestation,
Attestation_2: att.IndexedAttestation,
})
}
}
}
// We check if there are any slashable double votes in the input list
// of attestations with respect to our database.
moreSlashings, err := s.checkDoubleVotesOnDisk(ctx, attestations)
if err != nil {
return nil, errors.Wrap(err, "could not check attestation double votes on disk")
}
return append(slashings, moreSlashings...), nil
}
// Check for double votes in our database given a list of incoming attestations.
func (s *Service) checkDoubleVotesOnDisk(
func (s *Service) checkDoubleVotes(
ctx context.Context, attestations []*slashertypes.IndexedAttestationWrapper,
) ([]*ethpb.AttesterSlashing, error) {
ctx, span := trace.StartSpan(ctx, "Slasher.checkDoubleVotesOnDisk")

View File

@@ -30,10 +30,32 @@ func Test_processQueuedAttestations(t *testing.T) {
currentEpoch primitives.Epoch
}
tests := []struct {
name string
args args
shouldNotBeSlashable bool
name string
args args
shouldBeSlashable bool
}{
{
name: "Same target with different signing roots",
args: args{
attestationQueue: []*slashertypes.IndexedAttestationWrapper{
createAttestationWrapper(t, 1, 2, []uint64{0, 1}, []byte{1}),
createAttestationWrapper(t, 1, 2, []uint64{0, 1}, []byte{2}),
},
currentEpoch: 4,
},
shouldBeSlashable: true,
},
{
name: "Same target with same signing roots",
args: args{
attestationQueue: []*slashertypes.IndexedAttestationWrapper{
createAttestationWrapper(t, 1, 2, []uint64{0, 1}, []byte{1}),
createAttestationWrapper(t, 1, 2, []uint64{0, 1}, []byte{1}),
},
currentEpoch: 4,
},
shouldBeSlashable: false,
},
{
name: "Detects surrounding vote (source 1, target 2), (source 0, target 3)",
args: args{
@@ -43,6 +65,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldBeSlashable: true,
},
{
name: "Detects surrounding vote (source 50, target 51), (source 0, target 1000)",
@@ -53,6 +76,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 1000,
},
shouldBeSlashable: true,
},
{
name: "Detects surrounded vote (source 0, target 3), (source 1, target 2)",
@@ -63,6 +87,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldBeSlashable: true,
},
{
name: "Detects double vote, (source 1, target 2), (source 0, target 2)",
@@ -73,6 +98,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldBeSlashable: true,
},
{
name: "Not slashable, surrounding but non-overlapping attesting indices within same validator chunk index",
@@ -83,7 +109,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldNotBeSlashable: true,
shouldBeSlashable: false,
},
{
name: "Not slashable, surrounded but non-overlapping attesting indices within same validator chunk index",
@@ -94,7 +120,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldNotBeSlashable: true,
shouldBeSlashable: false,
},
{
name: "Not slashable, surrounding but non-overlapping attesting indices in different validator chunk index",
@@ -111,7 +137,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldNotBeSlashable: true,
shouldBeSlashable: false,
},
{
name: "Not slashable, surrounded but non-overlapping attesting indices in different validator chunk index",
@@ -128,7 +154,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldNotBeSlashable: true,
shouldBeSlashable: false,
},
{
name: "Not slashable, (source 1, target 2), (source 2, target 3)",
@@ -139,7 +165,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldNotBeSlashable: true,
shouldBeSlashable: false,
},
{
name: "Not slashable, (source 0, target 3), (source 2, target 4)",
@@ -150,7 +176,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldNotBeSlashable: true,
shouldBeSlashable: false,
},
{
name: "Not slashable, (source 0, target 2), (source 0, target 3)",
@@ -161,7 +187,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldNotBeSlashable: true,
shouldBeSlashable: false,
},
{
name: "Not slashable, (source 0, target 3), (source 0, target 2)",
@@ -172,7 +198,7 @@ func Test_processQueuedAttestations(t *testing.T) {
},
currentEpoch: 4,
},
shouldNotBeSlashable: true,
shouldBeSlashable: false,
},
}
for _, tt := range tests {
@@ -246,21 +272,24 @@ func Test_processQueuedAttestations(t *testing.T) {
s.genesisTime = genesisTime
currentSlotChan := make(chan primitives.Slot)
exitChan := make(chan struct{})
s.wg.Add(1)
go func() {
s.processQueuedAttestations(ctx, currentSlotChan)
exitChan <- struct{}{}
}()
s.attsQueue.extend(tt.args.attestationQueue)
currentSlotChan <- slot
time.Sleep(time.Millisecond * 200)
cancel()
<-exitChan
if tt.shouldNotBeSlashable {
require.LogsDoNotContain(t, hook, "Attester slashing detected")
} else {
s.wg.Wait()
if tt.shouldBeSlashable {
require.LogsContain(t, hook, "Attester slashing detected")
} else {
require.LogsDoNotContain(t, hook, "Attester slashing detected")
}
require.LogsDoNotContain(t, hook, couldNotSaveAttRecord)
require.LogsDoNotContain(t, hook, couldNotCheckSlashableAtt)
require.LogsDoNotContain(t, hook, couldNotProcessAttesterSlashings)
})
}
}
@@ -304,10 +333,9 @@ func Test_processQueuedAttestations_MultipleChunkIndices(t *testing.T) {
s.genesisTime = genesisTime
currentSlotChan := make(chan primitives.Slot)
exitChan := make(chan struct{})
s.wg.Add(1)
go func() {
s.processQueuedAttestations(ctx, currentSlotChan)
exitChan <- struct{}{}
}()
for i := startEpoch; i <= endEpoch; i++ {
@@ -331,7 +359,7 @@ func Test_processQueuedAttestations_MultipleChunkIndices(t *testing.T) {
time.Sleep(time.Millisecond * 200)
cancel()
<-exitChan
s.wg.Wait()
require.LogsDoNotContain(t, hook, "Slashable offenses found")
require.LogsDoNotContain(t, hook, "Could not detect")
}
@@ -370,10 +398,9 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) {
s.genesisTime = genesisTime
currentSlotChan := make(chan primitives.Slot)
exitChan := make(chan struct{})
s.wg.Add(1)
go func() {
s.processQueuedAttestations(ctx, currentSlotChan)
exitChan <- struct{}{}
}()
// We create two attestations fully spanning chunk indices 0 and chunk 1
@@ -392,7 +419,7 @@ func Test_processQueuedAttestations_OverlappingChunkIndices(t *testing.T) {
time.Sleep(time.Millisecond * 200)
cancel()
<-exitChan
s.wg.Wait()
require.LogsDoNotContain(t, hook, "Slashable offenses found")
require.LogsDoNotContain(t, hook, "Could not detect")
}
@@ -593,44 +620,6 @@ func Test_applyAttestationForValidator_MaxSpanChunk(t *testing.T) {
require.NotNil(t, slashing)
}
func Test_checkDoubleVotes_SlashableInputAttestations(t *testing.T) {
slasherDB := dbtest.SetupSlasherDB(t)
ctx := context.Background()
// For a list of input attestations, check that we can
// indeed check there could exist a double vote offense
// within the list with respect to other entries in the list.
atts := []*slashertypes.IndexedAttestationWrapper{
createAttestationWrapper(t, 0, 1, []uint64{1, 2}, []byte{1}),
createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1}),
createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2}), // Different signing root.
}
srv, err := New(context.Background(),
&ServiceConfig{
Database: slasherDB,
StateNotifier: &mock.MockStateNotifier{},
ClockWaiter: startup.NewClockSynchronizer(),
})
require.NoError(t, err)
prev1 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1})
cur1 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2})
prev2 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{1})
cur2 := createAttestationWrapper(t, 0, 2, []uint64{1, 2}, []byte{2})
wanted := []*ethpb.AttesterSlashing{
{
Attestation_1: prev1.IndexedAttestation,
Attestation_2: cur1.IndexedAttestation,
},
{
Attestation_1: prev2.IndexedAttestation,
Attestation_2: cur2.IndexedAttestation,
},
}
slashings, err := srv.checkDoubleVotes(ctx, atts)
require.NoError(t, err)
require.DeepEqual(t, wanted, slashings)
}
func Test_checkDoubleVotes_SlashableAttestationsOnDisk(t *testing.T) {
slasherDB := dbtest.SetupSlasherDB(t)
ctx := context.Background()
@@ -787,16 +776,15 @@ func TestService_processQueuedAttestations(t *testing.T) {
})
ctx, cancel := context.WithCancel(context.Background())
tickerChan := make(chan primitives.Slot)
exitChan := make(chan struct{})
s.wg.Add(1)
go func() {
s.processQueuedAttestations(ctx, tickerChan)
exitChan <- struct{}{}
}()
// Send a value over the ticker.
tickerChan <- 1
cancel()
<-exitChan
s.wg.Wait()
assert.LogsContain(t, hook, "Processing queued")
}

View File

@@ -5,7 +5,6 @@ import (
"github.com/pkg/errors"
slashertypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"go.opencensus.io/trace"
)
@@ -13,80 +12,70 @@ import (
// detectProposerSlashings takes in signed block header wrappers and returns a list of proposer slashings detected.
func (s *Service) detectProposerSlashings(
ctx context.Context,
proposedBlocks []*slashertypes.SignedBlockHeaderWrapper,
incomingProposals []*slashertypes.SignedBlockHeaderWrapper,
) ([]*ethpb.ProposerSlashing, error) {
ctx, span := trace.StartSpan(ctx, "slasher.detectProposerSlashings")
defer span.End()
// internalSlashings will contain any slashable double proposals in the input list
// of proposals with respect to each other.
internalSlashings := []*ethpb.ProposerSlashing{}
existingProposals := make(map[string]*slashertypes.SignedBlockHeaderWrapper)
// We check if there are any slashable double proposals in the input list
// of proposals with respect to each other.
slashings := make([]*ethpb.ProposerSlashing, 0)
existingProposals := make(map[string]*slashertypes.SignedBlockHeaderWrapper)
for i, proposal := range proposedBlocks {
key := proposalKey(proposal)
for _, incomingProposal := range incomingProposals {
key := proposalKey(incomingProposal)
existingProposal, ok := existingProposals[key]
// If we have not seen this proposal before, we add it to our map of existing proposals
// and we continue to the next proposal.
if !ok {
existingProposals[key] = proposal
existingProposals[key] = incomingProposal
continue
}
if isDoubleProposal(proposedBlocks[i].SigningRoot, existingProposal.SigningRoot) {
// If we have seen this proposal before, we check if it is a double proposal.
if isDoubleProposal(incomingProposal.SigningRoot, existingProposal.SigningRoot) {
doubleProposalsTotal.Inc()
slashing := &ethpb.ProposerSlashing{
Header_1: existingProposal.SignedBeaconBlockHeader,
Header_2: proposedBlocks[i].SignedBeaconBlockHeader,
Header_2: incomingProposal.SignedBeaconBlockHeader,
}
slashings = append(slashings, slashing)
internalSlashings = append(internalSlashings, slashing)
}
}
proposerSlashings, err := s.serviceCfg.Database.CheckDoubleBlockProposals(ctx, proposedBlocks)
// We check if there are any slashable double proposals in the input list
// of proposals with respect to the slasher database.
databaseSlashings, err := s.serviceCfg.Database.CheckDoubleBlockProposals(ctx, incomingProposals)
if err != nil {
return nil, errors.Wrap(err, "could not check for double proposals on disk")
}
if err := s.saveSafeProposals(ctx, proposedBlocks, proposerSlashings); err != nil {
// We save the safe proposals (with respect to the database) to our database.
// If some proposals in incomingProposals are slashable with respect to each other,
// we (arbitrarily) save the last one to the database.
if err := s.serviceCfg.Database.SaveBlockProposals(ctx, incomingProposals); err != nil {
return nil, errors.Wrap(err, "could not save safe proposals")
}
slashings = append(slashings, proposerSlashings...)
return slashings, nil
}
// Check for double proposals in our database given a list of incoming block proposals.
// For the proposals that were not slashable, we save them to the database.
func (s *Service) saveSafeProposals(
ctx context.Context,
proposedBlocks []*slashertypes.SignedBlockHeaderWrapper,
proposerSlashings []*ethpb.ProposerSlashing,
) error {
ctx, span := trace.StartSpan(ctx, "slasher.saveSafeProposals")
defer span.End()
return s.serviceCfg.Database.SaveBlockProposals(
ctx,
filterSafeProposals(proposedBlocks, proposerSlashings),
)
}
func filterSafeProposals(
proposedBlocks []*slashertypes.SignedBlockHeaderWrapper,
proposerSlashings []*ethpb.ProposerSlashing,
) []*slashertypes.SignedBlockHeaderWrapper {
// We initialize a map of proposers that are safe from slashing.
safeProposers := make(map[primitives.ValidatorIndex]*slashertypes.SignedBlockHeaderWrapper, len(proposedBlocks))
for _, proposal := range proposedBlocks {
safeProposers[proposal.SignedBeaconBlockHeader.Header.ProposerIndex] = proposal
}
for _, doubleProposal := range proposerSlashings {
// If a proposer is found to have committed a slashable offense, we delete
// them from the safe proposers map.
delete(safeProposers, doubleProposal.Header_1.Header.ProposerIndex)
}
// We save all the proposals that are determined "safe" and not-slashable to our database.
safeProposals := make([]*slashertypes.SignedBlockHeaderWrapper, 0, len(safeProposers))
for _, proposal := range safeProposers {
safeProposals = append(safeProposals, proposal)
}
return safeProposals
// totalSlashings contains all slashings we have detected.
totalSlashings := append(internalSlashings, databaseSlashings...)
return totalSlashings, nil
}
// proposalKey builds a key which is a combination of the slot and the proposer index.
// If a validator proposes several blocks for the same slot, then several (potentially slashable)
// proposals will correspond to the same key.
func proposalKey(proposal *slashertypes.SignedBlockHeaderWrapper) string {
header := proposal.SignedBeaconBlockHeader.Header
return uintToString(uint64(header.Slot)) + ":" + uintToString(uint64(header.ProposerIndex))
slotKey := uintToString(uint64(header.Slot))
proposerIndexKey := uintToString(uint64(header.ProposerIndex))
return slotKey + ":" + proposerIndexKey
}
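To see why this key shape supports double-proposal detection, note that two proposals by the same validator for the same slot collide on the same key regardless of their contents. A small sketch, assuming uintToString is a thin wrapper over strconv.FormatUint:
package main

import (
	"fmt"
	"strconv"
)

// key mirrors the "slot:proposerIndex" shape of proposalKey. Two blocks
// proposed by the same validator for the same slot map to the same key,
// so the second map lookup finds the first and triggers the
// signing-root comparison.
func key(slot, proposerIndex uint64) string {
	return strconv.FormatUint(slot, 10) + ":" + strconv.FormatUint(proposerIndex, 10)
}

func main() {
	fmt.Println(key(4, 1) == key(4, 1)) // true: a candidate double proposal
	fmt.Println(key(4, 1) == key(5, 1)) // false: different slots never collide
}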

View File

@@ -22,94 +22,139 @@ import (
logTest "github.com/sirupsen/logrus/hooks/test"
)
type wrapped struct {
slot primitives.Slot
signedBlkHeaders []*slashertypes.SignedBlockHeaderWrapper
}
func Test_processQueuedBlocks_DetectsDoubleProposals(t *testing.T) {
hook := logTest.NewGlobal()
slasherDB := dbtest.SetupSlasherDB(t)
beaconDB := dbtest.SetupDB(t)
ctx, cancel := context.WithCancel(context.Background())
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
// Initialize validators in the state.
numVals := params.BeaconConfig().MinGenesisActiveValidatorCount
validators := make([]*ethpb.Validator, numVals)
privKeys := make([]bls.SecretKey, numVals)
for i := range validators {
privKey, err := bls.RandKey()
require.NoError(t, err)
privKeys[i] = privKey
validators[i] = &ethpb.Validator{
PublicKey: privKey.PublicKey().Marshal(),
WithdrawalCredentials: make([]byte, 32),
}
}
err = beaconState.SetValidators(validators)
require.NoError(t, err)
domain, err := signing.Domain(
beaconState.Fork(),
0,
params.BeaconConfig().DomainBeaconProposer,
beaconState.GenesisValidatorsRoot(),
)
require.NoError(t, err)
mockChain := &mock.ChainService{
State: beaconState,
}
s := &Service{
serviceCfg: &ServiceConfig{
Database: slasherDB,
StateNotifier: &mock.MockStateNotifier{},
HeadStateFetcher: mockChain,
StateGen: stategen.New(beaconDB, doublylinkedtree.New()),
SlashingPoolInserter: &slashingsmock.PoolMock{},
ClockWaiter: startup.NewClockSynchronizer(),
testCases := []struct {
name string
wraps []wrapped
}{
{
name: "detects double proposals in the same batch",
wraps: []wrapped{
{
4,
[]*slashertypes.SignedBlockHeaderWrapper{
createProposalWrapper(t, 4, 1, []byte{1}),
createProposalWrapper(t, 4, 1, []byte{1}),
createProposalWrapper(t, 4, 1, []byte{1}),
createProposalWrapper(t, 4, 1, []byte{2}),
},
},
},
},
{
name: "detects double proposals in the different batches",
wraps: []wrapped{
{
5,
[]*slashertypes.SignedBlockHeaderWrapper{
createProposalWrapper(t, 4, 1, []byte{1}),
createProposalWrapper(t, 5, 1, []byte{1}),
},
},
{
6,
[]*slashertypes.SignedBlockHeaderWrapper{
createProposalWrapper(t, 4, 1, []byte{2}),
},
},
},
},
params: DefaultParams(),
blksQueue: newBlocksQueue(),
}
parentRoot := bytesutil.ToBytes32([]byte("parent"))
err = s.serviceCfg.StateGen.SaveState(ctx, parentRoot, beaconState)
require.NoError(t, err)
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
hook := logTest.NewGlobal()
beaconDB := dbtest.SetupDB(t)
slasherDB := dbtest.SetupSlasherDB(t)
ctx, cancel := context.WithCancel(context.Background())
currentSlotChan := make(chan primitives.Slot)
exitChan := make(chan struct{})
go func() {
s.processQueuedBlocks(ctx, currentSlotChan)
exitChan <- struct{}{}
}()
beaconState, err := util.NewBeaconState()
require.NoError(t, err)
signedBlkHeaders := []*slashertypes.SignedBlockHeaderWrapper{
createProposalWrapper(t, 4, 1, []byte{1}),
createProposalWrapper(t, 4, 1, []byte{1}),
createProposalWrapper(t, 4, 1, []byte{1}),
createProposalWrapper(t, 4, 1, []byte{2}),
// Initialize validators in the state.
numVals := params.BeaconConfig().MinGenesisActiveValidatorCount
validators := make([]*ethpb.Validator, numVals)
privKeys := make([]bls.SecretKey, numVals)
for i := range validators {
privKey, err := bls.RandKey()
require.NoError(t, err)
privKeys[i] = privKey
validators[i] = &ethpb.Validator{
PublicKey: privKey.PublicKey().Marshal(),
WithdrawalCredentials: make([]byte, 32),
}
}
err = beaconState.SetValidators(validators)
require.NoError(t, err)
domain, err := signing.Domain(
beaconState.Fork(),
0,
params.BeaconConfig().DomainBeaconProposer,
beaconState.GenesisValidatorsRoot(),
)
require.NoError(t, err)
mockChain := &mock.ChainService{
State: beaconState,
}
s := &Service{
serviceCfg: &ServiceConfig{
Database: slasherDB,
StateNotifier: &mock.MockStateNotifier{},
HeadStateFetcher: mockChain,
StateGen: stategen.New(beaconDB, doublylinkedtree.New()),
SlashingPoolInserter: &slashingsmock.PoolMock{},
ClockWaiter: startup.NewClockSynchronizer(),
},
params: DefaultParams(),
blksQueue: newBlocksQueue(),
}
parentRoot := bytesutil.ToBytes32([]byte("parent"))
err = s.serviceCfg.StateGen.SaveState(ctx, parentRoot, beaconState)
require.NoError(t, err)
currentSlotChan := make(chan primitives.Slot)
s.wg.Add(1)
go func() {
s.processQueuedBlocks(ctx, currentSlotChan)
}()
for _, wrap := range tt.wraps {
// Add valid signatures to the block headers we are testing.
for _, proposalWrapper := range wrap.signedBlkHeaders {
proposalWrapper.SignedBeaconBlockHeader.Header.ParentRoot = parentRoot[:]
headerHtr, err := proposalWrapper.SignedBeaconBlockHeader.Header.HashTreeRoot()
require.NoError(t, err)
container := &ethpb.SigningData{
ObjectRoot: headerHtr[:],
Domain: domain,
}
signingRoot, err := container.HashTreeRoot()
require.NoError(t, err)
privKey := privKeys[proposalWrapper.SignedBeaconBlockHeader.Header.ProposerIndex]
proposalWrapper.SignedBeaconBlockHeader.Signature = privKey.Sign(signingRoot[:]).Marshal()
}
s.blksQueue.extend(wrap.signedBlkHeaders)
currentSlot := primitives.Slot(4)
currentSlotChan <- currentSlot
}
cancel()
s.wg.Wait()
require.LogsContain(t, hook, "Proposer slashing detected")
})
}
// Add valid signatures to the block headers we are testing.
for _, proposalWrapper := range signedBlkHeaders {
proposalWrapper.SignedBeaconBlockHeader.Header.ParentRoot = parentRoot[:]
headerHtr, err := proposalWrapper.SignedBeaconBlockHeader.Header.HashTreeRoot()
require.NoError(t, err)
container := &ethpb.SigningData{
ObjectRoot: headerHtr[:],
Domain: domain,
}
signingRoot, err := container.HashTreeRoot()
require.NoError(t, err)
privKey := privKeys[proposalWrapper.SignedBeaconBlockHeader.Header.ProposerIndex]
proposalWrapper.SignedBeaconBlockHeader.Signature = privKey.Sign(signingRoot[:]).Marshal()
}
s.blksQueue.extend(signedBlkHeaders)
currentSlot := primitives.Slot(4)
currentSlotChan <- currentSlot
cancel()
<-exitChan
require.LogsContain(t, hook, "Proposer slashing detected")
}
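The refactor above folds both scenarios into Go's conventional table-driven test shape. A bare-bones skeleton of that pattern, independent of the slasher types:
package example

import "testing"

func TestDetectDuplicates(t *testing.T) {
	testCases := []struct {
		name string
		in   []int
		want bool
	}{
		{name: "duplicate in same batch", in: []int{4, 4}, want: true},
		{name: "no duplicate across batches", in: []int{4, 5}, want: false},
	}
	for _, tt := range testCases {
		tt := tt // capture the range variable (pre-Go 1.22 convention)
		t.Run(tt.name, func(t *testing.T) {
			seen := make(map[int]bool)
			got := false
			for _, v := range tt.in {
				if seen[v] {
					got = true
				}
				seen[v] = true
			}
			if got != tt.want {
				t.Fatalf("got %v, want %v", got, tt.want)
			}
		})
	}
}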
func Test_processQueuedBlocks_NotSlashable(t *testing.T) {
@@ -137,10 +182,9 @@ func Test_processQueuedBlocks_NotSlashable(t *testing.T) {
blksQueue: newBlocksQueue(),
}
currentSlotChan := make(chan primitives.Slot)
exitChan := make(chan struct{})
s.wg.Add(1)
go func() {
s.processQueuedBlocks(ctx, currentSlotChan)
exitChan <- struct{}{}
}()
s.blksQueue.extend([]*slashertypes.SignedBlockHeaderWrapper{
createProposalWrapper(t, 4, 1, []byte{1}),
@@ -148,7 +192,7 @@ func Test_processQueuedBlocks_NotSlashable(t *testing.T) {
})
currentSlotChan <- currentSlot
cancel()
<-exitChan
s.wg.Wait()
require.LogsDoNotContain(t, hook, "Proposer slashing detected")
}

View File

@@ -52,12 +52,12 @@ func (s *Service) groupByChunkIndex(
// This function returns a list of valid attestations, a list of attestations that are
// valid in the future, and the number of attestations dropped.
func (s *Service) filterAttestations(
atts []*slashertypes.IndexedAttestationWrapper, currentEpoch primitives.Epoch,
attWrappers []*slashertypes.IndexedAttestationWrapper, currentEpoch primitives.Epoch,
) (valid, validInFuture []*slashertypes.IndexedAttestationWrapper, numDropped int) {
valid = make([]*slashertypes.IndexedAttestationWrapper, 0, len(atts))
validInFuture = make([]*slashertypes.IndexedAttestationWrapper, 0, len(atts))
valid = make([]*slashertypes.IndexedAttestationWrapper, 0, len(attWrappers))
validInFuture = make([]*slashertypes.IndexedAttestationWrapper, 0, len(attWrappers))
for _, attWrapper := range atts {
for _, attWrapper := range attWrappers {
if attWrapper == nil || !validateAttestationIntegrity(attWrapper.IndexedAttestation) {
numDropped++
continue
@@ -73,18 +73,19 @@ func (s *Service) filterAttestations(
// If an attestation's target epoch is in the future, we defer processing for later.
if attWrapper.IndexedAttestation.Data.Target.Epoch > currentEpoch {
validInFuture = append(validInFuture, attWrapper)
} else {
valid = append(valid, attWrapper)
continue
}
// The attestation is valid.
valid = append(valid, attWrapper)
}
return
}
// Validates the attestation data integrity, ensuring we have no nil values for
// source, epoch, and that the source epoch of the attestation must be less than
// the target epoch, which is a precondition for performing slashing detection.
// This function also checks the attestation source epoch is within the history size
// we keep track of for slashing detection.
// source and target epochs, and that the source epoch of the attestation must
// be less than the target epoch, which is a precondition for performing slashing
// detection (except for the genesis epoch).
func validateAttestationIntegrity(att *ethpb.IndexedAttestation) bool {
// If an attestation is malformed, we drop it.
if att == nil ||

View File

@@ -10,16 +10,10 @@ import (
// To properly access the element at epoch `e` for a validator index `i`, we leverage helper
// functions from these parameter values as nice abstractions. The following parameters are
// required for the helper functions defined in this file.
//
// (C) chunkSize defines how many elements are in a chunk for a validator
// min or max span slice.
// (K) validatorChunkSize defines how many validators' chunks we store in a single
// flat byte slice on disk.
// (H) historyLength defines how many epochs we keep of min or max spans.
type Parameters struct {
chunkSize uint64
validatorChunkSize uint64
historyLength primitives.Epoch
chunkSize uint64 // C - defines how many elements are in a chunk for a validator min or max span slice.
validatorChunkSize uint64 // K - defines how many validators' chunks we store in a single flat byte slice on disk.
historyLength primitives.Epoch // H - defines how many epochs we keep of min or max spans.
}
// DefaultParams defines default values for slasher's important parameters, defined
@@ -98,8 +92,8 @@ func (p *Parameters) lastEpoch(chunkIndex uint64) primitives.Epoch {
// with (validatorIndex % K)*C + (epoch % C), which gives us:
//
// (2 % 3)*3 + (1 % 3) =
// (2*3) + (1) =
// 7
// 2*3 + 1 =
// 7
//
// val0 val1 val2
// | | |
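A worked sketch of the cell-index arithmetic from the comment above, using the illustrative values C = 3 and K = 3 (DefaultParams uses much larger values in practice):
package main

import "fmt"

// cellIndex mirrors the formula in the comment above:
// (validatorIndex % K)*C + (epoch % C).
func cellIndex(validatorIndex, epoch, c, k uint64) uint64 {
	return (validatorIndex%k)*c + epoch%c
}

func main() {
	// Validator 2 at epoch 1, with C = 3 and K = 3:
	fmt.Println(cellIndex(2, 1, 3, 3)) // (2%3)*3 + (1%3) = 2*3 + 1 = 7
}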

View File

@@ -12,10 +12,18 @@ import (
"github.com/sirupsen/logrus"
)
const (
couldNotSaveAttRecord = "Could not save attestation records to DB"
couldNotCheckSlashableAtt = "Could not check slashable attestations"
couldNotProcessAttesterSlashings = "Could not process attester slashings"
)
// Receive indexed attestations from some source event feed,
// validating their integrity before appending them to an attestation queue
// for batch processing in a separate routine.
func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan *ethpb.IndexedAttestation) {
defer s.wg.Done()
sub := s.serviceCfg.IndexedAttestationsFeed.Subscribe(indexedAttsChan)
defer sub.Unsubscribe()
for {
@@ -45,6 +53,8 @@ func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan
// Receive beacon blocks from some source event feed,
func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan *ethpb.SignedBeaconBlockHeader) {
defer s.wg.Done()
sub := s.serviceCfg.BeaconBlockHeadersFeed.Subscribe(beaconBlockHeadersChan)
defer sub.Unsubscribe()
for {
@@ -77,6 +87,8 @@ func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan
// This grouping will allow us to perform detection on batches of attestations
// per validator chunk index which can be done concurrently.
func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-chan primitives.Slot) {
defer s.wg.Done()
for {
select {
case currentSlot := <-slotTicker:
@@ -101,24 +113,26 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch
}).Info("Processing queued attestations for slashing detection")
// Save the attestation records to our database.
// If multiple attestations are provided for the same validator index + target epoch combination,
// then only the last (validator index + target epoch) => signing root link is kept in the database.
if err := s.serviceCfg.Database.SaveAttestationRecordsForValidators(
ctx, validAtts,
); err != nil {
log.WithError(err).Error("Could not save attestation records to DB")
log.WithError(err).Error(couldNotSaveAttRecord)
continue
}
// Check for slashings.
slashings, err := s.checkSlashableAttestations(ctx, currentEpoch, validAtts)
if err != nil {
log.WithError(err).Error("Could not check slashable attestations")
log.WithError(err).Error(couldNotCheckSlashableAtt)
continue
}
// Process attester slashings by verifying their signatures, submitting
// to the beacon node's operations pool, and logging them.
if err := s.processAttesterSlashings(ctx, slashings); err != nil {
log.WithError(err).Error("Could not process attester slashings")
log.WithError(err).Error(couldNotProcessAttesterSlashings)
continue
}
@@ -132,6 +146,8 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch
// Process queued blocks every time the slot ticker fires. We retrieve
// these blocks from a queue, then perform double proposal detection.
func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan primitives.Slot) {
defer s.wg.Done()
for {
select {
case currentSlot := <-slotTicker:
@@ -172,6 +188,8 @@ func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan pri
// Prunes slasher data on each slot tick to prevent unnecessary build-up of disk space usage.
func (s *Service) pruneSlasherData(ctx context.Context, slotTicker <-chan primitives.Slot) {
defer s.wg.Done()
for {
select {
case <-slotTicker:

View File

@@ -32,10 +32,9 @@ func TestSlasher_receiveAttestations_OK(t *testing.T) {
indexedAttsChan := make(chan *ethpb.IndexedAttestation)
defer close(indexedAttsChan)
exitChan := make(chan struct{})
s.wg.Add(1)
go func() {
s.receiveAttestations(ctx, indexedAttsChan)
exitChan <- struct{}{}
}()
firstIndices := []uint64{1, 2, 3}
secondIndices := []uint64{4, 5, 6}
@@ -44,7 +43,7 @@ func TestSlasher_receiveAttestations_OK(t *testing.T) {
indexedAttsChan <- att1.IndexedAttestation
indexedAttsChan <- att2.IndexedAttestation
cancel()
<-exitChan
s.wg.Wait()
wanted := []*slashertypes.IndexedAttestationWrapper{
att1,
att2,
@@ -216,11 +215,9 @@ func TestSlasher_receiveAttestations_OnlyValidAttestations(t *testing.T) {
indexedAttsChan := make(chan *ethpb.IndexedAttestation)
defer close(indexedAttsChan)
exitChan := make(chan struct{})
defer close(exitChan)
s.wg.Add(1)
go func() {
s.receiveAttestations(ctx, indexedAttsChan)
exitChan <- struct{}{}
}()
firstIndices := []uint64{1, 2, 3}
secondIndices := []uint64{4, 5, 6}
@@ -233,7 +230,7 @@ func TestSlasher_receiveAttestations_OnlyValidAttestations(t *testing.T) {
AttestingIndices: secondIndices,
}
cancel()
<-exitChan
s.wg.Wait()
// Expect only a single, valid attestation was added to the queue.
require.Equal(t, 1, s.attsQueue.size())
wanted := []*slashertypes.IndexedAttestationWrapper{
@@ -254,10 +251,9 @@ func TestSlasher_receiveBlocks_OK(t *testing.T) {
}
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader)
defer close(beaconBlockHeadersChan)
exitChan := make(chan struct{})
s.wg.Add(1)
go func() {
s.receiveBlocks(ctx, beaconBlockHeadersChan)
exitChan <- struct{}{}
}()
block1 := createProposalWrapper(t, 0, 1, nil).SignedBeaconBlockHeader
@@ -265,7 +261,7 @@ func TestSlasher_receiveBlocks_OK(t *testing.T) {
beaconBlockHeadersChan <- block1
beaconBlockHeadersChan <- block2
cancel()
<-exitChan
s.wg.Wait()
wanted := []*slashertypes.SignedBlockHeaderWrapper{
createProposalWrapper(t, 0, block1.Header.ProposerIndex, nil),
createProposalWrapper(t, 0, block2.Header.ProposerIndex, nil),
@@ -301,15 +297,14 @@ func TestService_processQueuedBlocks(t *testing.T) {
})
ctx, cancel := context.WithCancel(context.Background())
tickerChan := make(chan primitives.Slot)
exitChan := make(chan struct{})
s.wg.Add(1)
go func() {
s.processQueuedBlocks(ctx, tickerChan)
exitChan <- struct{}{}
}()
// Send a value over the ticker.
tickerChan <- 0
cancel()
<-exitChan
s.wg.Wait()
assert.LogsContain(t, hook, "Processing queued")
}

View File

@@ -6,6 +6,7 @@ package slasher
import (
"context"
"sync"
"time"
"github.com/prysmaticlabs/prysm/v4/async/event"
@@ -15,7 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
beaconChainSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -38,7 +39,7 @@ type ServiceConfig struct {
StateGen stategen.StateManager
SlashingPoolInserter slashings.PoolInserter
HeadStateFetcher blockchain.HeadFetcher
SyncChecker sync.Checker
SyncChecker beaconChainSync.Checker
ClockWaiter startup.ClockWaiter
}
@@ -67,6 +68,7 @@ type Service struct {
blocksSlotTicker *slots.SlotTicker
pruningSlotTicker *slots.SlotTicker
latestEpochWrittenForValidator map[primitives.ValidatorIndex]primitives.Epoch
wg sync.WaitGroup
}
// New instantiates a new slasher from configuration values.
@@ -126,21 +128,33 @@ func (s *Service) run() {
indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1)
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
s.wg.Add(1)
go s.receiveAttestations(s.ctx, indexedAttsChan)
s.wg.Add(1)
go s.receiveBlocks(s.ctx, beaconBlockHeadersChan)
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
s.attsSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.blocksSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.pruningSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.wg.Add(1)
go s.processQueuedAttestations(s.ctx, s.attsSlotTicker.C())
s.wg.Add(1)
go s.processQueuedBlocks(s.ctx, s.blocksSlotTicker.C())
s.wg.Add(1)
go s.pruneSlasherData(s.ctx, s.pruningSlotTicker.C())
}
// Stop the slasher service.
func (s *Service) Stop() error {
s.cancel()
s.wg.Wait()
if s.attsSlotTicker != nil {
s.attsSlotTicker.Done()
}
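The Add-before-spawn, defer-Done, Wait-on-Stop choreography introduced in this file is the standard sync.WaitGroup lifecycle. A minimal standalone sketch of the same pattern:
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type svc struct {
	wg     sync.WaitGroup
	cancel context.CancelFunc
}

func (s *svc) run(ctx context.Context) {
	// Add is called by the spawner, before the goroutine starts, so Stop
	// can never observe a zero counter while a worker is still launching.
	s.wg.Add(1)
	go s.worker(ctx)
}

func (s *svc) worker(ctx context.Context) {
	defer s.wg.Done() // mirrors the defer s.wg.Done() at the top of each service loop
	<-ctx.Done()
}

func (s *svc) stop() {
	s.cancel()  // signal every worker to exit...
	s.wg.Wait() // ...and block until all of them have.
	fmt.Println("all goroutines stopped")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &svc{cancel: cancel}
	s.run(ctx)
	time.Sleep(10 * time.Millisecond)
	s.stop()
}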

View File

@@ -54,6 +54,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",

View File

@@ -227,6 +227,10 @@ func (s *Service) Start() {
}
s.ms.setClock(clock)
if s.store.isGenesisSync() {
log.Info("Exiting backfill service as the node has been initialized with a genesis state or the backfill status is missing")
return
}
status := s.store.status()
// Exit early if there aren't going to be any batches to backfill.
if primitives.Slot(status.LowSlot) <= s.ms.minimumSlot() {
@@ -293,8 +297,10 @@ func minimumBackfillSlot(current primitives.Slot) primitives.Slot {
oe = slots.MaxSafeEpoch()
}
offset := slots.UnsafeEpochStart(oe)
if offset > current {
return 0
if offset >= current {
// Slot 0 holds the genesis block, whose signature is invalid.
// To avoid rejecting an otherwise valid batch, we restrict the minimum backfill slot to 1.
return 1
}
return current - offset
}
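A simplified sketch of the clamping behavior introduced here (the slotsPerEpoch constant and the epoch-offset computation are placeholders for the real slots package helpers):
package main

import "fmt"

const slotsPerEpoch = 32

// minBackfillSlot clamps to slot 1 rather than 0: slot 0 holds the genesis
// block, whose signature would fail verification and poison a whole batch.
func minBackfillSlot(current, oldestEpoch uint64) uint64 {
	offset := oldestEpoch * slotsPerEpoch // start of the oldest epoch, as a slot offset
	if offset >= current {
		return 1
	}
	return current - offset
}

func main() {
	fmt.Println(minBackfillSlot(100*slotsPerEpoch, 100)) // offset == current -> clamped to 1
	fmt.Println(minBackfillSlot(105*slotsPerEpoch, 100)) // five epochs of headroom -> 160
}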

View File

@@ -5,9 +5,11 @@ import (
"testing"
"time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
"github.com/prysmaticlabs/prysm/v4/testing/require"
@@ -75,6 +77,20 @@ func TestServiceInit(t *testing.T) {
}
}
func TestMinimumBackfillSlot(t *testing.T) {
oe := helpers.MinEpochsForBlockRequests()
currSlot := (oe + 100).Mul(uint64(params.BeaconConfig().SlotsPerEpoch))
minSlot := minimumBackfillSlot(primitives.Slot(currSlot))
require.Equal(t, 100*params.BeaconConfig().SlotsPerEpoch, minSlot)
oe = helpers.MinEpochsForBlockRequests()
currSlot = oe.Mul(uint64(params.BeaconConfig().SlotsPerEpoch))
minSlot = minimumBackfillSlot(primitives.Slot(currSlot))
require.Equal(t, primitives.Slot(1), minSlot)
}
func testReadN(t *testing.T, ctx context.Context, c chan batch, n int, into []batch) []batch {
for i := 0; i < n; i++ {
select {

View File

@@ -149,6 +149,12 @@ func (s *Store) swapStatus(bs *dbval.BackfillStatus) {
s.bs = bs
}
func (s *Store) isGenesisSync() bool {
s.RLock()
defer s.RUnlock()
return s.genesisSync
}
// originState looks up the state for the checkpoint sync origin. This is a hack, because StatusUpdater is the only
// thing that needs db access and it has the origin root handy, so it's convenient to look it up here. The state is
// needed by the verifier.

View File

@@ -40,11 +40,13 @@ func (w *p2pWorker) handle(ctx context.Context, b batch) batch {
dlt := time.Now()
backfillBatchTimeDownloading.Observe(float64(dlt.Sub(start).Milliseconds()))
if err != nil {
log.WithError(err).WithFields(b.logFields()).Debug("Batch request failed")
return b.withRetryableError(err)
}
vb, err := w.v.verify(results)
backfillBatchTimeVerifying.Observe(float64(time.Since(dlt).Milliseconds()))
if err != nil {
log.WithError(err).WithFields(b.logFields()).Debug("Batch validation failed")
return b.withRetryableError(err)
}
// This is a hack to get the rough size of the batch. This helps us approximate the amount of memory needed

View File

@@ -2,6 +2,7 @@ package sync
import (
"context"
"fmt"
"sort"
"time"
@@ -16,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -80,7 +82,10 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
sc, err := s.cfg.blobStorage.Get(root, idx)
if err != nil {
if db.IsNotFound(err) {
log.WithError(err).Debugf("BlobSidecar not found in db, root=%x, index=%d", root, idx)
log.WithError(err).WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"index": idx,
}).Debug("Peer requested blob sidecar by root not found in db")
continue
}
log.WithError(err).Errorf("unexpected db error retrieving BlobSidecar, root=%x, index=%d", root, idx)
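The revised log line swaps printf-style interpolation for structured fields. A standalone sketch of the same logrus pattern (the root value here is made up for illustration):
package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

func main() {
	logrus.SetLevel(logrus.DebugLevel) // debug entries are suppressed at the default Info level
	root := [32]byte{0xde, 0xad}
	logrus.WithFields(logrus.Fields{
		"root":  fmt.Sprintf("%#x", root), // hex-encode the root, as the new log line does
		"index": 2,
	}).Debug("Peer requested blob sidecar by root not found in db")
}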

View File

@@ -227,8 +227,8 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
logFields["validationTime"] = validationTime
log.WithFields(logFields).Debug("Received block")
blockArrivalGossipSummary.Observe(float64(sinceSlotStartTime))
blockVerificationGossipSummary.Observe(float64(validationTime))
blockArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds()))
blockVerificationGossipSummary.Observe(float64(validationTime.Milliseconds()))
return pubsub.ValidationAccept, nil
}
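This one-line fix matters because converting a time.Duration directly to float64 yields nanoseconds, so the summaries were silently recording values six orders of magnitude larger than intended. A quick illustration:
package main

import (
	"fmt"
	"time"
)

func main() {
	d := 1500 * time.Millisecond
	fmt.Println(float64(d))                // 1.5e+09 (nanoseconds) - what the old code observed
	fmt.Println(float64(d.Milliseconds())) // 1500 - what the metric now records
}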

View File

@@ -22,6 +22,7 @@ go_library(
"//cmd/beacon-chain/jwt:go_default_library",
"//cmd/beacon-chain/storage:go_default_library",
"//cmd/beacon-chain/sync/backfill:go_default_library",
"//cmd/beacon-chain/sync/backfill/flags:go_default_library",
"//cmd/beacon-chain/sync/checkpoint:go_default_library",
"//cmd/beacon-chain/sync/genesis:go_default_library",
"//config/features:go_default_library",

View File

@@ -171,7 +171,7 @@ var (
BlobBatchLimit = &cli.IntFlag{
Name: "blob-batch-limit",
Usage: "The amount of blobs the local peer is bounded to request and respond to in a batch.",
Value: 8,
Value: 64,
}
// BlobBatchLimitBurstFactor specifies the factor by which blob batch size may increase.
BlobBatchLimitBurstFactor = &cli.IntFlag{

View File

@@ -20,7 +20,8 @@ import (
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
jwtcommands "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/jwt"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/storage"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill"
backfill "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill"
bflags "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/checkpoint"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/genesis"
"github.com/prysmaticlabs/prysm/v4/config/features"
@@ -139,9 +140,9 @@ var appFlags = []cli.Flag{
flags.JwtId,
storage.BlobStoragePathFlag,
storage.BlobRetentionEpochFlag,
backfill.EnableExperimentalBackfill,
backfill.BackfillBatchSize,
backfill.BackfillWorkerCount,
bflags.EnableExperimentalBackfill,
bflags.BackfillBatchSize,
bflags.BackfillWorkerCount,
}
func init() {

View File

@@ -8,6 +8,7 @@ go_library(
deps = [
"//beacon-chain/node:go_default_library",
"//beacon-chain/sync/backfill:go_default_library",
"//cmd/beacon-chain/sync/backfill/flags:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)

View File

@@ -0,0 +1,9 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["flags.go"],
importpath = "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags",
visibility = ["//visibility:public"],
deps = ["@com_github_urfave_cli_v2//:go_default_library"],
)

View File

@@ -0,0 +1,38 @@
package flags
import (
"github.com/urfave/cli/v2"
)
var (
backfillBatchSizeName = "backfill-batch-size"
backfillWorkerCountName = "backfill-worker-count"
// EnableExperimentalBackfill enables backfill for checkpoint synced nodes.
// This flag will be removed once backfill is enabled by default.
EnableExperimentalBackfill = &cli.BoolFlag{
Name: "enable-experimental-backfill",
Usage: "Backfill is still experimental at this time." +
"It will only be enabled if this flag is specified and the node was started using checkpoint sync.",
}
// BackfillBatchSize allows users to tune block backfill request sizes to maximize network utilization
// at the cost of higher memory.
BackfillBatchSize = &cli.Uint64Flag{
Name: backfillBatchSizeName,
Usage: "Number of blocks per backfill batch. " +
"A larger number will request more blocks at once from peers, but also consume more system memory to " +
"hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName,
Value: 64,
}
// BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize
// network utilization at the cost of higher memory.
BackfillWorkerCount = &cli.IntFlag{
Name: backfillWorkerCountName,
Usage: "Number of concurrent backfill batch requests. " +
"A larger number will better utilize network resources, up to a system-dependent limit, but will also " +
"consume more system memory to hold batches in memory during processing. Multiply by backfill-batch-size and " +
"average block size (~2MB before deneb) to find the right number for your system. " +
"This has a multiplicatice effect with " + backfillBatchSizeName,
Value: 2,
}
)
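As a rough worked estimate of the multiplicative effect these usage strings describe: with the defaults above and the ~2MB pre-Deneb average block size they mention, 64 blocks/batch × 2 concurrent batches × ~2MB/block ≈ 256MB of batch data held in memory at once.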

View File

@@ -3,49 +3,18 @@ package backfill
import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/node"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags"
"github.com/urfave/cli/v2"
)
var (
backfillBatchSizeName = "backfill-batch-size"
backfillWorkerCountName = "backfill-worker-count"
// EnableExperimentalBackfill enables backfill for checkpoint synced nodes.
// This flag will be removed once backfill is enabled by default.
EnableExperimentalBackfill = &cli.BoolFlag{
Name: "enable-experimental-backfill",
Usage: "Backfill is still experimental at this time." +
"It will only be enabled if this flag is specified and the node was started using checkpoint sync.",
}
// BackfillBatchSize allows users to tune block backfill request sizes to maximize network utilization
// at the cost of higher memory.
BackfillBatchSize = &cli.Uint64Flag{
Name: backfillBatchSizeName,
Usage: "Number of blocks per backfill batch. " +
"A larger number will request more blocks at once from peers, but also consume more system memory to " +
"hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName,
Value: 64,
}
// BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize
// network utilization at the cost of higher memory.
BackfillWorkerCount = &cli.IntFlag{
Name: backfillWorkerCountName,
Usage: "Number of concurrent backfill batch requests. " +
"A larger number will better utilize network resources, up to a system-dependent limit, but will also " +
"consume more system memory to hold batches in memory during processing. Multiply by backfill-batch-size and " +
"average block size (~2MB before deneb) to find the right number for your system. " +
"This has a multiplicatice effect with " + backfillBatchSizeName,
Value: 2,
}
)
// BeaconNodeOptions sets the appropriate functional opts on the *node.BeaconNode value, to decouple options
// from flag parsing.
func BeaconNodeOptions(c *cli.Context) ([]node.Option, error) {
opt := func(node *node.BeaconNode) (err error) {
node.BackfillOpts = []backfill.ServiceOption{
backfill.WithBatchSize(c.Uint64(BackfillBatchSize.Name)),
backfill.WithWorkerCount(c.Int(BackfillWorkerCount.Name)),
backfill.WithEnableBackfill(c.Bool(EnableExperimentalBackfill.Name)),
backfill.WithBatchSize(c.Uint64(flags.BackfillBatchSize.Name)),
backfill.WithWorkerCount(c.Int(flags.BackfillWorkerCount.Name)),
backfill.WithEnableBackfill(c.Bool(flags.EnableExperimentalBackfill.Name)),
}
return nil
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/cmd"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/storage"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill"
backfill "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/checkpoint"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/genesis"
"github.com/prysmaticlabs/prysm/v4/config/features"

View File

@@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//cmd:go_default_library",
"//cmd/beacon-chain/sync/backfill/flags:go_default_library",
"//config/params:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",

View File

@@ -223,9 +223,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(enableFullSSZDataLogging)
cfg.EnableFullSSZDataLogging = true
}
if ctx.IsSet(enableVerboseSigVerification.Name) {
logEnabled(enableVerboseSigVerification)
cfg.EnableVerboseSigVerification = true
cfg.EnableVerboseSigVerification = true
if ctx.IsSet(disableVerboseSigVerification.Name) {
logEnabled(disableVerboseSigVerification)
cfg.EnableVerboseSigVerification = false
}
if ctx.IsSet(prepareAllPayloads.Name) {
logEnabled(prepareAllPayloads)
@@ -235,9 +236,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(disableResourceManager)
cfg.DisableResourceManager = true
}
if ctx.IsSet(EnableEIP4881.Name) {
logEnabled(EnableEIP4881)
cfg.EnableEIP4881 = true
cfg.EnableEIP4881 = true
if ctx.IsSet(DisableEIP4881.Name) {
logEnabled(DisableEIP4881)
cfg.EnableEIP4881 = false
}
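// Note: with this inversion, EIP-4881 and verbose signature verification are
// enabled by default; operators opt out via --disable-eip-4881 or
// --disable-verbose-sig-verification rather than opting in.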
if ctx.IsSet(EnableLightClient.Name) {
logEnabled(EnableLightClient)

View File

@@ -68,6 +68,16 @@ var (
Usage: deprecatedUsage,
Hidden: true,
}
deprecatedEnableEIP4881 = &cli.BoolFlag{
Name: "enable-eip-4881",
Usage: deprecatedUsage,
Hidden: true,
}
deprecatedVerboseSigVerification = &cli.BoolFlag{
Name: "enable-verbose-sig-verification",
Usage: deprecatedUsage,
Hidden: true,
}
)
// Deprecated flags for both the beacon node and validator client.
@@ -84,6 +94,8 @@ var deprecatedFlags = []cli.Flag{
deprecatedDisableReorgLateBlocks,
deprecatedDisableOptionalEngineMethods,
deprecatedDisableAggregateParallel,
deprecatedEnableEIP4881,
deprecatedVerboseSigVerification,
}
// deprecatedBeaconFlags contains flags that are still used by other components

View File

@@ -3,6 +3,7 @@ package features
import (
"time"
backfill "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill/flags"
"github.com/urfave/cli/v2"
)
@@ -128,17 +129,17 @@ var (
Name: "enable-beacon-rest-api",
Usage: "(Experimental): Enables of the beacon REST API when querying a beacon node.",
}
enableVerboseSigVerification = &cli.BoolFlag{
Name: "enable-verbose-sig-verification",
Usage: "Enables identifying invalid signatures if batch verification fails when processing block.",
disableVerboseSigVerification = &cli.BoolFlag{
Name: "disable-verbose-sig-verification",
Usage: "Disables identifying invalid signatures if batch verification fails when processing block.",
}
prepareAllPayloads = &cli.BoolFlag{
Name: "prepare-all-payloads",
Usage: "Informs the engine to prepare all local payloads. Useful for relayers and builders.",
}
EnableEIP4881 = &cli.BoolFlag{
Name: "enable-eip-4881",
Usage: "Enables the deposit tree specified in EIP-4881.",
DisableEIP4881 = &cli.BoolFlag{
Name: "disable-eip-4881",
Usage: "Disables the deposit tree specified in EIP-4881.",
}
EnableLightClient = &cli.BoolFlag{
Name: "enable-lightclient",
@@ -158,9 +159,8 @@ var (
// devModeFlags holds list of flags that are set when development mode is on.
var devModeFlags = []cli.Flag{
enableVerboseSigVerification,
EnableEIP4881,
enableExperimentalState,
backfill.EnableExperimentalBackfill,
}
// ValidatorFlags contains a list of all the feature flags that apply to the validator client.
@@ -200,12 +200,12 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
SaveFullExecutionPayloads,
enableStartupOptimistic,
enableFullSSZDataLogging,
enableVerboseSigVerification,
disableVerboseSigVerification,
prepareAllPayloads,
aggregateFirstInterval,
aggregateSecondInterval,
aggregateThirdInterval,
EnableEIP4881,
DisableEIP4881,
disableResourceManager,
DisableRegistrationCache,
EnableLightClient,

View File

@@ -3561,8 +3561,8 @@ def prysm_deps():
go_repository(
name = "com_github_prysmaticlabs_gohashtree",
importpath = "github.com/prysmaticlabs/gohashtree",
sum = "h1:1EVinCWdb3Lorq7xn8DYQHf48nCcdAM3Vb18KsFlRWY=",
version = "v0.0.3-alpha",
sum = "h1:H/EbCuXPeTV3lpKeXGPpEV9gsUpkqOOVnWapUyeWro4=",
version = "v0.0.4-beta",
)
go_repository(

go.mod
View File

@@ -257,7 +257,7 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/validator/v10 v10.13.0
github.com/peterh/liner v1.2.0 // indirect
github.com/prysmaticlabs/gohashtree v0.0.3-alpha
github.com/prysmaticlabs/gohashtree v0.0.4-beta
golang.org/x/sys v0.15.0 // indirect
google.golang.org/api v0.44.0 // indirect
google.golang.org/appengine v1.6.7 // indirect

go.sum
View File

@@ -1105,8 +1105,8 @@ github.com/prysmaticlabs/fastssz v0.0.0-20221107182844-78142813af44/go.mod h1:MA
github.com/prysmaticlabs/go-bitfield v0.0.0-20210108222456-8e92c3709aa0/go.mod h1:hCwmef+4qXWjv0jLDbQdWnL0Ol7cS7/lCSS26WR+u6s=
github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 h1:0tVE4tdWQK9ZpYygoV7+vS6QkDvQVySboMVEIxBJmXw=
github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4=
github.com/prysmaticlabs/gohashtree v0.0.3-alpha h1:1EVinCWdb3Lorq7xn8DYQHf48nCcdAM3Vb18KsFlRWY=
github.com/prysmaticlabs/gohashtree v0.0.3-alpha/go.mod h1:4pWaT30XoEx1j8KNJf3TV+E3mQkaufn7mf+jRNb/Fuk=
github.com/prysmaticlabs/gohashtree v0.0.4-beta h1:H/EbCuXPeTV3lpKeXGPpEV9gsUpkqOOVnWapUyeWro4=
github.com/prysmaticlabs/gohashtree v0.0.4-beta/go.mod h1:BFdtALS+Ffhg3lGQIHv9HDWuHS8cTvHZzrHWxwOtGOs=
github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20230315201114-09284ba20446 h1:4wctORg/1TkgLgXejv9yOSAm3cDBJxoTzl/RNuZmX28=
github.com/prysmaticlabs/grpc-gateway/v2 v2.3.1-0.20230315201114-09284ba20446/go.mod h1:IOyTYjcIO0rkmnGBfJTL0NJ11exy/Tc2QEuv7hCXp24=
github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c h1:9PHRCuO/VN0s9k+RmLykho7AjDxblNYI5bYKed16NPU=

View File

@@ -283,7 +283,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
// on our features or the beacon index is a multiple of 2 (the idea is to split nodes
// equally down the line with one group having feature flags and the other without
// feature flags; this is to allow A-B testing on new features)
if !config.TestFeature || index%2 == 0 {
if !config.TestFeature || index != 1 {
args = append(args, features.E2EBeaconChainFlags...)
}
if config.UseBuilder {

View File

@@ -42,10 +42,14 @@ func selectAccounts(selectionPrompt string, pubKeys [][fieldparams.BLSPubkeyLeng
exit := "Done selecting"
results := make([]int, 0)
au := aurora.NewAurora(true)
if len(pubKeyStrings) > 5 {
log.Warnf("there are more than %d potential public keys to exit, please consider using the --%s or --%s flags", 5, flags.VoluntaryExitPublicKeysFlag.Name, flags.ExitAllFlag.Name)
}
for result != exit {
p := promptui.Select{
Label: selectionPrompt,
HideSelected: true,
Size: len(pubKeyStrings),
Items: append([]string{exit, allAccountsText}, pubKeyStrings...),
Templates: templates,
}

View File

@@ -39,14 +39,20 @@ func (s *Server) JWTInterceptor() grpc.UnaryServerInterceptor {
func (s *Server) JwtHttpInterceptor(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Require a JWT only for routes under the web API or keymanager API prefixes.
if !strings.Contains(r.URL.Path, api.WebUrlPrefix+"initialize") && // ignore some routes
!strings.Contains(r.URL.Path, api.WebUrlPrefix+"health/logs") {
if strings.Contains(r.URL.Path, api.WebApiUrlPrefix) || strings.Contains(r.URL.Path, api.KeymanagerApiPrefix) {
// These API routes require an Authorization header.
reqToken := r.Header.Get("Authorization")
if reqToken == "" {
http.Error(w, "unauthorized: no Authorization header passed. Please use an Authorization header with the jwt created in the prysm wallet", http.StatusUnauthorized)
return
}
token := strings.Split(reqToken, "Bearer ")[1]
tokenParts := strings.Split(reqToken, "Bearer ")
if len(tokenParts) != 2 {
http.Error(w, "Invalid token format", http.StatusBadRequest)
return
}
token := tokenParts[1]
_, err := jwt.Parse(token, s.validateJWT)
if err != nil {
http.Error(w, fmt.Errorf("forbidden: could not parse JWT token: %v", err).Error(), http.StatusForbidden)
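The length check above guards the exact failure mode fixed in this change: strings.Split only yields a second element when the separator actually occurs in the header. A minimal illustration:
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Correctly formatted header: the separator is found, so we get two parts.
	ok := strings.Split("Bearer abc.def.ghi", "Bearer ")
	fmt.Println(len(ok)) // 2; ok[1] is the token

	// Missing space after "Bearer": the separator is absent, so only one part.
	bad := strings.Split("Bearerabc.def.ghi", "Bearer ")
	fmt.Println(len(bad)) // 1; indexing bad[1] would panic, hence the length check
}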

View File

@@ -107,6 +107,43 @@ func TestServer_JwtHttpInterceptor(t *testing.T) {
testHandler.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
})
t.Run("wrong jwt format was sent", func(t *testing.T) {
rr := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "/eth/v1/keystores", nil)
require.NoError(t, err)
token, err := createTokenString(jwtKey)
require.NoError(t, err)
req.Header.Set("Authorization", "Bearer"+token) // no space was added // Replace with a valid JWT token
testHandler.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
})
t.Run("wrong jwt no bearer format was sent", func(t *testing.T) {
rr := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "/eth/v1/keystores", nil)
require.NoError(t, err)
token, err := createTokenString(jwtKey)
require.NoError(t, err)
req.Header.Set("Authorization", token) // Replace with a valid JWT token
testHandler.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
})
t.Run("broken jwt token format was sent", func(t *testing.T) {
rr := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "/eth/v1/keystores", nil)
require.NoError(t, err)
token, err := createTokenString(jwtKey)
require.NoError(t, err)
req.Header.Set("Authorization", "Bearer "+token[0:2]+" "+token[2:]) // Replace with a valid JWT token
testHandler.ServeHTTP(rr, req)
require.Equal(t, http.StatusForbidden, rr.Code)
})
t.Run("web endpoint needs jwt token", func(t *testing.T) {
rr := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "/api/v2/validator/beacon/status", nil)
require.NoError(t, err)
testHandler.ServeHTTP(rr, req)
require.Equal(t, http.StatusUnauthorized, rr.Code)
})
t.Run("initialize does not need jwt", func(t *testing.T) {
rr := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, api.WebUrlPrefix+"initialize", nil)