Compare commits

...

11 Commits

Author SHA1 Message Date
nisdas
79657b158c Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into hackSync 2025-03-05 13:33:53 +08:00
Potuz
a7b016c954 Add Target root to forkchoice dump (#15009)
* Add Target root to forkchoice dump

* review
2025-03-05 04:49:44 +00:00
Manu NALEPA
6015493de9 --validators-registration-batch-size: Change default value from 0 to 200. (#14981) 2025-03-04 16:32:24 +00:00
Dhruv Bodani
c718bdbe2b fix pruner timing issue with batch pruning (#14929)
* fix pruner timing issue with batch pruning

* add changelog entry

* add batchSize and number of slots deleted to debug logs

* fix lint

* prune in small batches in one prune cycle

* remove noisy logs

* fix number of batches to prune and return early if there's nothing to delete

* use context with timeout

* fix lint by ignoring nil err return
2025-03-04 16:31:17 +00:00
Preston Van Loon
0a8f947169 beacon-chain: Reorganize flags in help text (#14959)
* Beacon flags: Comment on deprecated section

* Beacon flags: Organize flags relevant to logging, comment on logging section

* Beacon flags: Organize flags relevant to p2p, comment on p2p section

* Beacon flags: Introduce db flag section, organize flags relevant to db, comment on db section

* Beacon flags: Introduce builder flag section, organize flags relevant to builder, comment on builder section

* Beacon flags: Introduce sync flag section, organize flags relevant to sync, comment on sync section

* Beacon flags: Introduce execution layer flag section, organize flags relevant to execution layer, comment on execution layer section

* Beacon flags: Introduce monitoring flag section, organize flags relevant to monitoring, comment on monitoring section

* Beacon flags: Organizing remaining flags in cmd and beacon-chain sections

* Beacon flags: Introduce slasher flag section, organize flags relevant to slasher, comment on slasher section

* Move slasher flag from features to the slasher section

* Changelog fragment

* Beacon flags: Reorganize sections

* Move MaxGoroutines to debug section
2025-03-04 16:17:29 +00:00
james-prysm
d7efccf6a5 single attestation cleanup (#14984)
* some cleanup and minor bug fix

* adding some comments back in

* Update beacon-chain/sync/pending_attestations_queue.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/sync/pending_attestations_queue.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/sync/validate_beacon_attestation.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/sync/validate_beacon_attestation.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/sync/validate_beacon_attestation.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/sync/validate_beacon_attestation.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/sync/validate_beacon_attestation.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update beacon-chain/sync/pending_attestations_queue.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* adding comment back in

* linting

* fixing committeeIndiciesSLot

* fixing changelog

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2025-03-04 14:50:59 +00:00
Nishant Das
334920bc9e Fix Block Decoding During State Deletion (#15008)
* Fix Block Decoding

* Changelog
2025-03-04 11:55:32 +00:00
james-prysm
6e00db433c cache uses wrong seen committee index for electra (#14998)
* adding fix and unit tests

* removing unneeded test for now, will handle in separate PR
2025-03-03 16:53:42 +00:00
james-prysm
c6344e7c3e fixing electra committee logs (#14992)
* fixing logs

* fixing tests

* addressing feedback

* fixing tests based on feedback
2025-03-03 15:35:45 +00:00
nisdas
2c6e028600 Add a Hack 2025-03-03 08:52:20 +08:00
Potuz
2131254722 Populate pubkey cache at genesis (#14995)
When starting with a zero root finalized checkpoint instead of returning
early also populate the pubkey cache.
2025-02-28 11:12:52 +00:00
42 changed files with 819 additions and 452 deletions

View File

@@ -54,4 +54,5 @@ type ForkChoiceNodeExtraData struct {
Balance string `json:"balance"`
ExecutionOptimistic bool `json:"execution_optimistic"`
TimeStamp string `json:"timestamp"`
Target string `json:"target"`
}

View File

@@ -213,3 +213,10 @@ func WithSyncChecker(checker Checker) Option {
return nil
}
}
func WithSlasherEnabled(enabled bool) Option {
return func(s *Service) error {
s.slasherEnabled = enabled
return nil
}
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -126,7 +125,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
return err
}
// If slasher is configured, forward the attestations in the block via an event feed for processing.
if features.Get().EnableSlasher {
if s.slasherEnabled {
go s.sendBlockAttestationsToSlasher(blockCopy, preState)
}

View File

@@ -63,6 +63,7 @@ type Service struct {
blobNotifiers *blobNotifierMap
blockBeingSynced *currentlySyncingBlock
blobStorage *filesystem.BlobStorage
slasherEnabled bool
}
// config options for the service.

View File

@@ -101,7 +101,7 @@ type NoHeadAccessDatabase interface {
SaveLightClientBootstrap(ctx context.Context, blockRoot []byte, bootstrap interfaces.LightClientBootstrap) error
CleanUpDirtyStates(ctx context.Context, slotsPerArchivedPoint primitives.Slot) error
DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot) error
DeleteHistoricalDataBeforeSlot(ctx context.Context, slot primitives.Slot, batchSize int) (int, error)
}
// HeadAccessDatabase defines a struct with access to reading chain head data.

View File

@@ -229,7 +229,8 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error {
defer span.End()
if err := s.DeleteState(ctx, root); err != nil {
return err
// TODO: Find out why invalid states are in the db
log.WithError(err).Error("Could not delete state")
}
if err := s.deleteStateSummary(root); err != nil {
@@ -260,77 +261,82 @@ func (s *Store) DeleteBlock(ctx context.Context, root [32]byte) error {
// - blockRootValidatorHashesBucket
// - blockSlotIndicesBucket
// - stateSlotIndicesBucket
func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot) error {
func (s *Store) DeleteHistoricalDataBeforeSlot(ctx context.Context, cutoffSlot primitives.Slot, batchSize int) (int, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.DeleteHistoricalDataBeforeSlot")
defer span.End()
// Collect slot/root pairs to perform deletions in a separate read only transaction.
var (
roots [][]byte
slts []primitives.Slot
)
err := s.db.View(func(tx *bolt.Tx) error {
var err error
roots, slts, err = blockRootsBySlotRange(ctx, tx.Bucket(blockSlotIndicesBucket), primitives.Slot(0), cutoffSlot, nil, nil, nil)
if err != nil {
return errors.Wrap(err, "could not retrieve block roots")
}
return nil
})
slotRoots, err := s.slotRootsInRange(ctx, primitives.Slot(0), cutoffSlot, batchSize)
if err != nil {
return errors.Wrap(err, "could not retrieve block roots and slots")
return 0, err
}
// Return early if there's nothing to delete.
if len(slotRoots) == 0 {
return 0, nil
}
// Perform all deletions in a single transaction for atomicity
return s.db.Update(func(tx *bolt.Tx) error {
for _, root := range roots {
var numSlotsDeleted int
err = s.db.Update(func(tx *bolt.Tx) error {
for _, sr := range slotRoots {
// Return if context is cancelled or deadline is exceeded.
if ctx.Err() != nil {
//nolint:nilerr
return nil
}
// Delete block
if err = s.deleteBlock(tx, root); err != nil {
if err = s.deleteBlock(tx, sr.root[:]); err != nil {
return err
}
// Delete finalized block roots index
if err = tx.Bucket(finalizedBlockRootsIndexBucket).Delete(root); err != nil {
if err = tx.Bucket(finalizedBlockRootsIndexBucket).Delete(sr.root[:]); err != nil {
return errors.Wrap(err, "could not delete finalized block root index")
}
// Delete state
if err = tx.Bucket(stateBucket).Delete(root); err != nil {
if err = tx.Bucket(stateBucket).Delete(sr.root[:]); err != nil {
return errors.Wrap(err, "could not delete state")
}
// Delete state summary
if err = tx.Bucket(stateSummaryBucket).Delete(root); err != nil {
if err = tx.Bucket(stateSummaryBucket).Delete(sr.root[:]); err != nil {
return errors.Wrap(err, "could not delete state summary")
}
// Delete validator entries
if err = s.deleteValidatorHashes(tx, root); err != nil {
if err = s.deleteValidatorHashes(tx, sr.root[:]); err != nil {
return errors.Wrap(err, "could not delete validators")
}
numSlotsDeleted++
}
for _, slot := range slts {
for _, sr := range slotRoots {
// Delete slot indices
if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil {
if err = tx.Bucket(blockSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(sr.slot)); err != nil {
return errors.Wrap(err, "could not delete block slot index")
}
if err = tx.Bucket(stateSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(slot)); err != nil {
if err = tx.Bucket(stateSlotIndicesBucket).Delete(bytesutil.SlotToBytesBigEndian(sr.slot)); err != nil {
return errors.Wrap(err, "could not delete state slot index")
}
}
// Delete all caches after we have deleted everything from buckets.
// This is done after the buckets are deleted to avoid any issues in case of transaction rollback.
for _, root := range roots {
for _, sr := range slotRoots {
// Delete block from cache
s.blockCache.Del(string(root))
s.blockCache.Del(string(sr.root[:]))
// Delete state summary from cache
s.stateSummaryCache.delete([32]byte(root))
s.stateSummaryCache.delete(sr.root)
}
return nil
})
return numSlotsDeleted, err
}
// SaveBlock to the db.
@@ -351,7 +357,7 @@ func (s *Store) SaveBlock(ctx context.Context, signed interfaces.ReadOnlySignedB
// if a `saveBlindedBeaconBlocks` key exists in the database. Otherwise, we check if the last
// blocked stored to check if it is blinded, and then write that `saveBlindedBeaconBlocks` key
// to the DB for future checks.
func (s *Store) shouldSaveBlinded(ctx context.Context) (bool, error) {
func (s *Store) shouldSaveBlinded() (bool, error) {
var saveBlinded bool
if err := s.db.View(func(tx *bolt.Tx) error {
metadataBkt := tx.Bucket(chainMetadataBucket)
@@ -413,7 +419,7 @@ func prepareBlockBatch(blks []blocks.ROBlock, shouldBlind bool) ([]blockBatchEnt
}
func (s *Store) SaveROBlocks(ctx context.Context, blks []blocks.ROBlock, cache bool) error {
shouldBlind, err := s.shouldSaveBlinded(ctx)
shouldBlind, err := s.shouldSaveBlinded()
if err != nil {
return err
}
@@ -684,6 +690,49 @@ func (s *Store) SaveRegistrationsByValidatorIDs(ctx context.Context, ids []primi
})
}
type slotRoot struct {
slot primitives.Slot
root [32]byte
}
// slotRootsInRange returns slot and block root pairs of length min(batchSize, end-slot)
func (s *Store) slotRootsInRange(ctx context.Context, start, end primitives.Slot, batchSize int) ([]slotRoot, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.slotRootsInRange")
defer span.End()
if end < start {
return nil, errInvalidSlotRange
}
var pairs []slotRoot
key := bytesutil.SlotToBytesBigEndian(end)
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blockSlotIndicesBucket)
c := bkt.Cursor()
for k, v := c.Seek(key); k != nil; k, v = c.Prev() {
slot := bytesutil.BytesToSlotBigEndian(k)
if slot > end {
continue // Seek will seek to the next key *after* the given one if not present
}
if slot < start {
return nil
}
roots, err := splitRoots(v)
if err != nil {
return errors.Wrapf(err, "corrupt value %v in block slot index for slot=%d", v, slot)
}
for _, r := range roots {
pairs = append(pairs, slotRoot{slot: slot, root: r})
}
if len(pairs) >= batchSize {
return nil // allows code to easily cap the number of items pruned
}
}
return nil
})
return pairs, err
}
// blockRootsByFilter retrieves the block roots given the filter criteria.
func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter) ([][]byte, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.blockRootsByFilter")
@@ -704,7 +753,7 @@ func blockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFilter
// We retrieve block roots that match a filter criteria of slot ranges, if specified.
filtersMap := f.Filters()
rootsBySlotRange, _, err := blockRootsBySlotRange(
rootsBySlotRange, err := blockRootsBySlotRange(
ctx,
tx.Bucket(blockSlotIndicesBucket),
filtersMap[filters.StartSlot],
@@ -749,13 +798,13 @@ func blockRootsBySlotRange(
ctx context.Context,
bkt *bolt.Bucket,
startSlotEncoded, endSlotEncoded, startEpochEncoded, endEpochEncoded, slotStepEncoded interface{},
) ([][]byte, []primitives.Slot, error) {
) ([][]byte, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.blockRootsBySlotRange")
defer span.End()
// Return nothing when all slot parameters are missing
if startSlotEncoded == nil && endSlotEncoded == nil && startEpochEncoded == nil && endEpochEncoded == nil {
return [][]byte{}, nil, nil
return [][]byte{}, nil
}
var startSlot, endSlot primitives.Slot
@@ -776,11 +825,11 @@ func blockRootsBySlotRange(
if startEpochOk && endEpochOk {
startSlot, err = slots.EpochStart(startEpoch)
if err != nil {
return nil, nil, err
return nil, err
}
endSlot, err = slots.EpochStart(endEpoch)
if err != nil {
return nil, nil, err
return nil, err
}
endSlot = endSlot + params.BeaconConfig().SlotsPerEpoch - 1
}
@@ -791,11 +840,10 @@ func blockRootsBySlotRange(
return key != nil && bytes.Compare(key, max) <= 0
}
if endSlot < startSlot {
return nil, nil, errInvalidSlotRange
return nil, errInvalidSlotRange
}
rootsRange := endSlot.SubSlot(startSlot).Div(step)
roots := make([][]byte, 0, rootsRange)
var slts []primitives.Slot
c := bkt.Cursor()
for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() {
slot := bytesutil.BytesToSlotBigEndian(k)
@@ -810,9 +858,8 @@ func blockRootsBySlotRange(
splitRoots = append(splitRoots, v[i:i+32])
}
roots = append(roots, splitRoots...)
slts = append(slts, slot)
}
return roots, slts, nil
return roots, nil
}
// blockRootsBySlot retrieves the block roots by slot

View File

@@ -359,184 +359,221 @@ func TestStore_DeleteFinalizedBlock(t *testing.T) {
func TestStore_HistoricalDataBeforeSlot(t *testing.T) {
slotsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch)
db := setupDB(t)
ctx := context.Background()
// Save genesis block root
require.NoError(t, db.SaveGenesisBlockRoot(ctx, genesisBlockRoot))
tests := []struct {
name string
batchSize int
numOfEpochs uint64
deleteBeforeSlot uint64
}{
{
name: "batchSize less than delete range",
batchSize: 10,
numOfEpochs: 4,
deleteBeforeSlot: 25,
},
{
name: "batchSize greater than delete range",
batchSize: 30,
numOfEpochs: 4,
deleteBeforeSlot: 15,
},
}
// Create and save blocks for 4 epochs
blks := makeBlocks(t, 0, slotsPerEpoch*4, genesisBlockRoot)
require.NoError(t, db.SaveBlocks(ctx, blks))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := setupDB(t)
// Save genesis block root
require.NoError(t, db.SaveGenesisBlockRoot(ctx, genesisBlockRoot))
// Mark state validator migration as complete
err := db.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(migrationsBucket).Put(migrationStateValidatorsKey, migrationCompleted)
})
require.NoError(t, err)
// Create and save blocks for given epochs
blks := makeBlocks(t, 0, slotsPerEpoch*tt.numOfEpochs, genesisBlockRoot)
require.NoError(t, db.SaveBlocks(ctx, blks))
migrated, err := db.isStateValidatorMigrationOver()
require.NoError(t, err)
require.Equal(t, true, migrated)
// Mark state validator migration as complete
err := db.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(migrationsBucket).Put(migrationStateValidatorsKey, migrationCompleted)
})
require.NoError(t, err)
// Create state summaries and states for each block
ss := make([]*ethpb.StateSummary, len(blks))
states := make([]state.BeaconState, len(blks))
migrated, err := db.isStateValidatorMigrationOver()
require.NoError(t, err)
require.Equal(t, true, migrated)
for i, blk := range blks {
slot := blk.Block().Slot()
r, err := blk.Block().HashTreeRoot()
require.NoError(t, err)
// Create state summaries and states for each block
ss := make([]*ethpb.StateSummary, len(blks))
states := make([]state.BeaconState, len(blks))
// Create and save state summary
ss[i] = &ethpb.StateSummary{
Slot: slot,
Root: r[:],
}
for i, blk := range blks {
slot := blk.Block().Slot()
r, err := blk.Block().HashTreeRoot()
require.NoError(t, err)
// Create and save state with validator entries
vals := make([]*ethpb.Validator, 2)
for j := range vals {
vals[j] = &ethpb.Validator{
PublicKey: bytesutil.PadTo([]byte{byte(i*j + 1)}, 48),
WithdrawalCredentials: bytesutil.PadTo([]byte{byte(i*j + 2)}, 32),
// Create and save state summary
ss[i] = &ethpb.StateSummary{
Slot: slot,
Root: r[:],
}
// Create and save state with validator entries
vals := make([]*ethpb.Validator, 2)
for j := range vals {
vals[j] = &ethpb.Validator{
PublicKey: bytesutil.PadTo([]byte{byte(i*j + 1)}, 48),
WithdrawalCredentials: bytesutil.PadTo([]byte{byte(i*j + 2)}, 32),
}
}
st, err := util.NewBeaconState(func(state *ethpb.BeaconState) error {
state.Validators = vals
state.Slot = slot
return nil
})
require.NoError(t, err)
require.NoError(t, db.SaveState(ctx, st, r))
states[i] = st
// Verify validator entries are saved to db
valsActual, err := db.validatorEntries(ctx, r)
require.NoError(t, err)
for j, val := range valsActual {
require.DeepEqual(t, vals[j], val)
}
}
}
require.NoError(t, db.SaveStateSummaries(ctx, ss))
st, err := util.NewBeaconState(func(state *ethpb.BeaconState) error {
state.Validators = vals
state.Slot = slot
return nil
})
require.NoError(t, err)
require.NoError(t, db.SaveState(ctx, st, r))
states[i] = st
// Verify validator entries are saved to db
valsActual, err := db.validatorEntries(ctx, r)
require.NoError(t, err)
for j, val := range valsActual {
require.DeepEqual(t, vals[j], val)
}
}
require.NoError(t, db.SaveStateSummaries(ctx, ss))
// Verify slot indices exist before deletion
err = db.db.View(func(tx *bolt.Tx) error {
blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)
for i := uint64(0); i < slotsPerEpoch; i++ {
slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist")
assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist", i)
}
return nil
})
require.NoError(t, err)
// Delete data before slot at epoch 1
require.NoError(t, db.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(slotsPerEpoch)))
// Verify blocks from epoch 0 are deleted
for i := uint64(0); i < slotsPerEpoch; i++ {
root, err := blks[i].Block().HashTreeRoot()
require.NoError(t, err)
// Check block is deleted
retrievedBlocks, err := db.BlocksBySlot(ctx, primitives.Slot(i))
require.NoError(t, err)
assert.Equal(t, 0, len(retrievedBlocks))
// Verify block does not exist
assert.Equal(t, false, db.HasBlock(ctx, root))
// Verify block parent root does not exist
err = db.db.View(func(tx *bolt.Tx) error {
require.Equal(t, 0, len(tx.Bucket(blockParentRootIndicesBucket).Get(root[:])))
return nil
})
require.NoError(t, err)
// Verify state is deleted
hasState := db.HasState(ctx, root)
assert.Equal(t, false, hasState)
// Verify state summary is deleted
hasSummary := db.HasStateSummary(ctx, root)
assert.Equal(t, false, hasSummary)
// Verify validator hashes for block roots are deleted
err = db.db.View(func(tx *bolt.Tx) error {
assert.Equal(t, 0, len(tx.Bucket(blockRootValidatorHashesBucket).Get(root[:])))
return nil
})
require.NoError(t, err)
}
// Verify slot indices are deleted
err = db.db.View(func(tx *bolt.Tx) error {
blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)
for i := uint64(0); i < slotsPerEpoch; i++ {
slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
assert.Equal(t, 0, len(blockSlotBkt.Get(slot)), fmt.Sprintf("Expected block slot index to be deleted, slot: %d", slot))
assert.Equal(t, 0, len(stateSlotBkt.Get(slot)), fmt.Sprintf("Expected state slot index to be deleted, slot: %d", slot))
}
return nil
})
require.NoError(t, err)
// Verify blocks from epochs 1-3 still exist
for i := slotsPerEpoch; i < slotsPerEpoch*4; i++ {
root, err := blks[i].Block().HashTreeRoot()
require.NoError(t, err)
// Verify block exists
assert.Equal(t, true, db.HasBlock(ctx, root))
// Verify remaining block parent root exists, except last slot since we store parent roots of each block.
if i < slotsPerEpoch*4-1 {
// Verify slot indices exist before deletion
err = db.db.View(func(tx *bolt.Tx) error {
require.NotNil(t, tx.Bucket(blockParentRootIndicesBucket).Get(root[:]), fmt.Sprintf("Expected block parent index to be deleted, slot: %d", i))
blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)
for i := uint64(0); i < uint64(tt.deleteBeforeSlot); i++ {
slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist")
assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist", i)
}
return nil
})
require.NoError(t, err)
}
// Verify state exists
hasState := db.HasState(ctx, root)
assert.Equal(t, true, hasState)
// Delete data before slot
slotsDeleted, err := db.DeleteHistoricalDataBeforeSlot(ctx, primitives.Slot(tt.deleteBeforeSlot), tt.batchSize)
require.NoError(t, err)
// Verify state summary exists
hasSummary := db.HasStateSummary(ctx, root)
assert.Equal(t, true, hasSummary)
var startSlotDeleted, endSlotDeleted uint64
if tt.batchSize >= int(tt.deleteBeforeSlot) {
startSlotDeleted = 1
endSlotDeleted = tt.deleteBeforeSlot
} else {
startSlotDeleted = tt.deleteBeforeSlot - uint64(tt.batchSize) + 1
endSlotDeleted = tt.deleteBeforeSlot
}
// Verify slot indices still exist
err = db.db.View(func(tx *bolt.Tx) error {
blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)
require.Equal(t, endSlotDeleted-startSlotDeleted+1, uint64(slotsDeleted))
slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist")
assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist")
return nil
// Verify blocks before given slot/batch are deleted
for i := startSlotDeleted; i < endSlotDeleted; i++ {
root, err := blks[i].Block().HashTreeRoot()
require.NoError(t, err)
// Check block is deleted
retrievedBlocks, err := db.BlocksBySlot(ctx, primitives.Slot(i))
require.NoError(t, err)
assert.Equal(t, 0, len(retrievedBlocks), fmt.Sprintf("Expected %d blocks, got %d for slot %d", 0, len(retrievedBlocks), i))
// Verify block does not exist
assert.Equal(t, false, db.HasBlock(ctx, root), fmt.Sprintf("Expected block index to not exist for slot %d", i))
// Verify block parent root does not exist
err = db.db.View(func(tx *bolt.Tx) error {
require.Equal(t, 0, len(tx.Bucket(blockParentRootIndicesBucket).Get(root[:])))
return nil
})
require.NoError(t, err)
// Verify state is deleted
hasState := db.HasState(ctx, root)
assert.Equal(t, false, hasState)
// Verify state summary is deleted
hasSummary := db.HasStateSummary(ctx, root)
assert.Equal(t, false, hasSummary)
// Verify validator hashes for block roots are deleted
err = db.db.View(func(tx *bolt.Tx) error {
assert.Equal(t, 0, len(tx.Bucket(blockRootValidatorHashesBucket).Get(root[:])))
return nil
})
require.NoError(t, err)
}
// Verify slot indices are deleted
err = db.db.View(func(tx *bolt.Tx) error {
blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)
for i := startSlotDeleted; i < endSlotDeleted; i++ {
slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
assert.Equal(t, 0, len(blockSlotBkt.Get(slot)), fmt.Sprintf("Expected block slot index to be deleted, slot: %d", slot))
assert.Equal(t, 0, len(stateSlotBkt.Get(slot)), fmt.Sprintf("Expected state slot index to be deleted, slot: %d", slot))
}
return nil
})
require.NoError(t, err)
// Verify blocks from expectedLastDeletedSlot till numEpochs still exist
for i := endSlotDeleted; i < slotsPerEpoch*tt.numOfEpochs; i++ {
root, err := blks[i].Block().HashTreeRoot()
require.NoError(t, err)
// Verify block exists
assert.Equal(t, true, db.HasBlock(ctx, root))
// Verify remaining block parent root exists, except last slot since we store parent roots of each block.
if i < slotsPerEpoch*tt.numOfEpochs-1 {
err = db.db.View(func(tx *bolt.Tx) error {
require.NotNil(t, tx.Bucket(blockParentRootIndicesBucket).Get(root[:]), fmt.Sprintf("Expected block parent index to be deleted, slot: %d", i))
return nil
})
require.NoError(t, err)
}
// Verify state exists
hasState := db.HasState(ctx, root)
assert.Equal(t, true, hasState)
// Verify state summary exists
hasSummary := db.HasStateSummary(ctx, root)
assert.Equal(t, true, hasSummary)
// Verify slot indices still exist
err = db.db.View(func(tx *bolt.Tx) error {
blockSlotBkt := tx.Bucket(blockSlotIndicesBucket)
stateSlotBkt := tx.Bucket(stateSlotIndicesBucket)
slot := bytesutil.SlotToBytesBigEndian(primitives.Slot(i + 1))
assert.NotNil(t, blockSlotBkt.Get(slot), "Expected block slot index to exist")
assert.NotNil(t, stateSlotBkt.Get(slot), "Expected state slot index to exist")
return nil
})
require.NoError(t, err)
// Verify validator entries still exist
valsActual, err := db.validatorEntries(ctx, root)
require.NoError(t, err)
assert.NotNil(t, valsActual)
// Verify remaining validator hashes for block roots exists
err = db.db.View(func(tx *bolt.Tx) error {
assert.NotNil(t, tx.Bucket(blockRootValidatorHashesBucket).Get(root[:]))
return nil
})
require.NoError(t, err)
}
})
require.NoError(t, err)
// Verify validator entries still exist
valsActual, err := db.validatorEntries(ctx, root)
require.NoError(t, err)
assert.NotNil(t, valsActual)
// Verify remaining validator hashes for block roots exists
err = db.db.View(func(tx *bolt.Tx) error {
assert.NotNil(t, tx.Bucket(blockRootValidatorHashesBucket).Get(root[:]))
return nil
})
require.NoError(t, err)
}
}
func TestStore_GenesisBlock(t *testing.T) {

View File

@@ -820,30 +820,25 @@ func (s *Store) slotByBlockRoot(ctx context.Context, tx *bolt.Tx, blockRoot []by
// no need to construct the validator entries as it is not used here.
s, err := s.unmarshalState(ctx, enc, nil)
if err != nil {
return 0, err
return 0, errors.Wrap(err, "could not unmarshal state")
}
if s == nil || s.IsNil() {
return 0, errors.New("state can't be nil")
}
return s.Slot(), nil
}
b := &ethpb.SignedBeaconBlock{}
err := decode(ctx, enc, b)
b, err := unmarshalBlock(ctx, enc)
if err != nil {
return 0, errors.Wrap(err, "could not unmarshal block")
}
if err := blocks.BeaconBlockIsNil(b); err != nil {
return 0, err
}
wsb, err := blocks.NewSignedBeaconBlock(b)
if err != nil {
return 0, err
}
if err := blocks.BeaconBlockIsNil(wsb); err != nil {
return 0, err
}
return b.Block.Slot, nil
return b.Block().Slot(), nil
}
stateSummary := &ethpb.StateSummary{}
if err := decode(ctx, enc, stateSummary); err != nil {
return 0, err
return 0, errors.Wrap(err, "could not unmarshal state summary")
}
return stateSummary.Slot, nil
}

View File

@@ -5,7 +5,6 @@ import (
"crypto/rand"
"encoding/binary"
mathRand "math/rand"
"strconv"
"testing"
"time"
@@ -1070,6 +1069,31 @@ func TestBellatrixState_CanDelete(t *testing.T) {
require.Equal(t, state.ReadOnlyBeaconState(nil), savedS, "Unsaved state should've been nil")
}
func TestBellatrixState_CanDeleteWithBlock(t *testing.T) {
db := setupDB(t)
b := util.NewBeaconBlockBellatrix()
b.Block.Slot = 100
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
wsb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, db.SaveBlock(context.Background(), wsb))
require.Equal(t, false, db.HasState(context.Background(), r))
st, _ := util.DeterministicGenesisStateBellatrix(t, 1)
require.NoError(t, st.SetSlot(100))
require.NoError(t, db.SaveState(context.Background(), st, r))
require.Equal(t, true, db.HasState(context.Background(), r))
require.NoError(t, db.DeleteState(context.Background(), r))
savedS, err := db.State(context.Background(), r)
require.NoError(t, err)
require.Equal(t, state.ReadOnlyBeaconState(nil), savedS, "Unsaved state should've been nil")
}
func TestDenebState_CanSaveRetrieve(t *testing.T) {
db := setupDB(t)

View File

@@ -16,6 +16,15 @@ import (
var log = logrus.WithField("prefix", "db-pruner")
const (
// defaultPrunableBatchSize is the number of slots that can be pruned at once.
defaultPrunableBatchSize = 32
// defaultPruningWindow is the duration of one pruning window.
defaultPruningWindow = time.Second * 3
// defaultNumBatchesToPrune is the number of batches to prune in one pruning window.
defaultNumBatchesToPrune = 15
)
type ServiceOption func(*Service)
// WithRetentionPeriod allows the user to specify a different data retention period than the spec default.
@@ -143,14 +152,17 @@ func (p *Service) prune(slot primitives.Slot) error {
}).Debug("Pruning chain data")
tt := time.Now()
if err := p.db.DeleteHistoricalDataBeforeSlot(p.ctx, pruneUpto); err != nil {
return errors.Wrapf(err, "could not delete upto slot %d", pruneUpto)
numBatches, err := p.pruneBatches(pruneUpto)
if err != nil {
return errors.Wrap(err, "failed to prune batches")
}
log.WithFields(logrus.Fields{
"prunedUpto": pruneUpto,
"duration": time.Since(tt),
"currentSlot": slot,
"batchSize": defaultPrunableBatchSize,
"numBatches": numBatches,
}).Debug("Successfully pruned chain data")
// Update pruning checkpoint.
@@ -159,6 +171,33 @@ func (p *Service) prune(slot primitives.Slot) error {
return nil
}
func (p *Service) pruneBatches(pruneUpto primitives.Slot) (int, error) {
ctx, cancel := context.WithTimeout(p.ctx, defaultPruningWindow)
defer cancel()
numBatches := 0
for {
select {
case <-ctx.Done():
return numBatches, nil
default:
for i := 0; i < defaultNumBatchesToPrune; i++ {
slotsDeleted, err := p.db.DeleteHistoricalDataBeforeSlot(ctx, pruneUpto, defaultPrunableBatchSize)
if err != nil {
return 0, errors.Wrapf(err, "could not delete upto slot %d", pruneUpto)
}
// Return if there's nothing to delete.
if slotsDeleted == 0 {
return numBatches, nil
}
numBatches++
}
}
}
}
// pruneStartSlotFunc returns the function to determine the start slot to start pruning.
func pruneStartSlotFunc(retentionEpochs primitives.Epoch) func(primitives.Slot) primitives.Slot {
return func(current primitives.Slot) primitives.Slot {

View File

@@ -156,6 +156,10 @@ func (n *Node) nodeTreeDump(ctx context.Context, nodes []*forkchoice2.Node) ([]*
if n.parent != nil {
parentRoot = n.parent.root
}
target := [32]byte{}
if n.target != nil {
target = n.target.root
}
thisNode := &forkchoice2.Node{
Slot: n.slot,
BlockRoot: n.root[:],
@@ -169,6 +173,7 @@ func (n *Node) nodeTreeDump(ctx context.Context, nodes []*forkchoice2.Node) ([]*
ExecutionOptimistic: n.optimistic,
ExecutionBlockHash: n.payloadHash[:],
Timestamp: n.timestamp,
Target: target[:],
}
if n.optimistic {
thisNode.Validity = forkchoice2.Optimistic

View File

@@ -122,6 +122,7 @@ type BeaconNode struct {
BlobStorageOptions []filesystem.BlobStorageOption
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
slasherEnabled bool
}
// New creates a new node instance, sets up configuration options, and registers
@@ -159,6 +160,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
serviceFlagOpts: &serviceFlagOpts{},
initialSyncComplete: make(chan struct{}),
syncChecker: &initialsync.SyncChecker{},
slasherEnabled: cliCtx.Bool(flags.SlasherFlag.Name),
}
for _, opt := range opts {
@@ -342,7 +344,7 @@ func registerServices(cliCtx *cli.Context, beacon *BeaconNode, synchronizer *sta
return errors.Wrap(err, "could not register slashing pool service")
}
log.Debugln("Registering Slasher Service")
log.WithField("enabled", beacon.slasherEnabled).Debugln("Registering Slasher Service")
if err := beacon.registerSlasherService(); err != nil {
return errors.Wrap(err, "could not register slasher service")
}
@@ -587,7 +589,7 @@ func (b *BeaconNode) startDB(cliCtx *cli.Context, depositAddress string) error {
}
func (b *BeaconNode) startSlasherDB(cliCtx *cli.Context) error {
if !features.Get().EnableSlasher {
if !b.slasherEnabled {
return nil
}
baseDir := cliCtx.String(cmd.DataDirFlag.Name)
@@ -775,6 +777,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache),
blockchain.WithPayloadIDCache(b.payloadIDCache),
blockchain.WithSyncChecker(b.syncChecker),
blockchain.WithSlasherEnabled(b.slasherEnabled),
)
blockchainService, err := blockchain.NewService(b.ctx, opts...)
@@ -859,6 +862,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithBlobStorage(b.BlobStorage),
regularsync.WithVerifierWaiter(b.verifyInitWaiter),
regularsync.WithAvailableBlocker(bFillStore),
regularsync.WithSlasherEnabled(b.slasherEnabled),
)
return b.services.RegisterService(rs)
}
@@ -887,7 +891,7 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error {
}
func (b *BeaconNode) registerSlasherService() error {
if !features.Get().EnableSlasher {
if !b.slasherEnabled {
return nil
}
var chainService *blockchain.Service
@@ -934,7 +938,7 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
}
var slasherService *slasher.Service
if features.Get().EnableSlasher {
if b.slasherEnabled {
if err := b.services.FetchService(&slasherService); err != nil {
return err
}

View File

@@ -190,6 +190,7 @@ func (s *Server) GetForkChoice(w http.ResponseWriter, r *http.Request) {
Balance: fmt.Sprintf("%d", n.Balance),
ExecutionOptimistic: n.ExecutionOptimistic,
TimeStamp: fmt.Sprintf("%d", n.Timestamp),
Target: fmt.Sprintf("%#x", n.Target),
},
}
}

View File

@@ -121,9 +121,10 @@ func (s *State) Resume(ctx context.Context, fState state.BeaconState) (state.Bea
return nil, err
}
fRoot := bytesutil.ToBytes32(c.Root)
st := fState
// Resume as genesis state if last finalized root is zero hashes.
if fRoot == params.BeaconConfig().ZeroHash {
st, err := s.beaconDB.GenesisState(ctx)
st, err = s.beaconDB.GenesisState(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not get genesis state")
}
@@ -132,10 +133,13 @@ func (s *State) Resume(ctx context.Context, fState state.BeaconState) (state.Bea
if err != nil {
return nil, stderrors.Join(ErrNoGenesisBlock, err)
}
return st, s.SaveState(ctx, gbr, st)
fRoot = gbr
if err := s.SaveState(ctx, gbr, st); err != nil {
return nil, errors.Wrap(err, "could not save genesis state")
}
}
if fState == nil || fState.IsNil() {
if st == nil || st.IsNil() {
return nil, errors.New("finalized state is nil")
}
@@ -145,20 +149,22 @@ func (s *State) Resume(ctx context.Context, fState state.BeaconState) (state.Bea
}
}()
s.finalizedInfo = &finalizedInfo{slot: fState.Slot(), root: fRoot, state: fState.Copy()}
fEpoch := slots.ToEpoch(fState.Slot())
s.finalizedInfo = &finalizedInfo{slot: st.Slot(), root: fRoot, state: st.Copy()}
populatePubkeyCache(ctx, st)
return st, nil
}
// Pre-populate the pubkey cache with the validator public keys from the finalized state.
// This process takes about 30 seconds on mainnet with 450,000 validators.
func populatePubkeyCache(ctx context.Context, st state.BeaconState) {
epoch := slots.ToEpoch(st.Slot())
go populatePubkeyCacheOnce.Do(func() {
log.Debug("Populating pubkey cache")
start := time.Now()
if err := fState.ReadFromEveryValidator(func(_ int, val state.ReadOnlyValidator) error {
if err := st.ReadFromEveryValidator(func(_ int, val state.ReadOnlyValidator) error {
if ctx.Err() != nil {
return ctx.Err()
}
// Do not cache for non-active validators.
if !helpers.IsActiveValidatorUsingTrie(val, fEpoch) {
if !helpers.IsActiveValidatorUsingTrie(val, epoch) {
return nil
}
pub := val.PublicKey()
@@ -169,8 +175,6 @@ func (s *State) Resume(ctx context.Context, fState state.BeaconState) (state.Bea
}
log.WithField("duration", time.Since(start)).Debug("Done populating pubkey cache")
})
return fState, nil
}
// SaveFinalizedState saves the finalized slot, root and state into memory to be used by state gen service.

View File

@@ -188,3 +188,11 @@ func WithAvailableBlocker(avb coverage.AvailableBlocker) Option {
return nil
}
}
// WithSlasherEnabled configures the sync package to support slashing detection.
func WithSlasherEnabled(enabled bool) Option {
return func(s *Service) error {
s.slasherEnabled = enabled
return nil
}
}

View File

@@ -140,8 +140,7 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
data := att.GetData()
// This is an important validation before retrieving attestation pre state to defend against
// attestation's target intentionally reference checkpoint that's long ago.
// Verify current finalized checkpoint is an ancestor of the block defined by the attestation's beacon block root.
// attestation's target intentionally referencing a checkpoint that's long ago.
if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Could not verify finalized consistency")
return
@@ -169,35 +168,57 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
return
}
var singleAtt *ethpb.SingleAttestation
// Decide if the attestation is an Electra SingleAttestation or a Phase0 unaggregated attestation
var (
attForValidation ethpb.Att
broadcastAtt ethpb.Att
eventType feed.EventType
eventData interface{}
)
if att.Version() >= version.Electra {
var ok bool
singleAtt, ok = att.(*ethpb.SingleAttestation)
singleAtt, ok := att.(*ethpb.SingleAttestation)
if !ok {
log.Debugf("Attestation has wrong type (expected %T, got %T)", &ethpb.SingleAttestation{}, att)
return
}
att = singleAtt.ToAttestationElectra(committee)
// Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions.
attForValidation = singleAtt.ToAttestationElectra(committee)
broadcastAtt = singleAtt
eventType = operation.SingleAttReceived
eventData = &operation.SingleAttReceivedData{
Attestation: singleAtt,
}
} else {
// Phase0 attestation
attForValidation = att
broadcastAtt = att
eventType = operation.UnaggregatedAttReceived
eventData = &operation.UnAggregatedAttReceivedData{
Attestation: att,
}
}
valid, err = s.validateUnaggregatedAttWithState(ctx, att, preState)
valid, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState)
if err != nil {
log.WithError(err).Debug("Pending unaggregated attestation failed validation")
return
}
if valid == pubsub.ValidationAccept {
if features.Get().EnableExperimentalAttestationPool {
if err = s.cfg.attestationCache.Add(att); err != nil {
if err = s.cfg.attestationCache.Add(attForValidation); err != nil {
log.WithError(err).Debug("Could not save unaggregated attestation")
return
}
} else {
if err := s.cfg.attPool.SaveUnaggregatedAttestation(att); err != nil {
if err := s.cfg.attPool.SaveUnaggregatedAttestation(attForValidation); err != nil {
log.WithError(err).Debug("Could not save unaggregated attestation")
return
}
}
s.setSeenCommitteeIndicesSlot(data.Slot, data.CommitteeIndex, att.GetAggregationBits())
s.setSeenCommitteeIndicesSlot(data.Slot, attForValidation.GetCommitteeIndex(), attForValidation.GetAggregationBits())
valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot))
if err != nil {
@@ -205,34 +226,16 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
return
}
// Broadcasting the signed attestation again once a node is able to process it.
var attToBroadcast ethpb.Att
if singleAtt != nil {
attToBroadcast = singleAtt
} else {
attToBroadcast = att
}
if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, attToBroadcast), attToBroadcast); err != nil {
// Broadcast the final 'broadcastAtt' object
if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, broadcastAtt), broadcastAtt); err != nil {
log.WithError(err).Debug("Could not broadcast")
}
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
if singleAtt != nil {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: singleAtt,
},
})
} else {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
}
// Feed event notification for other services
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: eventType,
Data: eventData,
})
}
}

View File

@@ -706,3 +706,41 @@ func Test_attsAreEqual_Committee(t *testing.T) {
assert.Equal(t, false, attsAreEqual(att1, att2))
})
}
func Test_SeenCommitteeIndicesSlot(t *testing.T) {
t.Run("phase 0 success", func(t *testing.T) {
s := &Service{
seenUnAggregatedAttestationCache: lruwrpr.New(1),
}
data := &ethpb.AttestationData{Slot: 1, CommitteeIndex: 44}
att := &ethpb.Attestation{
AggregationBits: bitfield.Bitlist{0x01},
Data: data,
}
s.setSeenCommitteeIndicesSlot(data.Slot, att.GetCommitteeIndex(), att.GetAggregationBits())
b := append(bytesutil.Bytes32(uint64(1)), bytesutil.Bytes32(uint64(44))...)
b = append(b, bytesutil.SafeCopyBytes(att.GetAggregationBits())...)
_, ok := s.seenUnAggregatedAttestationCache.Get(string(b))
require.Equal(t, true, ok)
})
t.Run("electra success", func(t *testing.T) {
s := &Service{
seenUnAggregatedAttestationCache: lruwrpr.New(1),
}
// committee index is 0 post electra for attestation electra
data := &ethpb.AttestationData{Slot: 1, CommitteeIndex: 0}
cb := primitives.NewAttestationCommitteeBits()
cb.SetBitAt(uint64(63), true)
att := &ethpb.AttestationElectra{
AggregationBits: bitfield.Bitlist{0x01},
Data: data,
CommitteeBits: cb,
}
ci := att.GetCommitteeIndex()
s.setSeenCommitteeIndicesSlot(data.Slot, ci, att.GetAggregationBits())
b := append(bytesutil.Bytes32(uint64(1)), bytesutil.Bytes32(uint64(63))...)
b = append(b, bytesutil.SafeCopyBytes(att.GetAggregationBits())...)
_, ok := s.seenUnAggregatedAttestationCache.Get(string(b))
require.Equal(t, true, ok)
})
}

View File

@@ -168,6 +168,7 @@ type Service struct {
newBlobVerifier verification.NewBlobVerifier
availableBlocker coverage.AvailableBlocker
ctxMap ContextByteVersions
slasherEnabled bool
}
// NewService initializes new regular sync service.

View File

@@ -17,7 +17,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
@@ -34,7 +33,11 @@ import (
// - The attestation is unaggregated -- that is, it has exactly one participating validator (len(get_attesting_indices(state, attestation.data, attestation.aggregation_bits)) == 1).
// - attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot).
// - The signature of attestation is valid.
func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateCommitteeIndexBeaconAttestation(
ctx context.Context,
pid peer.ID,
msg *pubsub.Message,
) (pubsub.ValidationResult, error) {
if pid == s.cfg.p2p.PeerID() {
return pubsub.ValidationAccept, nil
}
@@ -64,6 +67,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
if err := helpers.ValidateNilAttestation(att); err != nil {
return pubsub.ValidationReject, err
}
data := att.GetData()
// Do not process slot 0 attestations.
@@ -73,8 +77,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
// Attestation's slot is within ATTESTATION_PROPAGATION_SLOT_RANGE and early attestation
// processing tolerance.
if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(),
earlyAttestationProcessingTolerance); err != nil {
if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(), earlyAttestationProcessingTolerance); err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
}
@@ -84,12 +87,11 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
committeeIndex := att.GetCommitteeIndex()
if !features.Get().EnableSlasher {
if !s.slasherEnabled {
// Verify this the first attestation received for the participating validator for the slot.
if s.hasSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits()) {
return pubsub.ValidationIgnore, nil
}
// Reject an attestation if it references an invalid block.
if s.hasBadBlock(bytesutil.ToBytes32(data.BeaconBlockRoot)) ||
s.hasBadBlock(bytesutil.ToBytes32(data.Target.Root)) ||
@@ -99,15 +101,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
}
}
var validationRes pubsub.ValidationResult
// Verify the block being voted and the processed state is in beaconDB and the block has passed validation if it's in the beaconDB.
blockRoot := bytesutil.ToBytes32(data.BeaconBlockRoot)
if !s.hasBlockAndState(ctx, blockRoot) {
return s.saveToPendingAttPool(att)
}
if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
if !s.cfg.chain.InForkchoice(blockRoot) {
tracing.AnnotateError(span, blockchain.ErrNotDescendantOfFinalized)
return pubsub.ValidationIgnore, blockchain.ErrNotDescendantOfFinalized
}
@@ -123,12 +122,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
return pubsub.ValidationIgnore, err
}
validationRes, err = s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)
validationRes, err := s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)
if validationRes != pubsub.ValidationAccept {
return validationRes, err
}
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, att.GetData().Slot, committeeIndex)
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, committeeIndex)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationIgnore, err
@@ -139,21 +138,42 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
return validationRes, err
}
var singleAtt *eth.SingleAttestation
// Consolidated handling of Electra SingleAttestation vs Phase0 unaggregated attestation
var (
attForValidation eth.Att // what we'll pass to further validation
eventType feed.EventType
eventData interface{}
)
if att.Version() >= version.Electra {
singleAtt, ok = att.(*eth.SingleAttestation)
singleAtt, ok := att.(*eth.SingleAttestation)
if !ok {
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.SingleAttestation{}, att)
return pubsub.ValidationIgnore, fmt.Errorf(
"attestation has wrong type (expected %T, got %T)",
&eth.SingleAttestation{}, att,
)
}
// Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions.
attForValidation = singleAtt.ToAttestationElectra(committee)
eventType = operation.SingleAttReceived
eventData = &operation.SingleAttReceivedData{
Attestation: singleAtt,
}
} else {
// Phase0 unaggregated attestation
attForValidation = att
eventType = operation.UnaggregatedAttReceived
eventData = &operation.UnAggregatedAttReceivedData{
Attestation: att,
}
att = singleAtt.ToAttestationElectra(committee)
}
validationRes, err = s.validateUnaggregatedAttWithState(ctx, att, preState)
validationRes, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState)
if validationRes != pubsub.ValidationAccept {
return validationRes, err
}
if features.Get().EnableSlasher {
if s.slasherEnabled {
// Feed the indexed attestation to slasher if enabled. This action
// is done in the background to avoid adding more load to this critical code path.
go func() {
@@ -172,7 +192,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
tracing.AnnotateError(span, err)
return
}
indexedAtt, err := attestation.ConvertToIndexed(ctx, att, committee)
indexedAtt, err := attestation.ConvertToIndexed(ctx, attForValidation, committee)
if err != nil {
log.WithError(err).Error("Could not convert to indexed attestation")
tracing.AnnotateError(span, err)
@@ -182,27 +202,16 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
}()
}
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
if singleAtt != nil {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: singleAtt,
},
})
} else {
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
}
// Notify other services in the beacon node
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
Type: eventType,
Data: eventData,
})
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits())
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, attForValidation.GetAggregationBits())
msg.ValidatorData = att
// Attach final validated attestation to the message for further pipeline use
msg.ValidatorData = attForValidation
return pubsub.ValidationAccept, nil
}

View File

@@ -15,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
consensusblocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
@@ -80,7 +79,7 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
},
})
if features.Get().EnableSlasher {
if s.slasherEnabled {
// Feed the block header to slasher if enabled. This action
// is done in the background to avoid adding more load to this critical code path.
go func() {

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixed pruner to not block while pruning large database by introducing batchSize

View File

@@ -0,0 +1,4 @@
### Ignored
- Cleanup single attestation code for readability.

View File

@@ -0,0 +1,3 @@
### Fixed
- cosmetic fix for post electra validator logs displaying attestation committee information correctly.

View File

@@ -0,0 +1,3 @@
### Fixed
- fix inserting the wrong committee index into the seen cache for electra attestations

View File

@@ -0,0 +1,3 @@
## Changed
- `--validators-registration-batch-size`: Change default value from `0` to `200`.

View File

@@ -0,0 +1,3 @@
### Fixed
- Allow any block type to be unmarshaled rather than only phase0 blocks in `slotByBlockRoot`.

View File

@@ -0,0 +1,3 @@
### Ignored
- Add target root to forkchoice dump

View File

@@ -0,0 +1,3 @@
### Ignored
- Populate pubkey cache at genesis.

View File

@@ -0,0 +1,2 @@
### Changed
- Reorganized beacon chain flags in `--help` text into logical sections.

View File

@@ -296,6 +296,11 @@ var (
Usage: "Directory for the slasher database",
Value: cmd.DefaultDataDir(),
}
// SlasherFlag defines a flag to enable the beacon chain slasher.
SlasherFlag = &cli.BoolFlag{
Name: "slasher",
Usage: "Enables a slasher in the beacon node for detecting slashable offenses.",
}
// BeaconDBPruning enables the pruning of beacon db.
BeaconDBPruning = &cli.BoolFlag{
Name: "beacon-db-pruning",

View File

@@ -142,6 +142,7 @@ var appFlags = []cli.Flag{
genesis.StatePath,
genesis.BeaconAPIURL,
flags.SlasherDirFlag,
flags.SlasherFlag,
flags.JwtId,
storage.BlobStoragePathFlag,
storage.BlobRetentionEpochFlag,

View File

@@ -45,154 +45,188 @@ type flagGroup struct {
}
var appHelpFlagGroups = []flagGroup{
{
{ // Flags relevant to running the process.
Name: "cmd",
Flags: []cli.Flag{
cmd.MinimalConfigFlag,
cmd.E2EConfigFlag,
cmd.RPCMaxPageSizeFlag,
cmd.NoDiscovery,
cmd.BootstrapNode,
cmd.RelayNode,
cmd.P2PUDPPort,
cmd.P2PQUICPort,
cmd.P2PTCPPort,
cmd.DataDirFlag,
cmd.VerbosityFlag,
cmd.EnableTracingFlag,
cmd.TracingProcessNameFlag,
cmd.TracingEndpointFlag,
cmd.TraceSampleFractionFlag,
cmd.MonitoringHostFlag,
flags.MonitoringPortFlag,
cmd.DisableMonitoringFlag,
cmd.MaxGoroutines,
cmd.ForceClearDB,
cmd.ClearDB,
cmd.ConfigFileFlag,
cmd.ChainConfigFileFlag,
cmd.GrpcMaxCallRecvMsgSizeFlag,
cmd.AcceptTosFlag,
cmd.RestoreSourceFileFlag,
cmd.RestoreTargetDirFlag,
cmd.ValidatorMonitorIndicesFlag,
cmd.ApiTimeoutFlag,
cmd.ConfigFileFlag,
},
},
{
Name: "debug",
Flags: []cli.Flag{
debug.PProfFlag,
debug.PProfAddrFlag,
debug.PProfPortFlag,
debug.MemProfileRateFlag,
debug.CPUProfileFlag,
debug.TraceFlag,
debug.BlockProfileRateFlag,
debug.MutexProfileFractionFlag,
},
},
{
{ // Flags relevant to configuring the beacon chain and APIs.
Name: "beacon-chain",
Flags: []cli.Flag{
flags.InteropMockEth1DataVotesFlag,
flags.DepositContractFlag,
flags.ContractDeploymentBlock,
flags.RPCHost,
flags.RPCPort,
cmd.ApiTimeoutFlag,
cmd.ChainConfigFileFlag,
cmd.E2EConfigFlag,
cmd.GrpcMaxCallRecvMsgSizeFlag,
cmd.MinimalConfigFlag,
cmd.RPCMaxPageSizeFlag,
flags.CertFlag,
flags.KeyFlag,
flags.ChainID,
flags.DisableDebugRPCEndpoints,
flags.HTTPModules,
flags.HTTPServerCorsDomain,
flags.HTTPServerHost,
flags.HTTPServerPort,
flags.HTTPServerCorsDomain,
flags.KeyFlag,
flags.NetworkID,
flags.RPCHost,
flags.RPCPort,
},
},
{
// p2p flags configure the p2p side of beacon-chain.
Name: "p2p",
Flags: []cli.Flag{
cmd.BootstrapNode,
cmd.EnableUPnPFlag,
cmd.NoDiscovery,
cmd.P2PAllowList,
cmd.P2PDenyList,
cmd.P2PHost,
cmd.P2PHostDNS,
cmd.P2PIP,
cmd.P2PMaxPeers,
cmd.P2PMetadata,
cmd.P2PPrivKey,
cmd.P2PQUICPort,
cmd.P2PStaticID,
cmd.P2PTCPPort,
cmd.P2PUDPPort,
cmd.PubsubQueueSize,
cmd.RelayNode,
cmd.StaticPeers,
flags.BlobBatchLimit,
flags.BlobBatchLimitBurstFactor,
flags.BlockBatchLimit,
flags.BlockBatchLimitBurstFactor,
flags.MaxConcurrentDials,
flags.MinPeersPerSubnet,
flags.MinSyncPeers,
flags.SubscribeToAllSubnets,
},
},
{ // Flags relevant to storing data on disk and configuring the beacon chain database.
Name: "db",
Flags: []cli.Flag{
backfill.BackfillBatchSize,
backfill.BackfillOldestSlot,
backfill.BackfillWorkerCount,
backfill.EnableExperimentalBackfill,
cmd.ClearDB,
cmd.DataDirFlag,
cmd.ForceClearDB,
cmd.RestoreSourceFileFlag,
cmd.RestoreTargetDirFlag,
flags.BeaconDBPruning,
flags.PrunerRetentionEpochs,
flags.SlotsPerArchivedPoint,
storage.BlobRetentionEpochFlag,
storage.BlobStorageLayout,
storage.BlobStoragePathFlag,
},
},
{ // Flags relevant to configuring local block production or external builders such as mev-boost.
Name: "builder",
Flags: []cli.Flag{
flags.LocalBlockValueBoost,
flags.MaxBuilderConsecutiveMissedSlots,
flags.MaxBuilderEpochMissedSlots,
flags.MevRelayEndpoint,
flags.MinBuilderBid,
flags.MinBuilderDiff,
flags.SuggestedFeeRecipient,
},
},
{ // Flags relevant to syncing the beacon chain.
Name: "sync",
Flags: []cli.Flag{
checkpoint.BlockPath,
checkpoint.RemoteURL,
checkpoint.StatePath,
flags.WeakSubjectivityCheckpoint,
genesis.BeaconAPIURL,
genesis.StatePath,
},
},
{ // Flags relevant to interacting with the execution layer.
Name: "execution layer",
Flags: []cli.Flag{
flags.ContractDeploymentBlock,
flags.DepositContractFlag,
flags.EngineEndpointTimeoutSeconds,
flags.Eth1HeaderReqLimit,
flags.ExecutionEngineEndpoint,
flags.ExecutionEngineHeaders,
flags.ExecutionJWTSecretFlag,
flags.SetGCPercent,
flags.SlotsPerArchivedPoint,
flags.BlockBatchLimit,
flags.BlockBatchLimitBurstFactor,
flags.BlobBatchLimit,
flags.BlobBatchLimitBurstFactor,
flags.DisableDebugRPCEndpoints,
flags.SubscribeToAllSubnets,
flags.HistoricalSlasherNode,
flags.ChainID,
flags.NetworkID,
flags.WeakSubjectivityCheckpoint,
flags.Eth1HeaderReqLimit,
flags.MinPeersPerSubnet,
flags.MaxConcurrentDials,
flags.MevRelayEndpoint,
flags.MaxBuilderEpochMissedSlots,
flags.MaxBuilderConsecutiveMissedSlots,
flags.EngineEndpointTimeoutSeconds,
flags.SlasherDirFlag,
flags.LocalBlockValueBoost,
flags.MinBuilderBid,
flags.MinBuilderDiff,
flags.JwtId,
flags.BeaconDBPruning,
flags.PrunerRetentionEpochs,
checkpoint.BlockPath,
checkpoint.StatePath,
checkpoint.RemoteURL,
genesis.StatePath,
genesis.BeaconAPIURL,
storage.BlobStoragePathFlag,
storage.BlobRetentionEpochFlag,
storage.BlobStorageLayout,
backfill.EnableExperimentalBackfill,
backfill.BackfillWorkerCount,
backfill.BackfillBatchSize,
backfill.BackfillOldestSlot,
flags.InteropMockEth1DataVotesFlag,
},
},
{ // Flags relevant to configuring beacon chain monitoring.
Name: "monitoring",
Flags: []cli.Flag{
cmd.DisableMonitoringFlag,
cmd.EnableTracingFlag,
cmd.MonitoringHostFlag,
cmd.TraceSampleFractionFlag,
cmd.TracingEndpointFlag,
cmd.TracingProcessNameFlag,
cmd.ValidatorMonitorIndicesFlag,
flags.MonitoringPortFlag,
},
},
{ // Flags relevant to slasher operation.
Name: "slasher",
Flags: []cli.Flag{
flags.HistoricalSlasherNode,
flags.SlasherDirFlag,
flags.SlasherFlag,
},
},
{
// Flags in the "log" section control how Prysm handles logging.
Name: "log",
Flags: []cli.Flag{
cmd.LogFormat,
cmd.LogFileName,
cmd.VerbosityFlag,
},
},
{ // Feature flags.
Name: "features",
Flags: features.ActiveFlags(features.BeaconChainFlags),
},
{ // Flags required to configure the merge.
Name: "merge",
Flags: []cli.Flag{
flags.SuggestedFeeRecipient,
flags.TerminalTotalDifficultyOverride,
flags.TerminalBlockHashOverride,
flags.TerminalBlockHashActivationEpochOverride,
},
},
{
Name: "p2p",
Flags: []cli.Flag{
cmd.P2PIP,
cmd.P2PHost,
cmd.P2PHostDNS,
cmd.P2PMaxPeers,
cmd.P2PPrivKey,
cmd.P2PStaticID,
cmd.P2PMetadata,
cmd.P2PAllowList,
cmd.P2PDenyList,
cmd.PubsubQueueSize,
cmd.StaticPeers,
cmd.EnableUPnPFlag,
flags.MinSyncPeers,
},
},
{
Name: "log",
Flags: []cli.Flag{
cmd.LogFormat,
cmd.LogFileName,
},
},
{
Name: "features",
Flags: features.ActiveFlags(features.BeaconChainFlags),
},
{
{ // The deprecated section represents beacon flags that still have use, but should not be used
// as they are expected to be deleted in a feature release.
Name: "deprecated",
Flags: []cli.Flag{
cmd.BackupWebhookOutputDir,
},
},
{ // Flags used in debugging Prysm. These are flags not usually run by end users.
Name: "debug",
Flags: []cli.Flag{
cmd.MaxGoroutines,
debug.BlockProfileRateFlag,
debug.CPUProfileFlag,
debug.MemProfileRateFlag,
debug.MutexProfileFractionFlag,
debug.PProfAddrFlag,
debug.PProfFlag,
debug.PProfPortFlag,
debug.TraceFlag,
flags.SetGCPercent,
},
},
}
func init() {

View File

@@ -381,7 +381,7 @@ var (
ValidatorsRegistrationBatchSizeFlag = &cli.IntFlag{
Name: "validators-registration-batch-size",
Usage: "Sets the maximum size for one batch of validator registrations. Use a non-positive value to disable batching.",
Value: 0,
Value: 200,
}
// EnableDistributed enables the usage of prysm validator client in a Distributed Validator Cluster.
EnableDistributed = &cli.BoolFlag{

View File

@@ -60,7 +60,6 @@ type Flags struct {
// Bug fixes related flags.
AttestTimely bool // AttestTimely fixes #8185. It is gated behind a flag to ensure beacon node's fix can safely roll out first. We'll invert this in v1.1.0.
EnableSlasher bool // Enable slasher in the beacon node runtime.
EnableSlashingProtectionPruning bool // Enable slashing protection pruning for the validator client.
EnableMinimalSlashingProtection bool // Enable minimal slashing protection database for the validator client.
@@ -214,10 +213,6 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logDisabled(disableBroadcastSlashingFlag)
cfg.DisableBroadcastSlashings = true
}
if ctx.Bool(enableSlasherFlag.Name) {
log.WithField(enableSlasherFlag.Name, enableSlasherFlag.Usage).Warn(enabledFeatureFlag)
cfg.EnableSlasher = true
}
if ctx.Bool(enableHistoricalSpaceRepresentation.Name) {
log.WithField(enableHistoricalSpaceRepresentation.Name, enableHistoricalSpaceRepresentation.Usage).Warn(enabledFeatureFlag)
cfg.EnableHistoricalSpaceRepresentation = true

View File

@@ -12,39 +12,39 @@ import (
func TestInitFeatureConfig(t *testing.T) {
defer Init(&Flags{})
cfg := &Flags{
EnableSlasher: true,
EnableDoppelGanger: true,
}
Init(cfg)
c := Get()
assert.Equal(t, true, c.EnableSlasher)
assert.Equal(t, true, c.EnableDoppelGanger)
}
func TestInitWithReset(t *testing.T) {
defer Init(&Flags{})
Init(&Flags{
EnableSlasher: true,
EnableDoppelGanger: true,
})
assert.Equal(t, true, Get().EnableSlasher)
assert.Equal(t, true, Get().EnableDoppelGanger)
// Overwrite previously set value (value that didn't come by default).
resetCfg := InitWithReset(&Flags{
EnableSlasher: false,
EnableDoppelGanger: false,
})
assert.Equal(t, false, Get().EnableSlasher)
assert.Equal(t, false, Get().EnableDoppelGanger)
// Reset must get to previously set configuration (not to default config values).
resetCfg()
assert.Equal(t, true, Get().EnableSlasher)
assert.Equal(t, true, Get().EnableDoppelGanger)
}
func TestConfigureBeaconConfig(t *testing.T) {
app := cli.App{}
set := flag.NewFlagSet("test", 0)
set.Bool(enableSlasherFlag.Name, true, "test")
set.Bool(saveInvalidBlockTempFlag.Name, true, "test")
context := cli.NewContext(&app, set, nil)
require.NoError(t, ConfigureBeaconChain(context))
c := Get()
assert.Equal(t, true, c.EnableSlasher)
assert.Equal(t, true, c.SaveInvalidBlock)
}
func TestValidateNetworkFlags(t *testing.T) {

View File

@@ -89,10 +89,6 @@ var (
Name: "attest-timely",
Usage: "Fixes validator can attest timely after current block processes. See #8185 for more details.",
}
enableSlasherFlag = &cli.BoolFlag{
Name: "slasher",
Usage: "Enables a slasher in the beacon node for detecting slashable offenses.",
}
enableSlashingProtectionPruning = &cli.BoolFlag{
Name: "enable-slashing-protection-history-pruning",
Usage: "Enables the pruning of the validator client's slashing protection database.",
@@ -223,7 +219,6 @@ var BeaconChainFlags = combinedFlags([]cli.Flag{
Mainnet,
disablePeerScorer,
disableBroadcastSlashingFlag,
enableSlasherFlag,
disableStakinContractCheck,
SaveFullExecutionPayloads,
enableStartupOptimistic,

View File

@@ -51,4 +51,5 @@ type Node struct {
BlockRoot []byte
ParentRoot []byte
ExecutionBlockHash []byte
Target []byte
}

View File

@@ -105,6 +105,7 @@ go_test(
"aggregate_test.go",
"attest_test.go",
"key_reload_test.go",
"log_test.go",
"metrics_test.go",
"propose_test.go",
"registration_test.go",

View File

@@ -158,7 +158,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives
}
}
if err := v.saveSubmittedAtt(agg.AggregateVal().GetData(), pubKey[:], true); err != nil {
if err := v.saveSubmittedAtt(agg.AggregateVal(), pubKey[:], true); err != nil {
log.WithError(err).Error("Could not add aggregator indices to logs")
if v.emitAccountMetrics {
ValidatorAggFailVec.WithLabelValues(fmtKey).Inc()

View File

@@ -133,16 +133,17 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
}
var aggregationBitfield bitfield.Bitlist
var attestation ethpb.Att
var attResp *ethpb.AttestResponse
if postElectra {
attestation := &ethpb.SingleAttestation{
sa := &ethpb.SingleAttestation{
Data: data,
AttesterIndex: duty.ValidatorIndex,
CommitteeId: duty.CommitteeIndex,
Signature: sig,
}
attResp, err = v.validatorClient.ProposeAttestationElectra(ctx, attestation)
attestation = sa
attResp, err = v.validatorClient.ProposeAttestationElectra(ctx, sa)
} else {
var indexInCommittee uint64
var found bool
@@ -162,12 +163,13 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
}
aggregationBitfield = bitfield.NewBitlist(uint64(len(duty.Committee)))
aggregationBitfield.SetBitAt(indexInCommittee, true)
attestation := &ethpb.Attestation{
a := &ethpb.Attestation{
Data: data,
AggregationBits: aggregationBitfield,
Signature: sig,
}
attResp, err = v.validatorClient.ProposeAttestation(ctx, attestation)
attestation = a
attResp, err = v.validatorClient.ProposeAttestation(ctx, a)
}
if err != nil {
log.WithError(err).Error("Could not submit attestation to beacon node")
@@ -178,7 +180,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
return
}
if err := v.saveSubmittedAtt(data, pubKey[:], false); err != nil {
if err := v.saveSubmittedAtt(attestation, pubKey[:], false); err != nil {
log.WithError(err).Error("Could not save validator index for logging")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()

View File

@@ -49,10 +49,10 @@ func (k submittedAttKey) FromAttData(data *ethpb.AttestationData) error {
// saveSubmittedAtt saves the submitted attestation data along with the attester's pubkey.
// The purpose of this is to display combined attesting logs for all keys managed by the validator client.
func (v *validator) saveSubmittedAtt(data *ethpb.AttestationData, pubkey []byte, isAggregate bool) error {
func (v *validator) saveSubmittedAtt(att ethpb.Att, pubkey []byte, isAggregate bool) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()
data := att.GetData()
key := submittedAttKey{}
if err := key.FromAttData(data); err != nil {
return errors.Wrapf(err, "could not create submitted attestation key")
@@ -80,7 +80,7 @@ func (v *validator) saveSubmittedAtt(data *ethpb.AttestationData, pubkey []byte,
submittedAtts[key] = &submittedAtt{
d,
append(submittedAtts[key].pubkeys, pubkey),
append(submittedAtts[key].committees, data.CommitteeIndex),
append(submittedAtts[key].committees, att.GetCommitteeIndex()),
}
return nil

View File

@@ -0,0 +1,84 @@
package client
import (
"testing"
field_params "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestLogSubmittedAtts(t *testing.T) {
t.Run("phase0 attestations", func(t *testing.T) {
logHook := logTest.NewGlobal()
v := validator{
submittedAtts: make(map[submittedAttKey]*submittedAtt),
}
att := util.HydrateAttestation(&ethpb.Attestation{})
att.Data.CommitteeIndex = 12
require.NoError(t, v.saveSubmittedAtt(att, make([]byte, field_params.BLSPubkeyLength), false))
v.LogSubmittedAtts(0)
assert.LogsContain(t, logHook, "committeeIndices=\"[12]\"")
})
t.Run("electra attestations", func(t *testing.T) {
logHook := logTest.NewGlobal()
v := validator{
submittedAtts: make(map[submittedAttKey]*submittedAtt),
}
att := util.HydrateAttestationElectra(&ethpb.AttestationElectra{})
att.Data.CommitteeIndex = 0
att.CommitteeBits = primitives.NewAttestationCommitteeBits()
att.CommitteeBits.SetBitAt(44, true)
require.NoError(t, v.saveSubmittedAtt(att, make([]byte, field_params.BLSPubkeyLength), false))
v.LogSubmittedAtts(0)
assert.LogsContain(t, logHook, "committeeIndices=\"[44]\"")
})
t.Run("electra attestations multiple saved", func(t *testing.T) {
logHook := logTest.NewGlobal()
v := validator{
submittedAtts: make(map[submittedAttKey]*submittedAtt),
}
att := util.HydrateAttestationElectra(&ethpb.AttestationElectra{})
att.Data.CommitteeIndex = 0
att.CommitteeBits = primitives.NewAttestationCommitteeBits()
att.CommitteeBits.SetBitAt(23, true)
require.NoError(t, v.saveSubmittedAtt(att, make([]byte, field_params.BLSPubkeyLength), false))
att2 := util.HydrateAttestationElectra(&ethpb.AttestationElectra{})
att2.Data.CommitteeIndex = 0
att2.CommitteeBits = primitives.NewAttestationCommitteeBits()
att2.CommitteeBits.SetBitAt(2, true)
require.NoError(t, v.saveSubmittedAtt(att2, make([]byte, field_params.BLSPubkeyLength), false))
v.LogSubmittedAtts(0)
assert.LogsContain(t, logHook, "committeeIndices=\"[23 2]\"")
})
t.Run("phase0 aggregates", func(t *testing.T) {
logHook := logTest.NewGlobal()
v := validator{
submittedAggregates: make(map[submittedAttKey]*submittedAtt),
}
agg := &ethpb.AggregateAttestationAndProof{}
agg.Aggregate = util.HydrateAttestation(&ethpb.Attestation{})
agg.Aggregate.Data.CommitteeIndex = 12
require.NoError(t, v.saveSubmittedAtt(agg.AggregateVal(), make([]byte, field_params.BLSPubkeyLength), true))
v.LogSubmittedAtts(0)
assert.LogsContain(t, logHook, "committeeIndices=\"[12]\"")
})
t.Run("electra aggregates", func(t *testing.T) {
logHook := logTest.NewGlobal()
v := validator{
submittedAggregates: make(map[submittedAttKey]*submittedAtt),
}
agg := &ethpb.AggregateAttestationAndProofElectra{}
agg.Aggregate = util.HydrateAttestationElectra(&ethpb.AttestationElectra{})
agg.Aggregate.Data.CommitteeIndex = 0
agg.Aggregate.CommitteeBits = primitives.NewAttestationCommitteeBits()
agg.Aggregate.CommitteeBits.SetBitAt(63, true)
require.NoError(t, v.saveSubmittedAtt(agg.AggregateVal(), make([]byte, field_params.BLSPubkeyLength), true))
v.LogSubmittedAtts(0)
assert.LogsContain(t, logHook, "committeeIndices=\"[63]\"")
})
}