mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-08 23:18:15 -05:00
Redesign of the attestation pool (#14324)
* Rename existing AttestationCache * Cache with simple Add * fix import cycle * logic for unaggregated, aggregated and block attestations * some small fixes * remove Seen * finishing touches * feature flag * extract forkchoice atts to separate type * gate new functionality behind feature flag * revert test files * preparing for review * change Id to [32]byte * Potuz's review * Potuz's review pt 2 * Nishant's review * keep flat list of atts * fix ForkchoiceAttestations() function * Tests for Add, GetAll, Count * Tests for remaining functions * use DeepEqual * fix tests * documentation * changelog <3 * v2 handlers * nil check for forkchoice atts * guard against 0 bits set * fix failing test * Preston's review * better godocs
This commit is contained in:
@@ -14,7 +14,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
|
||||
- `Finished building block`: Display error only if not nil.
|
||||
- Added support to update target and max blob count to different values per hard fork config.
|
||||
- Log before blob filesystem cache warm-up.
|
||||
-
|
||||
- New design for the attestation pool. [PR](https://github.com/prysmaticlabs/prysm/pull/14324)
|
||||
|
||||
### Changed
|
||||
|
||||
- Process light client finality updates only for new finalized epochs instead of doing it for every block.
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
|
||||
@@ -404,13 +405,19 @@ func (s *Service) saveOrphanedOperations(ctx context.Context, orphanedRoot [32]b
|
||||
if a.GetData().Slot+params.BeaconConfig().SlotsPerEpoch < s.CurrentSlot() {
|
||||
continue
|
||||
}
|
||||
if helpers.IsAggregated(a) {
|
||||
if err := s.cfg.AttPool.SaveAggregatedAttestation(a); err != nil {
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = s.cfg.AttestationCache.Add(a); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := s.cfg.AttPool.SaveUnaggregatedAttestation(a); err != nil {
|
||||
return err
|
||||
if a.IsAggregated() {
|
||||
if err = s.cfg.AttPool.SaveAggregatedAttestation(a); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err = s.cfg.AttPool.SaveUnaggregatedAttestation(a); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
saveOrphanedAttCount.Inc()
|
||||
|
||||
@@ -85,6 +85,14 @@ func WithTrackedValidatorsCache(c *cache.TrackedValidatorsCache) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithAttestationCache for attestation lifecycle after chain inclusion.
|
||||
func WithAttestationCache(c *cache.AttestationCache) Option {
|
||||
return func(s *Service) error {
|
||||
s.cfg.AttestationCache = c
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithAttestationPool for attestation lifecycle after chain inclusion.
|
||||
func WithAttestationPool(p attestations.Pool) Option {
|
||||
return func(s *Service) error {
|
||||
|
||||
@@ -378,7 +378,11 @@ func (s *Service) handleBlockAttestations(ctx context.Context, blk interfaces.Re
|
||||
r := bytesutil.ToBytes32(a.GetData().BeaconBlockRoot)
|
||||
if s.cfg.ForkChoiceStore.HasNode(r) {
|
||||
s.cfg.ForkChoiceStore.ProcessAttestation(ctx, indices, r, a.GetData().Target.Epoch)
|
||||
} else if err := s.cfg.AttPool.SaveBlockAttestation(a); err != nil {
|
||||
} else if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = s.cfg.AttestationCache.Add(a); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if err = s.cfg.AttPool.SaveBlockAttestation(a); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -418,7 +422,11 @@ func (s *Service) savePostStateInfo(ctx context.Context, r [32]byte, b interface
|
||||
func (s *Service) pruneAttsFromPool(headBlock interfaces.ReadOnlySignedBeaconBlock) error {
|
||||
atts := headBlock.Block().Body().Attestations()
|
||||
for _, att := range atts {
|
||||
if helpers.IsAggregated(att) {
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err := s.cfg.AttestationCache.DeleteCovered(att); err != nil {
|
||||
return errors.Wrap(err, "could not delete attestation")
|
||||
}
|
||||
} else if att.IsAggregated() {
|
||||
if err := s.cfg.AttPool.DeleteAggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
|
||||
@@ -166,7 +167,13 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot)
|
||||
|
||||
// This processes fork choice attestations from the pool to account for validator votes and fork choice.
|
||||
func (s *Service) processAttestations(ctx context.Context, disparity time.Duration) {
|
||||
atts := s.cfg.AttPool.ForkchoiceAttestations()
|
||||
var atts []ethpb.Att
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
atts = s.cfg.AttestationCache.ForkchoiceAttestations()
|
||||
} else {
|
||||
atts = s.cfg.AttPool.ForkchoiceAttestations()
|
||||
}
|
||||
|
||||
for _, a := range atts {
|
||||
// Based on the spec, don't process the attestation until the subsequent slot.
|
||||
// This delays consideration in the fork choice until their slot is in the past.
|
||||
@@ -182,7 +189,11 @@ func (s *Service) processAttestations(ctx context.Context, disparity time.Durati
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.cfg.AttPool.DeleteForkchoiceAttestation(a); err != nil {
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err := s.cfg.AttestationCache.DeleteForkchoiceAttestation(a); err != nil {
|
||||
log.WithError(err).Error("Could not delete fork choice attestation in pool")
|
||||
}
|
||||
} else if err := s.cfg.AttPool.DeleteForkchoiceAttestation(a); err != nil {
|
||||
log.WithError(err).Error("Could not delete fork choice attestation in pool")
|
||||
}
|
||||
|
||||
|
||||
@@ -75,6 +75,7 @@ type config struct {
|
||||
DepositCache cache.DepositCache
|
||||
PayloadIDCache *cache.PayloadIDCache
|
||||
TrackedValidatorsCache *cache.TrackedValidatorsCache
|
||||
AttestationCache *cache.AttestationCache
|
||||
AttPool attestations.Pool
|
||||
ExitPool voluntaryexits.PoolManager
|
||||
SlashingPool slashings.PoolManager
|
||||
|
||||
8
beacon-chain/cache/BUILD.bazel
vendored
8
beacon-chain/cache/BUILD.bazel
vendored
@@ -5,6 +5,7 @@ go_library(
|
||||
srcs = [
|
||||
"active_balance.go",
|
||||
"active_balance_disabled.go", # keep
|
||||
"attestation.go",
|
||||
"attestation_data.go",
|
||||
"balance_cache_key.go",
|
||||
"checkpoint_state.go",
|
||||
@@ -36,18 +37,21 @@ go_library(
|
||||
],
|
||||
deps = [
|
||||
"//beacon-chain/forkchoice/types:go_default_library",
|
||||
"//beacon-chain/operations/attestations/attmap:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//cache/lru:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//container/slice:go_default_library",
|
||||
"//crypto/bls:go_default_library",
|
||||
"//crypto/hash:go_default_library",
|
||||
"//crypto/rand:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//math:go_default_library",
|
||||
"//monitoring/tracing/trace:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common:go_default_library",
|
||||
"@com_github_hashicorp_golang_lru//:go_default_library",
|
||||
@@ -66,6 +70,7 @@ go_test(
|
||||
srcs = [
|
||||
"active_balance_test.go",
|
||||
"attestation_data_test.go",
|
||||
"attestation_test.go",
|
||||
"cache_test.go",
|
||||
"checkpoint_state_test.go",
|
||||
"committee_fuzz_test.go",
|
||||
@@ -88,14 +93,17 @@ go_test(
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//crypto/bls/blst:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_google_gofuzz//:go_default_library",
|
||||
"@com_github_hashicorp_golang_lru//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_stretchr_testify//require:go_default_library",
|
||||
"@org_golang_google_protobuf//proto:go_default_library",
|
||||
],
|
||||
|
||||
275
beacon-chain/cache/attestation.go
vendored
Normal file
275
beacon-chain/cache/attestation.go
vendored
Normal file
@@ -0,0 +1,275 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations/attmap"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type attGroup struct {
|
||||
slot primitives.Slot
|
||||
atts []ethpb.Att
|
||||
}
|
||||
|
||||
// AttestationCache holds a map of attGroup items that group together all attestations for a single slot.
|
||||
// When we add an attestation to the cache by calling Add, we either create a new group with this attestation
|
||||
// (if this is the first attestation for some slot) or two things can happen:
|
||||
//
|
||||
// - If the attestation is unaggregated, we add its attestation bit to attestation bits of the first
|
||||
// attestation in the group.
|
||||
// - If the attestation is aggregated, we append it to the group. There should be no redundancy
|
||||
// in the list because we ignore redundant aggregates in gossip.
|
||||
//
|
||||
// The first bullet point above means that we keep one aggregate attestation to which we keep appending bits
|
||||
// as new single-bit attestations arrive. This means that at any point during seconds 0-4 of a slot
|
||||
// we will have only one attestation for this slot in the cache.
|
||||
//
|
||||
// NOTE: This design in principle can result in worse aggregates since we lose the ability to aggregate some
|
||||
// single bit attestations in case of overlaps with incoming aggregates.
|
||||
//
|
||||
// The cache also keeps forkchoice attestations in a separate struct. These attestations are used for
|
||||
// forkchoice-related operations.
|
||||
type AttestationCache struct {
|
||||
atts map[attestation.Id]*attGroup
|
||||
sync.RWMutex
|
||||
forkchoiceAtts *attmap.Attestations
|
||||
}
|
||||
|
||||
// NewAttestationCache creates a new cache instance.
|
||||
func NewAttestationCache() *AttestationCache {
|
||||
return &AttestationCache{
|
||||
atts: make(map[attestation.Id]*attGroup),
|
||||
forkchoiceAtts: attmap.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Add does one of two things:
|
||||
//
|
||||
// - For unaggregated attestations, it adds the attestation bit to attestation bits of the running aggregate,
|
||||
// which is the first aggregate for the slot.
|
||||
// - For aggregated attestations, it appends the attestation to the existng list of attestations for the slot.
|
||||
func (c *AttestationCache) Add(att ethpb.Att) error {
|
||||
if att.IsNil() {
|
||||
log.Debug("Attempted to add a nil attestation to the attestation cache")
|
||||
return nil
|
||||
}
|
||||
if len(att.GetAggregationBits().BitIndices()) == 0 {
|
||||
log.Debug("Attempted to add an attestation with 0 bits set to the attestation cache")
|
||||
return nil
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
group := c.atts[id]
|
||||
if group == nil {
|
||||
group = &attGroup{
|
||||
slot: att.GetData().Slot,
|
||||
atts: []ethpb.Att{att},
|
||||
}
|
||||
c.atts[id] = group
|
||||
return nil
|
||||
}
|
||||
|
||||
if att.IsAggregated() {
|
||||
group.atts = append(group.atts, att.Clone())
|
||||
return nil
|
||||
}
|
||||
|
||||
// This should never happen because we return early for a new group.
|
||||
if len(group.atts) == 0 {
|
||||
log.Error("Attestation group contains no attestations, skipping insertion")
|
||||
return nil
|
||||
}
|
||||
|
||||
a := group.atts[0]
|
||||
|
||||
// Indexing is safe because we have guarded against 0 bits set.
|
||||
bit := att.GetAggregationBits().BitIndices()[0]
|
||||
if a.GetAggregationBits().BitAt(uint64(bit)) {
|
||||
return nil
|
||||
}
|
||||
sig, err := aggregateSig(a, att)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not aggregate signatures")
|
||||
}
|
||||
|
||||
a.GetAggregationBits().SetBitAt(uint64(bit), true)
|
||||
a.SetSignature(sig)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAll returns all attestations in the cache, excluding forkchoice attestations.
|
||||
func (c *AttestationCache) GetAll() []ethpb.Att {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
var result []ethpb.Att
|
||||
for _, group := range c.atts {
|
||||
result = append(result, group.atts...)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Count returns the number of all attestations in the cache, excluding forkchoice attestations.
|
||||
func (c *AttestationCache) Count() int {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
count := 0
|
||||
for _, group := range c.atts {
|
||||
count += len(group.atts)
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// DeleteCovered removes all attestations whose attestation bits are a proper subset of the passed-in attestation.
|
||||
func (c *AttestationCache) DeleteCovered(att ethpb.Att) error {
|
||||
if att.IsNil() {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
group := c.atts[id]
|
||||
if group == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
idx := 0
|
||||
for _, a := range group.atts {
|
||||
if covered, err := att.GetAggregationBits().Contains(a.GetAggregationBits()); err != nil {
|
||||
return err
|
||||
} else if !covered {
|
||||
group.atts[idx] = a
|
||||
idx++
|
||||
}
|
||||
}
|
||||
group.atts = group.atts[:idx]
|
||||
|
||||
if len(group.atts) == 0 {
|
||||
delete(c.atts, id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PruneBefore removes all attestations whose slot is earlier than the passed-in slot.
|
||||
func (c *AttestationCache) PruneBefore(slot primitives.Slot) uint64 {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
var pruneCount int
|
||||
for id, group := range c.atts {
|
||||
if group.slot < slot {
|
||||
pruneCount += len(group.atts)
|
||||
delete(c.atts, id)
|
||||
}
|
||||
}
|
||||
return uint64(pruneCount)
|
||||
}
|
||||
|
||||
// AggregateIsRedundant checks whether all attestation bits of the passed-in aggregate
|
||||
// are already included by any aggregate in the cache.
|
||||
func (c *AttestationCache) AggregateIsRedundant(att ethpb.Att) (bool, error) {
|
||||
if att.IsNil() {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return true, errors.Wrapf(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
group := c.atts[id]
|
||||
if group == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
for _, a := range group.atts {
|
||||
if redundant, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
|
||||
return true, err
|
||||
} else if redundant {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// SaveForkchoiceAttestations saves forkchoice attestations.
|
||||
func (c *AttestationCache) SaveForkchoiceAttestations(att []ethpb.Att) error {
|
||||
return c.forkchoiceAtts.SaveMany(att)
|
||||
}
|
||||
|
||||
// ForkchoiceAttestations returns all forkchoice attestations.
|
||||
func (c *AttestationCache) ForkchoiceAttestations() []ethpb.Att {
|
||||
return c.forkchoiceAtts.GetAll()
|
||||
}
|
||||
|
||||
// DeleteForkchoiceAttestation deletes a forkchoice attestation.
|
||||
func (c *AttestationCache) DeleteForkchoiceAttestation(att ethpb.Att) error {
|
||||
return c.forkchoiceAtts.Delete(att)
|
||||
}
|
||||
|
||||
// GetBySlotAndCommitteeIndex returns all attestations in the cache that match the provided slot
|
||||
// and committee index. Forkchoice attestations are not returned.
|
||||
//
|
||||
// NOTE: This function cannot be declared as a method on the AttestationCache because it is a generic function.
|
||||
func GetBySlotAndCommitteeIndex[T ethpb.Att](c *AttestationCache, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []T {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
var result []T
|
||||
|
||||
for _, group := range c.atts {
|
||||
if len(group.atts) > 0 {
|
||||
// We can safely compare the first attestation because all attestations in a group
|
||||
// must have the same slot and committee index, since they are under the same key.
|
||||
a, ok := group.atts[0].(T)
|
||||
if ok && a.GetData().Slot == slot && a.CommitteeBitsVal().BitAt(uint64(committeeIndex)) {
|
||||
for _, a := range group.atts {
|
||||
a, ok := a.(T)
|
||||
if ok {
|
||||
result = append(result, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func aggregateSig(agg ethpb.Att, att ethpb.Att) ([]byte, error) {
|
||||
aggSig, err := bls.SignatureFromBytesNoValidation(agg.GetSignature())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
attSig, err := bls.SignatureFromBytesNoValidation(att.GetSignature())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bls.AggregateSignatures([]bls.Signature{aggSig, attSig}).Marshal(), nil
|
||||
}
|
||||
14
beacon-chain/cache/attestation_data.go
vendored
14
beacon-chain/cache/attestation_data.go
vendored
@@ -15,24 +15,24 @@ type AttestationConsensusData struct {
|
||||
Source forkchoicetypes.Checkpoint
|
||||
}
|
||||
|
||||
// AttestationCache stores cached results of AttestationData requests.
|
||||
type AttestationCache struct {
|
||||
// AttestationDataCache stores cached results of AttestationData requests.
|
||||
type AttestationDataCache struct {
|
||||
a *AttestationConsensusData
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
// NewAttestationCache creates a new instance of AttestationCache.
|
||||
func NewAttestationCache() *AttestationCache {
|
||||
return &AttestationCache{}
|
||||
// NewAttestationDataCache creates a new instance of AttestationDataCache.
|
||||
func NewAttestationDataCache() *AttestationDataCache {
|
||||
return &AttestationDataCache{}
|
||||
}
|
||||
|
||||
// Get retrieves cached attestation data, recording a cache hit or miss. This method is lock free.
|
||||
func (c *AttestationCache) Get() *AttestationConsensusData {
|
||||
func (c *AttestationDataCache) Get() *AttestationConsensusData {
|
||||
return c.a
|
||||
}
|
||||
|
||||
// Put adds a response to the cache. This method is lock free.
|
||||
func (c *AttestationCache) Put(a *AttestationConsensusData) error {
|
||||
func (c *AttestationDataCache) Put(a *AttestationConsensusData) error {
|
||||
if a == nil {
|
||||
return errors.New("attestation cannot be nil")
|
||||
}
|
||||
|
||||
2
beacon-chain/cache/attestation_data_test.go
vendored
2
beacon-chain/cache/attestation_data_test.go
vendored
@@ -9,7 +9,7 @@ import (
|
||||
)
|
||||
|
||||
func TestAttestationCache_RoundTrip(t *testing.T) {
|
||||
c := cache.NewAttestationCache()
|
||||
c := cache.NewAttestationDataCache()
|
||||
|
||||
a := c.Get()
|
||||
require.Nil(t, a)
|
||||
|
||||
353
beacon-chain/cache/attestation_test.go
vendored
Normal file
353
beacon-chain/cache/attestation_test.go
vendored
Normal file
@@ -0,0 +1,353 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/bls/blst"
|
||||
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/assert"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
)
|
||||
|
||||
func TestAdd(t *testing.T) {
|
||||
k, err := blst.RandKey()
|
||||
require.NoError(t, err)
|
||||
sig := k.Sign([]byte{'X'})
|
||||
|
||||
t.Run("new ID", func(t *testing.T) {
|
||||
t.Run("first ID ever", func(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
ab := bitfield.NewBitlist(8)
|
||||
ab.SetBitAt(0, true)
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: ab,
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, c.Add(att))
|
||||
|
||||
require.Equal(t, 1, len(c.atts))
|
||||
group, ok := c.atts[id]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, primitives.Slot(123), group.slot)
|
||||
require.Equal(t, 1, len(group.atts))
|
||||
assert.DeepEqual(t, group.atts[0], att)
|
||||
})
|
||||
t.Run("other ID exists", func(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
ab := bitfield.NewBitlist(8)
|
||||
ab.SetBitAt(0, true)
|
||||
existingAtt := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: ab,
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
existingId, err := attestation.NewId(existingAtt, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
c.atts[existingId] = &attGroup{slot: existingAtt.Data.Slot, atts: []ethpb.Att{existingAtt}}
|
||||
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: ab,
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, c.Add(att))
|
||||
|
||||
require.Equal(t, 2, len(c.atts))
|
||||
group, ok := c.atts[id]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, primitives.Slot(123), group.slot)
|
||||
require.Equal(t, 1, len(group.atts))
|
||||
assert.DeepEqual(t, group.atts[0], att)
|
||||
})
|
||||
})
|
||||
t.Run("aggregated", func(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
existingAtt := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
id, err := attestation.NewId(existingAtt, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
c.atts[id] = &attGroup{slot: existingAtt.Data.Slot, atts: []ethpb.Att{existingAtt}}
|
||||
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
att.AggregationBits.SetBitAt(1, true)
|
||||
require.NoError(t, c.Add(att))
|
||||
|
||||
require.Equal(t, 1, len(c.atts))
|
||||
group, ok := c.atts[id]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, primitives.Slot(123), group.slot)
|
||||
require.Equal(t, 2, len(group.atts))
|
||||
assert.DeepEqual(t, group.atts[0], existingAtt)
|
||||
assert.DeepEqual(t, group.atts[1], att)
|
||||
})
|
||||
t.Run("unaggregated - existing bit", func(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
existingAtt := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
existingAtt.AggregationBits.SetBitAt(0, true)
|
||||
id, err := attestation.NewId(existingAtt, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
c.atts[id] = &attGroup{slot: existingAtt.Data.Slot, atts: []ethpb.Att{existingAtt}}
|
||||
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
require.NoError(t, c.Add(att))
|
||||
|
||||
require.Equal(t, 1, len(c.atts))
|
||||
group, ok := c.atts[id]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, primitives.Slot(123), group.slot)
|
||||
require.Equal(t, 1, len(group.atts))
|
||||
assert.DeepEqual(t, []int{0}, group.atts[0].GetAggregationBits().BitIndices())
|
||||
})
|
||||
t.Run("unaggregated - new bit", func(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
existingAtt := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
existingAtt.AggregationBits.SetBitAt(0, true)
|
||||
id, err := attestation.NewId(existingAtt, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
c.atts[id] = &attGroup{slot: existingAtt.Data.Slot, atts: []ethpb.Att{existingAtt}}
|
||||
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(1, true)
|
||||
require.NoError(t, c.Add(att))
|
||||
|
||||
require.Equal(t, 1, len(c.atts))
|
||||
group, ok := c.atts[id]
|
||||
require.Equal(t, true, ok)
|
||||
assert.Equal(t, primitives.Slot(123), group.slot)
|
||||
require.Equal(t, 1, len(group.atts))
|
||||
assert.DeepEqual(t, []int{0, 1}, group.atts[0].GetAggregationBits().BitIndices())
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetAll(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
c.atts[bytesutil.ToBytes32([]byte("id1"))] = &attGroup{atts: []ethpb.Att{ðpb.Attestation{}, ðpb.Attestation{}}}
|
||||
c.atts[bytesutil.ToBytes32([]byte("id2"))] = &attGroup{atts: []ethpb.Att{ðpb.Attestation{}}}
|
||||
|
||||
assert.Equal(t, 3, len(c.GetAll()))
|
||||
}
|
||||
|
||||
func TestCount(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
c.atts[bytesutil.ToBytes32([]byte("id1"))] = &attGroup{atts: []ethpb.Att{ðpb.Attestation{}, ðpb.Attestation{}}}
|
||||
c.atts[bytesutil.ToBytes32([]byte("id2"))] = &attGroup{atts: []ethpb.Att{ðpb.Attestation{}}}
|
||||
|
||||
assert.Equal(t, 3, c.Count())
|
||||
}
|
||||
|
||||
func TestDeleteCovered(t *testing.T) {
|
||||
k, err := blst.RandKey()
|
||||
require.NoError(t, err)
|
||||
sig := k.Sign([]byte{'X'})
|
||||
|
||||
att1 := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att1.AggregationBits.SetBitAt(0, true)
|
||||
|
||||
att2 := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att2.AggregationBits.SetBitAt(1, true)
|
||||
att2.AggregationBits.SetBitAt(2, true)
|
||||
|
||||
att3 := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att3.AggregationBits.SetBitAt(1, true)
|
||||
att3.AggregationBits.SetBitAt(3, true)
|
||||
att3.AggregationBits.SetBitAt(4, true)
|
||||
|
||||
c := NewAttestationCache()
|
||||
id, err := attestation.NewId(att1, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
c.atts[id] = &attGroup{slot: att1.Data.Slot, atts: []ethpb.Att{att1, att2, att3}}
|
||||
|
||||
t.Run("no matching group", func(t *testing.T) {
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 456, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
att.AggregationBits.SetBitAt(1, true)
|
||||
att.AggregationBits.SetBitAt(2, true)
|
||||
att.AggregationBits.SetBitAt(3, true)
|
||||
att.AggregationBits.SetBitAt(4, true)
|
||||
require.NoError(t, c.DeleteCovered(att))
|
||||
|
||||
assert.Equal(t, 3, len(c.atts[id].atts))
|
||||
})
|
||||
t.Run("covered atts deleted", func(t *testing.T) {
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
att.AggregationBits.SetBitAt(1, true)
|
||||
att.AggregationBits.SetBitAt(3, true)
|
||||
att.AggregationBits.SetBitAt(4, true)
|
||||
require.NoError(t, c.DeleteCovered(att))
|
||||
|
||||
atts := c.atts[id].atts
|
||||
require.Equal(t, 1, len(atts))
|
||||
assert.DeepEqual(t, att2, atts[0])
|
||||
})
|
||||
t.Run("last att in group deleted", func(t *testing.T) {
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
att.AggregationBits.SetBitAt(1, true)
|
||||
att.AggregationBits.SetBitAt(2, true)
|
||||
att.AggregationBits.SetBitAt(3, true)
|
||||
att.AggregationBits.SetBitAt(4, true)
|
||||
require.NoError(t, c.DeleteCovered(att))
|
||||
|
||||
assert.Equal(t, 0, len(c.atts))
|
||||
})
|
||||
}
|
||||
|
||||
func TestPruneBefore(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
c.atts[bytesutil.ToBytes32([]byte("id1"))] = &attGroup{slot: 1, atts: []ethpb.Att{ðpb.Attestation{}, ðpb.Attestation{}}}
|
||||
c.atts[bytesutil.ToBytes32([]byte("id2"))] = &attGroup{slot: 3, atts: []ethpb.Att{ðpb.Attestation{}}}
|
||||
c.atts[bytesutil.ToBytes32([]byte("id3"))] = &attGroup{slot: 2, atts: []ethpb.Att{ðpb.Attestation{}}}
|
||||
|
||||
count := c.PruneBefore(3)
|
||||
|
||||
require.Equal(t, 1, len(c.atts))
|
||||
_, ok := c.atts[bytesutil.ToBytes32([]byte("id2"))]
|
||||
assert.Equal(t, true, ok)
|
||||
assert.Equal(t, uint64(3), count)
|
||||
}
|
||||
|
||||
func TestAggregateIsRedundant(t *testing.T) {
|
||||
k, err := blst.RandKey()
|
||||
require.NoError(t, err)
|
||||
sig := k.Sign([]byte{'X'})
|
||||
|
||||
c := NewAttestationCache()
|
||||
existingAtt := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
existingAtt.AggregationBits.SetBitAt(0, true)
|
||||
existingAtt.AggregationBits.SetBitAt(1, true)
|
||||
id, err := attestation.NewId(existingAtt, attestation.Data)
|
||||
require.NoError(t, err)
|
||||
c.atts[id] = &attGroup{slot: existingAtt.Data.Slot, atts: []ethpb.Att{existingAtt}}
|
||||
|
||||
t.Run("no matching group", func(t *testing.T) {
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 456, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
|
||||
redundant, err := c.AggregateIsRedundant(att)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, false, redundant)
|
||||
})
|
||||
t.Run("redundant", func(t *testing.T) {
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: existingAtt.Data.Slot, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
|
||||
redundant, err := c.AggregateIsRedundant(att)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, true, redundant)
|
||||
})
|
||||
t.Run("not redundant", func(t *testing.T) {
|
||||
t.Run("strictly better", func(t *testing.T) {
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: existingAtt.Data.Slot, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
att.AggregationBits.SetBitAt(1, true)
|
||||
att.AggregationBits.SetBitAt(2, true)
|
||||
|
||||
redundant, err := c.AggregateIsRedundant(att)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, false, redundant)
|
||||
})
|
||||
t.Run("overlapping and new bits", func(t *testing.T) {
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: existingAtt.Data.Slot, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: bitfield.NewBitlist(8),
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
att.AggregationBits.SetBitAt(0, true)
|
||||
att.AggregationBits.SetBitAt(2, true)
|
||||
|
||||
redundant, err := c.AggregateIsRedundant(att)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, false, redundant)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetBySlotAndCommitteeIndex(t *testing.T) {
|
||||
c := NewAttestationCache()
|
||||
c.atts[bytesutil.ToBytes32([]byte("id1"))] = &attGroup{slot: 1, atts: []ethpb.Att{ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 1}}, ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 1}}}}
|
||||
c.atts[bytesutil.ToBytes32([]byte("id2"))] = &attGroup{slot: 2, atts: []ethpb.Att{ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2, CommitteeIndex: 2}}}}
|
||||
c.atts[bytesutil.ToBytes32([]byte("id3"))] = &attGroup{slot: 1, atts: []ethpb.Att{ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2, CommitteeIndex: 2}}}}
|
||||
|
||||
// committeeIndex has to be small enough to fit in the bitvector
|
||||
atts := GetBySlotAndCommitteeIndex[*ethpb.Attestation](c, 1, 1)
|
||||
require.Equal(t, 2, len(atts))
|
||||
assert.Equal(t, primitives.Slot(1), atts[0].Data.Slot)
|
||||
assert.Equal(t, primitives.Slot(1), atts[1].Data.Slot)
|
||||
assert.Equal(t, primitives.CommitteeIndex(1), atts[0].Data.CommitteeIndex)
|
||||
assert.Equal(t, primitives.CommitteeIndex(1), atts[1].Data.CommitteeIndex)
|
||||
}
|
||||
@@ -67,12 +67,6 @@ func IsAggregator(committeeCount uint64, slotSig []byte) (bool, error) {
|
||||
return binary.LittleEndian.Uint64(b[:8])%modulo == 0, nil
|
||||
}
|
||||
|
||||
// IsAggregated returns true if the attestation is an aggregated attestation,
|
||||
// false otherwise.
|
||||
func IsAggregated(attestation ethpb.Att) bool {
|
||||
return attestation.GetAggregationBits().Count() > 1
|
||||
}
|
||||
|
||||
// ComputeSubnetForAttestation returns the subnet for which the provided attestation will be broadcasted to.
|
||||
// This differs from the spec definition by instead passing in the active validators indices in the attestation's
|
||||
// given epoch.
|
||||
|
||||
@@ -93,6 +93,7 @@ type BeaconNode struct {
|
||||
stop chan struct{} // Channel to wait for termination notifications.
|
||||
db db.Database
|
||||
slasherDB db.SlasherDatabase
|
||||
attestationCache *cache.AttestationCache
|
||||
attestationPool attestations.Pool
|
||||
exitPool voluntaryexits.PoolManager
|
||||
slashingsPool slashings.PoolManager
|
||||
@@ -144,6 +145,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
|
||||
stateFeed: new(event.Feed),
|
||||
blockFeed: new(event.Feed),
|
||||
opFeed: new(event.Feed),
|
||||
attestationCache: cache.NewAttestationCache(),
|
||||
attestationPool: attestations.NewPool(),
|
||||
exitPool: voluntaryexits.NewPool(),
|
||||
slashingsPool: slashings.NewPool(),
|
||||
@@ -704,6 +706,7 @@ func (b *BeaconNode) fetchBuilderService() *builder.Service {
|
||||
|
||||
func (b *BeaconNode) registerAttestationPool() error {
|
||||
s, err := attestations.NewService(b.ctx, &attestations.Config{
|
||||
Cache: b.attestationCache,
|
||||
Pool: b.attestationPool,
|
||||
InitialSyncComplete: b.initialSyncComplete,
|
||||
})
|
||||
@@ -732,6 +735,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
|
||||
blockchain.WithDepositCache(b.depositCache),
|
||||
blockchain.WithChainStartFetcher(web3Service),
|
||||
blockchain.WithExecutionEngineCaller(web3Service),
|
||||
blockchain.WithAttestationCache(b.attestationCache),
|
||||
blockchain.WithAttestationPool(b.attestationPool),
|
||||
blockchain.WithExitPool(b.exitPool),
|
||||
blockchain.WithSlashingPool(b.slashingsPool),
|
||||
@@ -816,6 +820,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
|
||||
regularsync.WithBlockNotifier(b),
|
||||
regularsync.WithAttestationNotifier(b),
|
||||
regularsync.WithOperationNotifier(b),
|
||||
regularsync.WithAttestationCache(b.attestationCache),
|
||||
regularsync.WithAttestationPool(b.attestationPool),
|
||||
regularsync.WithExitPool(b.exitPool),
|
||||
regularsync.WithSlashingPool(b.slashingsPool),
|
||||
@@ -952,6 +957,7 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
|
||||
GenesisTimeFetcher: chainService,
|
||||
GenesisFetcher: chainService,
|
||||
OptimisticModeFetcher: chainService,
|
||||
AttestationCache: b.attestationCache,
|
||||
AttestationsPool: b.attestationPool,
|
||||
ExitPool: b.exitPool,
|
||||
SlashingsPool: b.slashingsPool,
|
||||
|
||||
@@ -16,6 +16,7 @@ go_library(
|
||||
"//testing/spectest:__subpackages__",
|
||||
],
|
||||
deps = [
|
||||
"//beacon-chain/cache:go_default_library",
|
||||
"//beacon-chain/operations/attestations/kv:go_default_library",
|
||||
"//cache/lru:go_default_library",
|
||||
"//config/features:go_default_library",
|
||||
|
||||
13
beacon-chain/operations/attestations/attmap/BUILD.bazel
Normal file
13
beacon-chain/operations/attestations/attmap/BUILD.bazel
Normal file
@@ -0,0 +1,13 @@
|
||||
load("@prysm//tools/go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["map.go"],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations/attmap",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
],
|
||||
)
|
||||
89
beacon-chain/operations/attestations/attmap/map.go
Normal file
89
beacon-chain/operations/attestations/attmap/map.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package attmap
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
)
|
||||
|
||||
// Attestations --
|
||||
type Attestations struct {
|
||||
atts map[attestation.Id]ethpb.Att
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
// New creates a new instance of the map.
|
||||
func New() *Attestations {
|
||||
return &Attestations{atts: make(map[attestation.Id]ethpb.Att)}
|
||||
}
|
||||
|
||||
// Save stores an attestation in the map.
|
||||
func (a *Attestations) Save(att ethpb.Att) error {
|
||||
if att == nil || att.IsNil() {
|
||||
return nil
|
||||
}
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
a.atts[id] = att
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SaveMany stores multiple attestation in the map.
|
||||
func (a *Attestations) SaveMany(atts []ethpb.Att) error {
|
||||
for _, att := range atts {
|
||||
if err := a.Save(att); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAll retrieves all attestations that are in the map.
|
||||
func (a *Attestations) GetAll() []ethpb.Att {
|
||||
a.RLock()
|
||||
defer a.RUnlock()
|
||||
|
||||
atts := make([]ethpb.Att, len(a.atts))
|
||||
i := 0
|
||||
for _, att := range a.atts {
|
||||
atts[i] = att.Clone()
|
||||
i++
|
||||
}
|
||||
|
||||
return atts
|
||||
}
|
||||
|
||||
// Delete removes an attestation from the map.
|
||||
func (a *Attestations) Delete(att ethpb.Att) error {
|
||||
if att == nil || att.IsNil() {
|
||||
return nil
|
||||
}
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
delete(a.atts, id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Count returns the number of attestations in the map.
|
||||
func (a *Attestations) Count() int {
|
||||
a.RLock()
|
||||
defer a.RUnlock()
|
||||
return len(a.atts)
|
||||
}
|
||||
@@ -5,7 +5,6 @@ go_library(
|
||||
srcs = [
|
||||
"aggregated.go",
|
||||
"block.go",
|
||||
"forkchoice.go",
|
||||
"kv.go",
|
||||
"seen_bits.go",
|
||||
"unaggregated.go",
|
||||
@@ -14,6 +13,7 @@ go_library(
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/operations/attestations/attmap:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//monitoring/tracing/trace:go_default_library",
|
||||
|
||||
@@ -88,7 +88,7 @@ func (c *AttCaches) aggregateParallel(atts map[attestation.Id][]ethpb.Att, leftO
|
||||
log.Error("nil aggregated attestation")
|
||||
continue
|
||||
}
|
||||
if helpers.IsAggregated(aggregated) {
|
||||
if aggregated.IsAggregated() {
|
||||
if err := c.SaveAggregatedAttestations([]ethpb.Att{aggregated}); err != nil {
|
||||
log.WithError(err).Error("could not save aggregated attestation")
|
||||
continue
|
||||
@@ -122,7 +122,7 @@ func (c *AttCaches) SaveAggregatedAttestation(att ethpb.Att) error {
|
||||
if err := helpers.ValidateNilAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
if !helpers.IsAggregated(att) {
|
||||
if !att.IsAggregated() {
|
||||
return errors.New("attestation is not aggregated")
|
||||
}
|
||||
has, err := c.HasAggregatedAttestation(att)
|
||||
@@ -255,7 +255,7 @@ func (c *AttCaches) DeleteAggregatedAttestation(att ethpb.Att) error {
|
||||
if err := helpers.ValidateNilAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
if !helpers.IsAggregated(att) {
|
||||
if !att.IsAggregated() {
|
||||
return errors.New("attestation is not aggregated")
|
||||
}
|
||||
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
)
|
||||
|
||||
// SaveForkchoiceAttestation saves an forkchoice attestation in cache.
|
||||
func (c *AttCaches) SaveForkchoiceAttestation(att ethpb.Att) error {
|
||||
if att == nil || att.IsNil() {
|
||||
return nil
|
||||
}
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.forkchoiceAttLock.Lock()
|
||||
defer c.forkchoiceAttLock.Unlock()
|
||||
c.forkchoiceAtt[id] = att
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SaveForkchoiceAttestations saves a list of forkchoice attestations in cache.
|
||||
func (c *AttCaches) SaveForkchoiceAttestations(atts []ethpb.Att) error {
|
||||
for _, att := range atts {
|
||||
if err := c.SaveForkchoiceAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ForkchoiceAttestations returns the forkchoice attestations in cache.
|
||||
func (c *AttCaches) ForkchoiceAttestations() []ethpb.Att {
|
||||
c.forkchoiceAttLock.RLock()
|
||||
defer c.forkchoiceAttLock.RUnlock()
|
||||
|
||||
atts := make([]ethpb.Att, 0, len(c.forkchoiceAtt))
|
||||
for _, att := range c.forkchoiceAtt {
|
||||
atts = append(atts, att.Clone())
|
||||
}
|
||||
|
||||
return atts
|
||||
}
|
||||
|
||||
// DeleteForkchoiceAttestation deletes a forkchoice attestation in cache.
|
||||
func (c *AttCaches) DeleteForkchoiceAttestation(att ethpb.Att) error {
|
||||
if att == nil || att.IsNil() {
|
||||
return nil
|
||||
}
|
||||
|
||||
id, err := attestation.NewId(att, attestation.Full)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
|
||||
c.forkchoiceAttLock.Lock()
|
||||
defer c.forkchoiceAttLock.Unlock()
|
||||
delete(c.forkchoiceAtt, id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ForkchoiceAttestationCount returns the number of fork choice attestations key in the pool.
|
||||
func (c *AttCaches) ForkchoiceAttestationCount() int {
|
||||
c.forkchoiceAttLock.RLock()
|
||||
defer c.forkchoiceAttLock.RUnlock()
|
||||
return len(c.forkchoiceAtt)
|
||||
}
|
||||
@@ -20,7 +20,7 @@ func TestKV_Forkchoice_CanSaveRetrieve(t *testing.T) {
|
||||
atts := []ethpb.Att{att1, att2, att3}
|
||||
|
||||
for _, att := range atts {
|
||||
require.NoError(t, cache.SaveForkchoiceAttestation(att))
|
||||
require.NoError(t, cache.saveForkchoiceAttestation(att))
|
||||
}
|
||||
|
||||
returned := cache.ForkchoiceAttestations()
|
||||
@@ -41,7 +41,7 @@ func TestKV_Forkchoice_CanDelete(t *testing.T) {
|
||||
atts := []ethpb.Att{att1, att2, att3}
|
||||
|
||||
for _, att := range atts {
|
||||
require.NoError(t, cache.SaveForkchoiceAttestation(att))
|
||||
require.NoError(t, cache.saveForkchoiceAttestation(att))
|
||||
}
|
||||
|
||||
require.NoError(t, cache.DeleteForkchoiceAttestation(att1))
|
||||
@@ -61,7 +61,7 @@ func TestKV_Forkchoice_CanCount(t *testing.T) {
|
||||
atts := []*ethpb.Attestation{att1, att2, att3}
|
||||
|
||||
for _, att := range atts {
|
||||
require.NoError(t, cache.SaveForkchoiceAttestation(att))
|
||||
require.NoError(t, cache.saveForkchoiceAttestation(att))
|
||||
}
|
||||
|
||||
require.Equal(t, 3, cache.ForkchoiceAttestationCount())
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations/attmap"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
@@ -21,8 +22,7 @@ type AttCaches struct {
|
||||
aggregatedAtt map[attestation.Id][]ethpb.Att
|
||||
unAggregateAttLock sync.RWMutex
|
||||
unAggregatedAtt map[attestation.Id]ethpb.Att
|
||||
forkchoiceAttLock sync.RWMutex
|
||||
forkchoiceAtt map[attestation.Id]ethpb.Att
|
||||
forkchoiceAtt *attmap.Attestations
|
||||
blockAttLock sync.RWMutex
|
||||
blockAtt map[attestation.Id][]ethpb.Att
|
||||
seenAtt *cache.Cache
|
||||
@@ -36,10 +36,35 @@ func NewAttCaches() *AttCaches {
|
||||
pool := &AttCaches{
|
||||
unAggregatedAtt: make(map[attestation.Id]ethpb.Att),
|
||||
aggregatedAtt: make(map[attestation.Id][]ethpb.Att),
|
||||
forkchoiceAtt: make(map[attestation.Id]ethpb.Att),
|
||||
forkchoiceAtt: attmap.New(),
|
||||
blockAtt: make(map[attestation.Id][]ethpb.Att),
|
||||
seenAtt: c,
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
// saveForkchoiceAttestation saves a forkchoice attestation.
|
||||
func (c *AttCaches) saveForkchoiceAttestation(att ethpb.Att) error {
|
||||
return c.forkchoiceAtt.Save(att)
|
||||
}
|
||||
|
||||
// SaveForkchoiceAttestations saves forkchoice attestations.
|
||||
func (c *AttCaches) SaveForkchoiceAttestations(att []ethpb.Att) error {
|
||||
return c.forkchoiceAtt.SaveMany(att)
|
||||
}
|
||||
|
||||
// ForkchoiceAttestations returns all forkchoice attestations.
|
||||
func (c *AttCaches) ForkchoiceAttestations() []ethpb.Att {
|
||||
return c.forkchoiceAtt.GetAll()
|
||||
}
|
||||
|
||||
// DeleteForkchoiceAttestation deletes a forkchoice attestation.
|
||||
func (c *AttCaches) DeleteForkchoiceAttestation(att ethpb.Att) error {
|
||||
return c.forkchoiceAtt.Delete(att)
|
||||
}
|
||||
|
||||
// ForkchoiceAttestationCount returns the number of forkchoice attestation keys.
|
||||
func (c *AttCaches) ForkchoiceAttestationCount() int {
|
||||
return c.forkchoiceAtt.Count()
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
@@ -17,7 +16,7 @@ func (c *AttCaches) SaveUnaggregatedAttestation(att ethpb.Att) error {
|
||||
if att == nil || att.IsNil() {
|
||||
return nil
|
||||
}
|
||||
if helpers.IsAggregated(att) {
|
||||
if att.IsAggregated() {
|
||||
return errors.New("attestation is aggregated")
|
||||
}
|
||||
|
||||
@@ -133,8 +132,7 @@ func (c *AttCaches) DeleteUnaggregatedAttestation(att ethpb.Att) error {
|
||||
if att == nil || att.IsNil() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if helpers.IsAggregated(att) {
|
||||
if att.IsAggregated() {
|
||||
return errors.New("attestation is aggregated")
|
||||
}
|
||||
|
||||
@@ -162,7 +160,7 @@ func (c *AttCaches) DeleteSeenUnaggregatedAttestations() (int, error) {
|
||||
|
||||
count := 0
|
||||
for r, att := range c.unAggregatedAtt {
|
||||
if att == nil || att.IsNil() || helpers.IsAggregated(att) {
|
||||
if att == nil || att.IsNil() || att.IsAggregated() {
|
||||
continue
|
||||
}
|
||||
if seen, err := c.hasSeenBit(att); err == nil && seen {
|
||||
|
||||
@@ -30,6 +30,16 @@ var (
|
||||
Name: "expired_block_atts_total",
|
||||
Help: "The number of expired and deleted block attestations in the pool.",
|
||||
})
|
||||
attCount = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "attestations_in_pool_total",
|
||||
Help: "The number of attestations in the pool.",
|
||||
},
|
||||
)
|
||||
expiredAtts = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Name: "expired_atts_total",
|
||||
Help: "The number of expired and deleted attestations in the pool.",
|
||||
})
|
||||
batchForkChoiceAttsT1 = promauto.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "aggregate_attestations_t1",
|
||||
@@ -50,3 +60,8 @@ func (s *Service) updateMetrics() {
|
||||
aggregatedAttsCount.Set(float64(s.cfg.Pool.AggregatedAttestationCount()))
|
||||
unaggregatedAttsCount.Set(float64(s.cfg.Pool.UnaggregatedAttestationCount()))
|
||||
}
|
||||
|
||||
func (s *Service) updateMetricsExperimental(numExpired uint64) {
|
||||
attCount.Set(float64(s.cfg.Cache.Count()))
|
||||
expiredAtts.Add(float64(numExpired))
|
||||
}
|
||||
|
||||
@@ -37,7 +37,6 @@ type Pool interface {
|
||||
BlockAttestations() []ethpb.Att
|
||||
DeleteBlockAttestation(att ethpb.Att) error
|
||||
// For attestations to be passed to fork choice.
|
||||
SaveForkchoiceAttestation(att ethpb.Att) error
|
||||
SaveForkchoiceAttestations(atts []ethpb.Att) error
|
||||
ForkchoiceAttestations() []ethpb.Att
|
||||
DeleteForkchoiceAttestation(att ethpb.Att) error
|
||||
|
||||
@@ -61,11 +61,16 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts")
|
||||
defer span.End()
|
||||
|
||||
if err := s.cfg.Pool.AggregateUnaggregatedAttestations(ctx); err != nil {
|
||||
return err
|
||||
var atts []ethpb.Att
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
atts = append(s.cfg.Cache.GetAll(), s.cfg.Cache.ForkchoiceAttestations()...)
|
||||
} else {
|
||||
if err := s.cfg.Pool.AggregateUnaggregatedAttestations(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
atts = append(s.cfg.Pool.AggregatedAttestations(), s.cfg.Pool.BlockAttestations()...)
|
||||
atts = append(atts, s.cfg.Pool.ForkchoiceAttestations()...)
|
||||
}
|
||||
atts := append(s.cfg.Pool.AggregatedAttestations(), s.cfg.Pool.BlockAttestations()...)
|
||||
atts = append(atts, s.cfg.Pool.ForkchoiceAttestations()...)
|
||||
|
||||
attsById := make(map[attestation.Id][]ethpb.Att, len(atts))
|
||||
|
||||
@@ -92,9 +97,11 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
for _, a := range s.cfg.Pool.BlockAttestations() {
|
||||
if err := s.cfg.Pool.DeleteBlockAttestation(a); err != nil {
|
||||
return err
|
||||
if !features.Get().EnableExperimentalAttestationPool {
|
||||
for _, a := range s.cfg.Pool.BlockAttestations() {
|
||||
if err := s.cfg.Pool.DeleteBlockAttestation(a); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,8 +9,8 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/time/slots"
|
||||
)
|
||||
|
||||
// pruneAttsPool prunes attestations pool on every slot interval.
|
||||
func (s *Service) pruneAttsPool() {
|
||||
// pruneExpired prunes attestations pool on every slot interval.
|
||||
func (s *Service) pruneExpired() {
|
||||
ticker := time.NewTicker(s.cfg.pruneInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
@@ -25,6 +25,27 @@ func (s *Service) pruneAttsPool() {
|
||||
}
|
||||
}
|
||||
|
||||
// pruneExpiredExperimental prunes attestations on every prune interval.
|
||||
func (s *Service) pruneExpiredExperimental() {
|
||||
ticker := time.NewTicker(s.cfg.pruneInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
expirySlot, err := s.expirySlot()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get expiry slot")
|
||||
continue
|
||||
}
|
||||
numExpired := s.cfg.Cache.PruneBefore(expirySlot)
|
||||
s.updateMetricsExperimental(numExpired)
|
||||
case <-s.ctx.Done():
|
||||
log.Debug("Context closed, exiting routine")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This prunes expired attestations from the pool.
|
||||
func (s *Service) pruneExpiredAtts() {
|
||||
aggregatedAtts := s.cfg.Pool.AggregatedAttestations()
|
||||
@@ -84,3 +105,17 @@ func (s *Service) expiredPreDeneb(slot primitives.Slot) bool {
|
||||
currentTime := uint64(prysmTime.Now().Unix())
|
||||
return currentTime >= expirationTime
|
||||
}
|
||||
|
||||
// Attestations for a slot before the returned slot are considered expired.
|
||||
func (s *Service) expirySlot() (primitives.Slot, error) {
|
||||
currSlot := slots.CurrentSlot(s.genesisTime)
|
||||
currEpoch := slots.ToEpoch(currSlot)
|
||||
if currEpoch == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
if currEpoch < params.BeaconConfig().DenebForkEpoch {
|
||||
// Safe to subtract because we exited early for epoch 0.
|
||||
return currSlot - 31, nil
|
||||
}
|
||||
return slots.EpochStart(currEpoch - 1)
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ func TestPruneExpired_Ticker(t *testing.T) {
|
||||
// Rewind back one epoch worth of time.
|
||||
s.genesisTime = uint64(prysmTime.Now().Unix()) - uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||
|
||||
go s.pruneAttsPool()
|
||||
go s.pruneExpired()
|
||||
|
||||
done := make(chan struct{}, 1)
|
||||
async.RunEvery(ctx, 500*time.Millisecond, func() {
|
||||
@@ -145,5 +145,4 @@ func TestPruneExpired_ExpiredDeneb(t *testing.T) {
|
||||
|
||||
assert.Equal(t, true, s.expired(secondEpochStart), "Should be expired")
|
||||
assert.Equal(t, false, s.expired(thirdEpochStart), "Should not be expired")
|
||||
|
||||
}
|
||||
|
||||
@@ -9,7 +9,9 @@ import (
|
||||
"time"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
lruwrpr "github.com/prysmaticlabs/prysm/v5/cache/lru"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
)
|
||||
|
||||
@@ -27,12 +29,13 @@ type Service struct {
|
||||
|
||||
// Config options for the service.
|
||||
type Config struct {
|
||||
Cache *cache.AttestationCache
|
||||
Pool Pool
|
||||
pruneInterval time.Duration
|
||||
InitialSyncComplete chan struct{}
|
||||
}
|
||||
|
||||
// NewService instantiates a new attestation pool service instance that will
|
||||
// NewService instantiates a new attestation service instance that will
|
||||
// be registered into a running beacon node.
|
||||
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
|
||||
cache := lruwrpr.New(forkChoiceProcessedAttsSize)
|
||||
@@ -58,7 +61,12 @@ func (s *Service) Start() {
|
||||
return
|
||||
}
|
||||
go s.prepareForkChoiceAtts()
|
||||
go s.pruneAttsPool()
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
go s.pruneExpiredExperimental()
|
||||
} else {
|
||||
go s.pruneExpired()
|
||||
}
|
||||
}
|
||||
|
||||
// waitForSync waits until the beacon node is synced to the latest head.
|
||||
|
||||
@@ -21,7 +21,7 @@ type Service struct {
|
||||
Broadcaster p2p.Broadcaster
|
||||
SyncCommitteePool synccommittee.Pool
|
||||
OperationNotifier opfeed.Notifier
|
||||
AttestationCache *cache.AttestationCache
|
||||
AttestationCache *cache.AttestationDataCache
|
||||
StateGen stategen.StateManager
|
||||
P2P p2p.Broadcaster
|
||||
ReplayerBuilder stategen.ReplayerBuilder
|
||||
|
||||
@@ -205,6 +205,7 @@ func (s *Service) validatorEndpoints(
|
||||
TimeFetcher: s.cfg.GenesisTimeFetcher,
|
||||
SyncChecker: s.cfg.SyncService,
|
||||
OptimisticModeFetcher: s.cfg.OptimisticModeFetcher,
|
||||
AttestationCache: s.cfg.AttestationCache,
|
||||
AttestationsPool: s.cfg.AttestationsPool,
|
||||
PeerManager: s.cfg.PeerManager,
|
||||
Broadcaster: s.cfg.Broadcaster,
|
||||
@@ -508,6 +509,7 @@ func (s *Service) beaconEndpoints(
|
||||
server := &beacon.Server{
|
||||
CanonicalHistory: ch,
|
||||
BeaconDB: s.cfg.BeaconDB,
|
||||
AttestationCache: s.cfg.AttestationCache,
|
||||
AttestationsPool: s.cfg.AttestationsPool,
|
||||
SlashingsPool: s.cfg.SlashingsPool,
|
||||
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
|
||||
|
||||
@@ -17,6 +17,7 @@ go_library(
|
||||
"//api/server:go_default_library",
|
||||
"//api/server/structs:go_default_library",
|
||||
"//beacon-chain/blockchain:go_default_library",
|
||||
"//beacon-chain/cache:go_default_library",
|
||||
"//beacon-chain/cache/depositsnapshot:go_default_library",
|
||||
"//beacon-chain/core/altair:go_default_library",
|
||||
"//beacon-chain/core/blocks:go_default_library",
|
||||
|
||||
@@ -49,13 +49,18 @@ func (s *Server) ListAttestations(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
attestations := s.AttestationsPool.AggregatedAttestations()
|
||||
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
var attestations []eth.Att
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
attestations = s.AttestationCache.GetAll()
|
||||
} else {
|
||||
attestations = s.AttestationsPool.AggregatedAttestations()
|
||||
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
attestations = append(attestations, unaggAtts...)
|
||||
}
|
||||
attestations = append(attestations, unaggAtts...)
|
||||
|
||||
filteredAtts := make([]*structs.Attestation, 0, len(attestations))
|
||||
for _, a := range attestations {
|
||||
@@ -102,13 +107,19 @@ func (s *Server) ListAttestationsV2(w http.ResponseWriter, r *http.Request) {
|
||||
if rawSlot == "" {
|
||||
v = slots.ToForkVersion(s.TimeFetcher.CurrentSlot())
|
||||
}
|
||||
attestations := s.AttestationsPool.AggregatedAttestations()
|
||||
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
|
||||
var attestations []eth.Att
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
attestations = s.AttestationCache.GetAll()
|
||||
} else {
|
||||
attestations = s.AttestationsPool.AggregatedAttestations()
|
||||
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
attestations = append(attestations, unaggAtts...)
|
||||
}
|
||||
attestations = append(attestations, unaggAtts...)
|
||||
|
||||
filteredAtts := make([]interface{}, 0, len(attestations))
|
||||
for _, att := range attestations {
|
||||
@@ -309,7 +320,7 @@ func (s *Server) handleAttestationsElectra(ctx context.Context, data json.RawMes
|
||||
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
|
||||
// of a received unaggregated attestation.
|
||||
// Note we can't send for aggregated att because we don't have selection proof.
|
||||
if !corehelpers.IsAggregated(att) {
|
||||
if !att.IsAggregated() {
|
||||
s.OperationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: operation.UnaggregatedAttReceived,
|
||||
Data: &operation.UnAggregatedAttReceivedData{
|
||||
@@ -335,7 +346,11 @@ func (s *Server) handleAttestationsElectra(ctx context.Context, data json.RawMes
|
||||
continue
|
||||
}
|
||||
|
||||
if corehelpers.IsAggregated(att) {
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = s.AttestationCache.Add(att); err != nil {
|
||||
log.WithError(err).Error("could not save attestation")
|
||||
}
|
||||
} else if att.IsAggregated() {
|
||||
if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
|
||||
log.WithError(err).Error("could not save aggregated attestation")
|
||||
}
|
||||
@@ -384,7 +399,7 @@ func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (
|
||||
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
|
||||
// of a received unaggregated attestation.
|
||||
// Note we can't send for aggregated att because we don't have selection proof.
|
||||
if !corehelpers.IsAggregated(att) {
|
||||
if !att.IsAggregated() {
|
||||
s.OperationNotifier.OperationFeed().Send(&feed.Event{
|
||||
Type: operation.UnaggregatedAttReceived,
|
||||
Data: &operation.UnAggregatedAttReceivedData{
|
||||
@@ -407,7 +422,11 @@ func (s *Server) handleAttestations(ctx context.Context, data json.RawMessage) (
|
||||
continue
|
||||
}
|
||||
|
||||
if corehelpers.IsAggregated(att) {
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = s.AttestationCache.Add(att); err != nil {
|
||||
log.WithError(err).Error("could not save attestation")
|
||||
}
|
||||
} else if att.IsAggregated() {
|
||||
if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
|
||||
log.WithError(err).Error("could not save aggregated attestation")
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ package beacon
|
||||
|
||||
import (
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
|
||||
@@ -31,6 +32,7 @@ type Server struct {
|
||||
BlockNotifier blockfeed.Notifier
|
||||
OperationNotifier operation.Notifier
|
||||
Broadcaster p2p.Broadcaster
|
||||
AttestationCache *cache.AttestationCache
|
||||
AttestationsPool attestations.Pool
|
||||
SlashingsPool slashings.PoolManager
|
||||
VoluntaryExitsPool voluntaryexits.PoolManager
|
||||
|
||||
@@ -30,6 +30,7 @@ go_library(
|
||||
"//beacon-chain/rpc/lookup:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//beacon-chain/sync:go_default_library",
|
||||
"//config/features:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types:go_default_library",
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
rpchelpers "github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/shared"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
consensus_types "github.com/prysmaticlabs/prysm/v5/consensus-types"
|
||||
@@ -129,13 +130,23 @@ func (s *Server) GetAggregateAttestationV2(w http.ResponseWriter, r *http.Reques
|
||||
}
|
||||
|
||||
func (s *Server) aggregatedAttestation(w http.ResponseWriter, slot primitives.Slot, attDataRoot []byte, index primitives.CommitteeIndex) ethpbalpha.Att {
|
||||
var match []ethpbalpha.Att
|
||||
var err error
|
||||
|
||||
match, err := matchingAtts(s.AttestationsPool.AggregatedAttestations(), slot, attDataRoot, index)
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get matching attestations: "+err.Error(), http.StatusInternalServerError)
|
||||
return nil
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
match, err = matchingAtts(s.AttestationCache.GetAll(), slot, attDataRoot, index)
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get matching attestations: "+err.Error(), http.StatusInternalServerError)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
match, err = matchingAtts(s.AttestationsPool.AggregatedAttestations(), slot, attDataRoot, index)
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get matching attestations: "+err.Error(), http.StatusInternalServerError)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if len(match) > 0 {
|
||||
// If there are multiple matching aggregated attestations,
|
||||
// then we return the one with the most aggregation bits.
|
||||
@@ -145,6 +156,11 @@ func (s *Server) aggregatedAttestation(w http.ResponseWriter, slot primitives.Sl
|
||||
return match[0]
|
||||
}
|
||||
|
||||
// No match was found and the new pool doesn't store aggregated and unaggregated attestations separately.
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
return nil
|
||||
}
|
||||
|
||||
atts, err := s.AttestationsPool.UnaggregatedAttestations()
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
|
||||
|
||||
@@ -1194,7 +1194,7 @@ func TestGetAttestationData(t *testing.T) {
|
||||
HeadFetcher: chain,
|
||||
GenesisTimeFetcher: chain,
|
||||
FinalizedFetcher: chain,
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
OptimisticModeFetcher: chain,
|
||||
},
|
||||
}
|
||||
@@ -1275,7 +1275,7 @@ func TestGetAttestationData(t *testing.T) {
|
||||
TimeFetcher: chain,
|
||||
OptimisticModeFetcher: chain,
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
GenesisTimeFetcher: chain,
|
||||
HeadFetcher: chain,
|
||||
FinalizedFetcher: chain,
|
||||
@@ -1434,7 +1434,7 @@ func TestGetAttestationData(t *testing.T) {
|
||||
TimeFetcher: chain,
|
||||
OptimisticModeFetcher: chain,
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
OptimisticModeFetcher: chain,
|
||||
HeadFetcher: chain,
|
||||
GenesisTimeFetcher: chain,
|
||||
@@ -1528,7 +1528,7 @@ func TestGetAttestationData(t *testing.T) {
|
||||
TimeFetcher: chain,
|
||||
OptimisticModeFetcher: chain,
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
OptimisticModeFetcher: chain,
|
||||
HeadFetcher: chain,
|
||||
GenesisTimeFetcher: chain,
|
||||
|
||||
@@ -22,6 +22,7 @@ type Server struct {
|
||||
HeadFetcher blockchain.HeadFetcher
|
||||
TimeFetcher blockchain.TimeFetcher
|
||||
SyncChecker sync.Checker
|
||||
AttestationCache *cache.AttestationCache
|
||||
AttestationsPool attestations.Pool
|
||||
PeerManager p2p.PeerManager
|
||||
Broadcaster p2p.Broadcaster
|
||||
|
||||
@@ -7,11 +7,13 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/api/pagination"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filters"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
|
||||
"github.com/prysmaticlabs/prysm/v5/cmd"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
|
||||
@@ -305,7 +307,14 @@ func (bs *Server) ListIndexedAttestationsElectra(
|
||||
// attestations are processed and when they are no longer valid.
|
||||
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#attestations
|
||||
func (bs *Server) AttestationPool(_ context.Context, req *ethpb.AttestationPoolRequest) (*ethpb.AttestationPoolResponse, error) {
|
||||
atts, err := attestationsFromPool[*ethpb.Attestation](req.PageSize, bs.AttestationsPool)
|
||||
var atts []*ethpb.Attestation
|
||||
var err error
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
atts, err = attestationsFromCache[*ethpb.Attestation](req.PageSize, bs.AttestationCache)
|
||||
} else {
|
||||
atts, err = attestationsFromPool[*ethpb.Attestation](req.PageSize, bs.AttestationsPool)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -332,10 +341,18 @@ func (bs *Server) AttestationPool(_ context.Context, req *ethpb.AttestationPoolR
|
||||
}
|
||||
|
||||
func (bs *Server) AttestationPoolElectra(_ context.Context, req *ethpb.AttestationPoolRequest) (*ethpb.AttestationPoolElectraResponse, error) {
|
||||
atts, err := attestationsFromPool[*ethpb.AttestationElectra](req.PageSize, bs.AttestationsPool)
|
||||
var atts []*ethpb.AttestationElectra
|
||||
var err error
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
atts, err = attestationsFromCache[*ethpb.AttestationElectra](req.PageSize, bs.AttestationCache)
|
||||
} else {
|
||||
atts, err = attestationsFromPool[*ethpb.AttestationElectra](req.PageSize, bs.AttestationsPool)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If there are no attestations, we simply return a response specifying this.
|
||||
// Otherwise, attempting to paginate 0 attestations below would result in an error.
|
||||
if len(atts) == 0 {
|
||||
@@ -465,3 +482,25 @@ func attestationsFromPool[T ethpb.Att](pageSize int32, pool attestations.Pool) (
|
||||
}
|
||||
return atts, nil
|
||||
}
|
||||
|
||||
func attestationsFromCache[T ethpb.Att](pageSize int32, c *cache.AttestationCache) ([]T, error) {
|
||||
if int(pageSize) > cmd.Get().MaxRPCPageSize {
|
||||
return nil, status.Errorf(
|
||||
codes.InvalidArgument,
|
||||
"Requested page size %d can not be greater than max size %d",
|
||||
pageSize,
|
||||
cmd.Get().MaxRPCPageSize,
|
||||
)
|
||||
}
|
||||
cacheAtts := c.GetAll()
|
||||
atts := make([]T, 0, len(cacheAtts))
|
||||
for _, att := range cacheAtts {
|
||||
a, ok := att.(T)
|
||||
if !ok {
|
||||
var expected T
|
||||
return nil, status.Errorf(codes.Internal, "Attestation is of the wrong type (expected %T, got %T)", expected, att)
|
||||
}
|
||||
atts = append(atts, a)
|
||||
}
|
||||
return atts, nil
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ type Server struct {
|
||||
BlockNotifier blockfeed.Notifier
|
||||
AttestationNotifier operation.Notifier
|
||||
Broadcaster p2p.Broadcaster
|
||||
AttestationCache *cache.AttestationCache
|
||||
AttestationsPool attestations.Pool
|
||||
SlashingsPool slashings.PoolManager
|
||||
ChainStartChan chan time.Time
|
||||
|
||||
@@ -3,8 +3,10 @@ package validator
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
|
||||
@@ -27,14 +29,21 @@ func (vs *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
atts := vs.AttPool.AggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
|
||||
// Filter out the best aggregated attestation (ie. the one with the most aggregated bits).
|
||||
if len(atts) == 0 {
|
||||
atts = vs.AttPool.UnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
|
||||
|
||||
var atts []*ethpb.Attestation
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
atts = cache.GetBySlotAndCommitteeIndex[*ethpb.Attestation](vs.AttestationCache, req.Slot, req.CommitteeIndex)
|
||||
} else {
|
||||
atts = vs.AttPool.AggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
|
||||
if len(atts) == 0 {
|
||||
return nil, status.Errorf(codes.NotFound, "Could not find attestation for slot and committee in pool")
|
||||
atts = vs.AttPool.UnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
|
||||
}
|
||||
}
|
||||
if len(atts) == 0 {
|
||||
return nil, status.Errorf(codes.NotFound, "Could not find attestation for slot and committee in pool")
|
||||
}
|
||||
|
||||
best := bestAggregate(atts, req.CommitteeIndex, indexInCommittee)
|
||||
attAndProof := ðpb.AggregateAttestationAndProof{
|
||||
Aggregate: best,
|
||||
@@ -59,13 +68,21 @@ func (vs *Server) SubmitAggregateSelectionProofElectra(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
atts := vs.AttPool.AggregatedAttestationsBySlotIndexElectra(ctx, req.Slot, req.CommitteeIndex)
|
||||
if len(atts) == 0 {
|
||||
atts = vs.AttPool.UnaggregatedAttestationsBySlotIndexElectra(ctx, req.Slot, req.CommitteeIndex)
|
||||
|
||||
var atts []*ethpb.AttestationElectra
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
atts = cache.GetBySlotAndCommitteeIndex[*ethpb.AttestationElectra](vs.AttestationCache, req.Slot, req.CommitteeIndex)
|
||||
} else {
|
||||
atts = vs.AttPool.AggregatedAttestationsBySlotIndexElectra(ctx, req.Slot, req.CommitteeIndex)
|
||||
if len(atts) == 0 {
|
||||
return nil, status.Errorf(codes.NotFound, "No attestations found in pool")
|
||||
atts = vs.AttPool.UnaggregatedAttestationsBySlotIndexElectra(ctx, req.Slot, req.CommitteeIndex)
|
||||
}
|
||||
}
|
||||
if len(atts) == 0 {
|
||||
return nil, status.Errorf(codes.NotFound, "Could not find attestation for slot and committee in pool")
|
||||
}
|
||||
|
||||
best := bestAggregate(atts, req.CommitteeIndex, indexInCommittee)
|
||||
attAndProof := ðpb.AggregateAttestationAndProofElectra{
|
||||
Aggregate: best,
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/core"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
|
||||
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
|
||||
@@ -49,13 +50,19 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
attCopy := att.Copy()
|
||||
if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
|
||||
log.WithError(err).Error("Could not save unaggregated attestation")
|
||||
return
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = vs.AttestationCache.Add(att); err != nil {
|
||||
log.WithError(err).Error("Could not save attestation")
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
go func() {
|
||||
attCopy := att.Copy()
|
||||
if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
|
||||
log.WithError(err).Error("Could not save unaggregated attestation")
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
@@ -76,14 +83,20 @@ func (vs *Server) ProposeAttestationElectra(ctx context.Context, att *ethpb.Atte
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
ctx = trace.NewContext(context.Background(), trace.FromContext(ctx))
|
||||
attCopy := att.Copy()
|
||||
if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
|
||||
log.WithError(err).Error("Could not save unaggregated attestation")
|
||||
return
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = vs.AttestationCache.Add(att); err != nil {
|
||||
log.WithError(err).Error("Could not save attestation")
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
go func() {
|
||||
ctx = trace.NewContext(context.Background(), trace.FromContext(ctx))
|
||||
attCopy := att.Copy()
|
||||
if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
|
||||
log.WithError(err).Error("Could not save unaggregated attestation")
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ func TestAttestationDataAtSlot_HandlesFarAwayJustifiedEpoch(t *testing.T) {
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
HeadFetcher: &mock.ChainService{TargetRoot: blockRoot, Root: blockRoot[:], State: beaconState},
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
|
||||
|
||||
@@ -204,7 +204,7 @@ func TestGetAttestationData_OK(t *testing.T) {
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
},
|
||||
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
},
|
||||
}
|
||||
@@ -259,7 +259,7 @@ func BenchmarkGetAttestationDataConcurrent(b *testing.B) {
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
TimeFetcher: &mock.ChainService{Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
HeadFetcher: &mock.ChainService{TargetRoot: targetRoot, Root: blockRoot[:]},
|
||||
GenesisTimeFetcher: &mock.ChainService{
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
@@ -313,7 +313,7 @@ func TestGetAttestationData_Optimistic(t *testing.T) {
|
||||
CoreService: &core.Service{
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now()},
|
||||
HeadFetcher: &mock.ChainService{},
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: true},
|
||||
},
|
||||
}
|
||||
@@ -330,7 +330,7 @@ func TestGetAttestationData_Optimistic(t *testing.T) {
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
TimeFetcher: &mock.ChainService{Genesis: time.Now()},
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
GenesisTimeFetcher: &mock.ChainService{Genesis: time.Now()},
|
||||
HeadFetcher: &mock.ChainService{Optimistic: false, State: beaconState},
|
||||
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: ðpb.Checkpoint{}},
|
||||
@@ -440,7 +440,7 @@ func TestGetAttestationData_SucceedsInFirstEpoch(t *testing.T) {
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
TimeFetcher: &mock.ChainService{Genesis: prysmTime.Now().Add(time.Duration(-1*offset) * time.Second)},
|
||||
CoreService: &core.Service{
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
HeadFetcher: &mock.ChainService{
|
||||
TargetRoot: targetRoot, Root: blockRoot[:], State: beaconState,
|
||||
},
|
||||
@@ -514,7 +514,7 @@ func TestGetAttestationData_CommitteeIndexIsZeroPostElectra(t *testing.T) {
|
||||
Genesis: time.Now().Add(time.Duration(-1*offset) * time.Second),
|
||||
},
|
||||
FinalizedFetcher: &mock.ChainService{CurrentJustifiedCheckPoint: justifiedCheckpoint},
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
OptimisticModeFetcher: &mock.ChainService{Optimistic: false},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
@@ -30,21 +29,22 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
|
||||
ctx, span := trace.StartSpan(ctx, "ProposerServer.packAttestations")
|
||||
defer span.End()
|
||||
|
||||
atts := vs.AttPool.AggregatedAttestations()
|
||||
atts, err := vs.validateAndDeleteAttsInPool(ctx, latestState, atts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not filter attestations")
|
||||
}
|
||||
var atts []ethpb.Att
|
||||
|
||||
uAtts, err := vs.AttPool.UnaggregatedAttestations()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get unaggregated attestations")
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
atts = vs.AttestationCache.GetAll()
|
||||
atts = vs.validateAndDeleteAttsInPool(ctx, latestState, atts)
|
||||
} else {
|
||||
atts = vs.AttPool.AggregatedAttestations()
|
||||
atts = vs.validateAndDeleteAttsInPool(ctx, latestState, atts)
|
||||
|
||||
uAtts, err := vs.AttPool.UnaggregatedAttestations()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get unaggregated attestations")
|
||||
}
|
||||
uAtts = vs.validateAndDeleteAttsInPool(ctx, latestState, uAtts)
|
||||
atts = append(atts, uAtts...)
|
||||
}
|
||||
uAtts, err = vs.validateAndDeleteAttsInPool(ctx, latestState, uAtts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not filter attestations")
|
||||
}
|
||||
atts = append(atts, uAtts...)
|
||||
|
||||
// Checking the state's version here will give the wrong result if the last slot of Deneb is missed.
|
||||
// The head state will still be in Deneb while we are trying to build an Electra block.
|
||||
@@ -65,6 +65,8 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
// Remove duplicates from both aggregated/unaggregated attestations. This
|
||||
// prevents inefficient aggregates being created.
|
||||
versionAtts, err = proposerAtts(versionAtts).dedup()
|
||||
@@ -455,15 +457,15 @@ func (a proposerAtts) dedup() (proposerAtts, error) {
|
||||
}
|
||||
|
||||
// This filters the input attestations to return a list of valid attestations to be packaged inside a beacon block.
|
||||
func (vs *Server) validateAndDeleteAttsInPool(ctx context.Context, st state.BeaconState, atts []ethpb.Att) ([]ethpb.Att, error) {
|
||||
func (vs *Server) validateAndDeleteAttsInPool(ctx context.Context, st state.BeaconState, atts []ethpb.Att) []ethpb.Att {
|
||||
ctx, span := trace.StartSpan(ctx, "ProposerServer.validateAndDeleteAttsInPool")
|
||||
defer span.End()
|
||||
|
||||
validAtts, invalidAtts := proposerAtts(atts).filter(ctx, st)
|
||||
if err := vs.deleteAttsInPool(ctx, invalidAtts); err != nil {
|
||||
return nil, err
|
||||
log.WithError(err).Error("Could not delete invalid attestations")
|
||||
}
|
||||
return validAtts, nil
|
||||
return validAtts
|
||||
}
|
||||
|
||||
// The input attestations are processed and seen by the node, this deletes them from pool
|
||||
@@ -476,13 +478,19 @@ func (vs *Server) deleteAttsInPool(ctx context.Context, atts []ethpb.Att) error
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
if helpers.IsAggregated(att) {
|
||||
if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err := vs.AttestationCache.DeleteCovered(att); err != nil {
|
||||
return errors.Wrap(err, "could not delete attestation")
|
||||
}
|
||||
} else {
|
||||
if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
if att.IsAggregated() {
|
||||
if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2580,7 +2580,6 @@ func TestProposer_FilterAttestation(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
wantedErr string
|
||||
inputAtts func() []ethpb.Att
|
||||
expectedAtts func(inputAtts []ethpb.Att) []ethpb.Att
|
||||
}{
|
||||
@@ -2656,14 +2655,8 @@ func TestProposer_FilterAttestation(t *testing.T) {
|
||||
HeadFetcher: &mock.ChainService{State: st, Root: genesisRoot[:]},
|
||||
}
|
||||
atts := tt.inputAtts()
|
||||
received, err := proposerServer.validateAndDeleteAttsInPool(context.Background(), st, atts)
|
||||
if tt.wantedErr != "" {
|
||||
assert.ErrorContains(t, tt.wantedErr, err)
|
||||
assert.Equal(t, nil, received)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.DeepEqual(t, tt.expectedAtts(atts), received)
|
||||
}
|
||||
received := proposerServer.validateAndDeleteAttsInPool(context.Background(), st, atts)
|
||||
assert.DeepEqual(t, tt.expectedAtts(atts), received)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ type Server struct {
|
||||
StateNotifier statefeed.Notifier
|
||||
BlockNotifier blockfeed.Notifier
|
||||
P2P p2p.Broadcaster
|
||||
AttestationCache *cache.AttestationCache
|
||||
AttPool attestations.Pool
|
||||
SlashingsPool slashings.PoolManager
|
||||
ExitPool voluntaryexits.PoolManager
|
||||
|
||||
@@ -95,6 +95,7 @@ type Config struct {
|
||||
GenesisFetcher blockchain.GenesisFetcher
|
||||
MockEth1Votes bool
|
||||
EnableDebugRPCEndpoints bool
|
||||
AttestationCache *cache.AttestationCache
|
||||
AttestationsPool attestations.Pool
|
||||
ExitPool voluntaryexits.PoolManager
|
||||
SlashingsPool slashings.PoolManager
|
||||
@@ -203,7 +204,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
Broadcaster: s.cfg.Broadcaster,
|
||||
SyncCommitteePool: s.cfg.SyncCommitteeObjectPool,
|
||||
OperationNotifier: s.cfg.OperationNotifier,
|
||||
AttestationCache: cache.NewAttestationCache(),
|
||||
AttestationCache: cache.NewAttestationDataCache(),
|
||||
StateGen: s.cfg.StateGen,
|
||||
P2P: s.cfg.Broadcaster,
|
||||
FinalizedFetcher: s.cfg.FinalizationFetcher,
|
||||
@@ -212,6 +213,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
}
|
||||
validatorServer := &validatorv1alpha1.Server{
|
||||
Ctx: s.ctx,
|
||||
AttestationCache: s.cfg.AttestationCache,
|
||||
AttPool: s.cfg.AttestationsPool,
|
||||
ExitPool: s.cfg.ExitPool,
|
||||
HeadFetcher: s.cfg.HeadFetcher,
|
||||
@@ -266,6 +268,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
beaconChainServer := &beaconv1alpha1.Server{
|
||||
Ctx: s.ctx,
|
||||
BeaconDB: s.cfg.BeaconDB,
|
||||
AttestationCache: s.cfg.AttestationCache,
|
||||
AttestationsPool: s.cfg.AttestationsPool,
|
||||
SlashingsPool: s.cfg.SlashingsPool,
|
||||
OptimisticModeFetcher: s.cfg.OptimisticModeFetcher,
|
||||
|
||||
@@ -2,6 +2,7 @@ package sync
|
||||
|
||||
import (
|
||||
"github.com/prysmaticlabs/prysm/v5/async/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
|
||||
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
|
||||
@@ -43,6 +44,13 @@ func WithDatabase(db db.NoHeadAccessDatabase) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithAttestationCache(c *cache.AttestationCache) Option {
|
||||
return func(s *Service) error {
|
||||
s.cfg.attestationCache = c
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithAttestationPool(attPool attestations.Pool) Option {
|
||||
return func(s *Service) error {
|
||||
s.cfg.attPool = attPool
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/async"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
|
||||
@@ -94,7 +95,7 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
|
||||
data := aggregate.GetData()
|
||||
// The pending attestations can arrive in both aggregated and unaggregated forms,
|
||||
// each from has distinct validation steps.
|
||||
if helpers.IsAggregated(aggregate) {
|
||||
if aggregate.IsAggregated() {
|
||||
// Save the pending aggregated attestation to the pool if it passes the aggregated
|
||||
// validation steps.
|
||||
valRes, err := s.validateAggregatedAtt(ctx, signedAtt)
|
||||
@@ -103,10 +104,18 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
|
||||
}
|
||||
aggValid := pubsub.ValidationAccept == valRes
|
||||
if s.validateBlockInAttestation(ctx, signedAtt) && aggValid {
|
||||
if err := s.cfg.attPool.SaveAggregatedAttestation(aggregate); err != nil {
|
||||
log.WithError(err).Debug("Could not save aggregate attestation")
|
||||
continue
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = s.cfg.attestationCache.Add(aggregate); err != nil {
|
||||
log.WithError(err).Debug("Could not save aggregate attestation")
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if err := s.cfg.attPool.SaveAggregatedAttestation(aggregate); err != nil {
|
||||
log.WithError(err).Debug("Could not save aggregate attestation")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
s.setAggregatorIndexEpochSeen(data.Target.Epoch, signedAtt.AggregateAttestationAndProof().GetAggregatorIndex())
|
||||
|
||||
// Broadcasting the signed attestation again once a node is able to process it.
|
||||
@@ -138,9 +147,16 @@ func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.
|
||||
continue
|
||||
}
|
||||
if valid == pubsub.ValidationAccept {
|
||||
if err := s.cfg.attPool.SaveUnaggregatedAttestation(aggregate); err != nil {
|
||||
log.WithError(err).Debug("Could not save unaggregated attestation")
|
||||
continue
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
if err = s.cfg.attestationCache.Add(aggregate); err != nil {
|
||||
log.WithError(err).Debug("Could not save unaggregated attestation")
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if err := s.cfg.attPool.SaveUnaggregatedAttestation(aggregate); err != nil {
|
||||
log.WithError(err).Debug("Could not save unaggregated attestation")
|
||||
continue
|
||||
}
|
||||
}
|
||||
s.setSeenCommitteeIndicesSlot(data.Slot, data.CommitteeIndex, aggregate.GetAggregationBits())
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/async/abool"
|
||||
"github.com/prysmaticlabs/prysm/v5/async/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
|
||||
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
|
||||
@@ -84,6 +85,7 @@ type config struct {
|
||||
attestationNotifier operation.Notifier
|
||||
p2p p2p.P2P
|
||||
beaconDB db.NoHeadAccessDatabase
|
||||
attestationCache *cache.AttestationCache
|
||||
attPool attestations.Pool
|
||||
exitPool voluntaryexits.PoolManager
|
||||
slashingPool slashings.PoolManager
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
@@ -24,10 +24,13 @@ func (s *Service) beaconAggregateProofSubscriber(_ context.Context, msg proto.Me
|
||||
return errors.New("nil aggregate")
|
||||
}
|
||||
|
||||
// An unaggregated attestation can make it here. It’s valid, the aggregator it just itself, although it means poor performance for the subnet.
|
||||
if !helpers.IsAggregated(aggregate) {
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(aggregate)
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
return s.cfg.attestationCache.Add(aggregate)
|
||||
} else {
|
||||
// An unaggregated attestation can make it here. It’s valid, the aggregator it just itself, although it means poor performance for the subnet.
|
||||
if !aggregate.IsAggregated() {
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(aggregate)
|
||||
}
|
||||
return s.cfg.attPool.SaveAggregatedAttestation(aggregate)
|
||||
}
|
||||
|
||||
return s.cfg.attPool.SaveAggregatedAttestation(aggregate)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/container/slice"
|
||||
@@ -31,15 +32,18 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
|
||||
}
|
||||
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, a.GetAggregationBits())
|
||||
|
||||
exists, err := s.cfg.attPool.HasAggregatedAttestation(a)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not determine if attestation pool has this attestation")
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
return s.cfg.attestationCache.Add(a)
|
||||
} else {
|
||||
exists, err := s.cfg.attPool.HasAggregatedAttestation(a)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not determine if attestation pool has this attestation")
|
||||
}
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
|
||||
}
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
|
||||
}
|
||||
|
||||
func (*Service) persistentSubnetIndices() []uint64 {
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
chainMock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
|
||||
dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution"
|
||||
@@ -80,7 +79,7 @@ func TestService_beaconBlockSubscriber(t *testing.T) {
|
||||
s.initCaches()
|
||||
// Set up attestation pool.
|
||||
for _, att := range pooledAttestations {
|
||||
if helpers.IsAggregated(att) {
|
||||
if att.IsAggregated() {
|
||||
assert.NoError(t, s.cfg.attPool.SaveAggregatedAttestation(att))
|
||||
} else {
|
||||
assert.NoError(t, s.cfg.attPool.SaveUnaggregatedAttestation(att))
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
|
||||
@@ -108,14 +109,27 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
|
||||
return pubsub.ValidationReject, errors.New("bad block referenced in attestation data")
|
||||
}
|
||||
|
||||
// Verify aggregate attestation has not already been seen via aggregate gossip, within a block, or through the creation locally.
|
||||
seen, err := s.cfg.attPool.HasAggregatedAttestation(aggregate)
|
||||
if err != nil {
|
||||
tracing.AnnotateError(span, err)
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
if seen {
|
||||
return pubsub.ValidationIgnore, nil
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
// It is possible that some aggregate in the pool already covers all bits
|
||||
// of this aggregate, in which case we can ignore it.
|
||||
isRedundant, err := s.cfg.attestationCache.AggregateIsRedundant(aggregate)
|
||||
if err != nil {
|
||||
tracing.AnnotateError(span, err)
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
if isRedundant {
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
} else {
|
||||
// Verify aggregate attestation has not already been seen via aggregate gossip, within a block, or through the creation locally.
|
||||
seen, err := s.cfg.attPool.HasAggregatedAttestation(aggregate)
|
||||
if err != nil {
|
||||
tracing.AnnotateError(span, err)
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
if seen {
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Verify the block being voted on is in the beacon chain.
|
||||
|
||||
@@ -49,6 +49,7 @@ type Flags struct {
|
||||
EnableHistoricalSpaceRepresentation bool // EnableHistoricalSpaceRepresentation enables the saving of registry validators in separate buckets to save space
|
||||
EnableBeaconRESTApi bool // EnableBeaconRESTApi enables experimental usage of the beacon REST API by the validator when querying a beacon node
|
||||
DisableCommitteeAwarePacking bool // DisableCommitteeAwarePacking changes the attestation packing algorithm to one that is not aware of attesting committees.
|
||||
EnableExperimentalAttestationPool bool // EnableExperimentalAttestationPool enables an experimental attestation pool design.
|
||||
// Logging related toggles.
|
||||
DisableGRPCConnectionLogs bool // Disables logging when a new grpc client has connected.
|
||||
EnableFullSSZDataLogging bool // Enables logging for full ssz data on rejected gossip messages
|
||||
@@ -267,6 +268,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
|
||||
logEnabled(EnableDiscoveryReboot)
|
||||
cfg.EnableDiscoveryReboot = true
|
||||
}
|
||||
if ctx.IsSet(enableExperimentalAttestationPool.Name) {
|
||||
logEnabled(enableExperimentalAttestationPool)
|
||||
cfg.EnableExperimentalAttestationPool = true
|
||||
}
|
||||
|
||||
cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
|
||||
Init(cfg)
|
||||
|
||||
@@ -174,6 +174,10 @@ var (
|
||||
Name: "enable-discovery-reboot",
|
||||
Usage: "Experimental: Enables the discovery listener to rebooted in the event of connectivity issues.",
|
||||
}
|
||||
enableExperimentalAttestationPool = &cli.BoolFlag{
|
||||
Name: "enable-experimental-attestation-pool",
|
||||
Usage: "Enables an experimental attestation pool design.",
|
||||
}
|
||||
)
|
||||
|
||||
// devModeFlags holds list of flags that are set when development mode is on.
|
||||
@@ -231,6 +235,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
|
||||
DisableQUIC,
|
||||
DisableCommitteeAwarePacking,
|
||||
EnableDiscoveryReboot,
|
||||
enableExperimentalAttestationPool,
|
||||
}...)...)
|
||||
|
||||
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
|
||||
|
||||
@@ -19,13 +19,15 @@ type Att interface {
|
||||
ssz.Unmarshaler
|
||||
ssz.HashRoot
|
||||
Version() int
|
||||
IsNil() bool
|
||||
IsAggregated() bool
|
||||
Clone() Att
|
||||
GetAggregationBits() bitfield.Bitlist
|
||||
GetData() *AttestationData
|
||||
CommitteeBitsVal() bitfield.Bitfield
|
||||
GetSignature() []byte
|
||||
SetSignature(sig []byte)
|
||||
GetCommitteeIndex() (primitives.CommitteeIndex, error)
|
||||
IsNil() bool
|
||||
}
|
||||
|
||||
// IndexedAtt defines common functionality for all indexed attestation types.
|
||||
@@ -113,6 +115,11 @@ func (a *Attestation) IsNil() bool {
|
||||
return a == nil || a.Data == nil
|
||||
}
|
||||
|
||||
// IsAggregated --
|
||||
func (a *Attestation) IsAggregated() bool {
|
||||
return a.AggregationBits.Count() > 1
|
||||
}
|
||||
|
||||
// Clone --
|
||||
func (a *Attestation) Clone() Att {
|
||||
return a.Copy()
|
||||
@@ -137,6 +144,11 @@ func (a *Attestation) CommitteeBitsVal() bitfield.Bitfield {
|
||||
return cb
|
||||
}
|
||||
|
||||
// SetSignature --
|
||||
func (a *Attestation) SetSignature(sig []byte) {
|
||||
a.Signature = sig
|
||||
}
|
||||
|
||||
// GetCommitteeIndex --
|
||||
func (a *Attestation) GetCommitteeIndex() (primitives.CommitteeIndex, error) {
|
||||
if a == nil || a.Data == nil {
|
||||
@@ -155,6 +167,11 @@ func (a *PendingAttestation) IsNil() bool {
|
||||
return a == nil || a.Data == nil
|
||||
}
|
||||
|
||||
// IsAggregated --
|
||||
func (a *PendingAttestation) IsAggregated() bool {
|
||||
return a.AggregationBits.Count() > 1
|
||||
}
|
||||
|
||||
// Clone --
|
||||
func (a *PendingAttestation) Clone() Att {
|
||||
return a.Copy()
|
||||
@@ -183,6 +200,9 @@ func (a *PendingAttestation) GetSignature() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetSignature --
|
||||
func (a *PendingAttestation) SetSignature(_ []byte) {}
|
||||
|
||||
// GetCommitteeIndex --
|
||||
func (a *PendingAttestation) GetCommitteeIndex() (primitives.CommitteeIndex, error) {
|
||||
if a == nil || a.Data == nil {
|
||||
@@ -201,6 +221,11 @@ func (a *AttestationElectra) IsNil() bool {
|
||||
return a == nil || a.Data == nil
|
||||
}
|
||||
|
||||
// IsAggregated --
|
||||
func (a *AttestationElectra) IsAggregated() bool {
|
||||
return a.AggregationBits.Count() > 1
|
||||
}
|
||||
|
||||
// Clone --
|
||||
func (a *AttestationElectra) Clone() Att {
|
||||
return a.Copy()
|
||||
@@ -224,6 +249,11 @@ func (a *AttestationElectra) CommitteeBitsVal() bitfield.Bitfield {
|
||||
return a.CommitteeBits
|
||||
}
|
||||
|
||||
// SetSignature --
|
||||
func (a *AttestationElectra) SetSignature(sig []byte) {
|
||||
a.Signature = sig
|
||||
}
|
||||
|
||||
// GetCommitteeIndex --
|
||||
func (a *AttestationElectra) GetCommitteeIndex() (primitives.CommitteeIndex, error) {
|
||||
if a == nil || a.Data == nil {
|
||||
|
||||
@@ -9,7 +9,6 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
@@ -22,12 +21,12 @@ const (
|
||||
)
|
||||
|
||||
// Id represents an attestation ID. Its uniqueness depends on the IdSource provided when constructing the Id.
|
||||
type Id [33]byte
|
||||
type Id [32]byte
|
||||
|
||||
// NewId --
|
||||
func NewId(att ethpb.Att, source IdSource) (Id, error) {
|
||||
if err := helpers.ValidateNilAttestation(att); err != nil {
|
||||
return Id{}, err
|
||||
if att.IsNil() {
|
||||
return Id{}, errors.New("nil attestation")
|
||||
}
|
||||
if att.Version() < 0 || att.Version() > 255 {
|
||||
return Id{}, errors.New("attestation version must be between 0 and 255")
|
||||
@@ -42,7 +41,7 @@ func NewId(att ethpb.Att, source IdSource) (Id, error) {
|
||||
if err != nil {
|
||||
return Id{}, err
|
||||
}
|
||||
copy(id[1:], h[:])
|
||||
copy(id[1:], h[1:])
|
||||
return id, nil
|
||||
case Data:
|
||||
dataHash, err := att.GetData().HashTreeRoot()
|
||||
@@ -61,7 +60,7 @@ func NewId(att ethpb.Att, source IdSource) (Id, error) {
|
||||
}
|
||||
h = hash.Hash(append(dataHash[:], []byte(strings.Join(stringCommitteeIndices, ","))...))
|
||||
}
|
||||
copy(id[1:], h[:])
|
||||
copy(id[1:], h[1:])
|
||||
return id, nil
|
||||
default:
|
||||
return Id{}, errors.New("invalid source requested")
|
||||
|
||||
Reference in New Issue
Block a user