mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Compare commits
4 Commits
hdiff_star
...
improve-at
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
781b5b63a2 | ||
|
|
9ca8dbc93a | ||
|
|
4df8f2c931 | ||
|
|
605dd872df |
@@ -460,6 +460,9 @@ func (s *Service) pruneAttsFromPool(ctx context.Context, headState state.BeaconS
|
||||
func (s *Service) pruneCoveredAttsFromPool(ctx context.Context, headState state.BeaconState, att ethpb.Att) error {
|
||||
switch {
|
||||
case !att.IsAggregated():
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
return errors.Wrap(s.cfg.AttestationCache.DeleteCovered(att), "could not delete covered attestation")
|
||||
}
|
||||
return s.cfg.AttPool.DeleteUnaggregatedAttestation(att)
|
||||
case att.Version() == version.Phase0:
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
@@ -528,12 +531,12 @@ func (s *Service) pruneCoveredElectraAttsFromPool(ctx context.Context, headState
|
||||
if err = s.cfg.AttestationCache.DeleteCovered(a); err != nil {
|
||||
return errors.Wrap(err, "could not delete covered attestation")
|
||||
}
|
||||
} else if !a.IsAggregated() {
|
||||
if err = s.cfg.AttPool.DeleteUnaggregatedAttestation(a); err != nil {
|
||||
return errors.Wrap(err, "could not delete unaggregated attestation")
|
||||
} else if a.IsAggregated() {
|
||||
if err = s.cfg.AttPool.DeleteAggregatedAttestation(a); err != nil {
|
||||
return errors.Wrap(err, "could not delete aggregated attestation")
|
||||
}
|
||||
} else if err = s.cfg.AttPool.DeleteAggregatedAttestation(a); err != nil {
|
||||
return errors.Wrap(err, "could not delete aggregated attestation")
|
||||
} else if err = s.cfg.AttPool.DeleteUnaggregatedAttestation(a); err != nil {
|
||||
return errors.Wrap(err, "could not delete unaggregated attestation")
|
||||
}
|
||||
|
||||
offset += uint64(len(c))
|
||||
|
||||
11
beacon-chain/cache/attestation.go
vendored
11
beacon-chain/cache/attestation.go
vendored
@@ -76,13 +76,22 @@ func (c *AttestationCache) Add(att ethpb.Att) error {
|
||||
if group == nil {
|
||||
group = &attGroup{
|
||||
slot: att.GetData().Slot,
|
||||
atts: []ethpb.Att{att},
|
||||
atts: []ethpb.Att{att.Clone()},
|
||||
}
|
||||
c.atts[id] = group
|
||||
return nil
|
||||
}
|
||||
|
||||
if att.IsAggregated() {
|
||||
// Ensure that this attestation is not already fully contained in an existing attestation.
|
||||
for _, a := range group.atts {
|
||||
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
|
||||
return err
|
||||
} else if c {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
group.atts = append(group.atts, att.Clone())
|
||||
return nil
|
||||
}
|
||||
|
||||
26
beacon-chain/cache/attestation_test.go
vendored
26
beacon-chain/cache/attestation_test.go
vendored
@@ -151,6 +151,32 @@ func TestAdd(t *testing.T) {
|
||||
require.Equal(t, 1, len(group.atts))
|
||||
assert.DeepEqual(t, []int{0, 1}, group.atts[0].GetAggregationBits().BitIndices())
|
||||
})
|
||||
t.Run("added attestation is copied", func(t *testing.T) {
|
||||
// We want to make sure that the running aggregate is based on a copy of the original attestation,
|
||||
// to avoid mutating the original attestation.
|
||||
|
||||
c := NewAttestationCache()
|
||||
ab := bitfield.NewBitlist(8)
|
||||
ab.SetBitAt(0, true)
|
||||
original := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: ab,
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
require.NoError(t, c.Add(original))
|
||||
|
||||
ab = bitfield.NewBitlist(8)
|
||||
ab.SetBitAt(1, true)
|
||||
a := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: ðpb.Checkpoint{Root: make([]byte, 32)}, Target: ðpb.Checkpoint{Root: make([]byte, 32)}},
|
||||
AggregationBits: ab,
|
||||
Signature: sig.Marshal(),
|
||||
}
|
||||
require.NoError(t, c.Add(a))
|
||||
|
||||
// Assert that the bit of the second attestation was not added to the original attestation.
|
||||
assert.Equal(t, uint64(1), original.AggregationBits.Count())
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetAll(t *testing.T) {
|
||||
|
||||
@@ -120,6 +120,9 @@ func (s *Service) aggregateAndSaveForkChoiceAtts(atts []ethpb.Att) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
return s.cfg.Cache.SaveForkchoiceAttestations(aggregatedAtts)
|
||||
}
|
||||
return s.cfg.Pool.SaveForkchoiceAttestations(aggregatedAtts)
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ func (s *Service) pruneExpired() {
|
||||
s.pruneExpiredAtts()
|
||||
s.updateMetrics()
|
||||
case <-s.ctx.Done():
|
||||
log.Debug("Context closed, exiting routine")
|
||||
log.Debug("Context closed, exiting att pruning routine")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -28,11 +28,13 @@ func (s *Service) pruneExpired() {
|
||||
|
||||
// pruneExpiredExperimental prunes attestations on every prune interval.
|
||||
func (s *Service) pruneExpiredExperimental() {
|
||||
ticker := time.NewTicker(s.cfg.pruneInterval)
|
||||
defer ticker.Stop()
|
||||
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
|
||||
offset := time.Duration(secondsPerSlot-1) * time.Second
|
||||
slotTicker := slots.NewSlotTickerWithOffset(s.genesisTime, offset, secondsPerSlot)
|
||||
defer slotTicker.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-slotTicker.C():
|
||||
expirySlot, err := s.expirySlot()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get expiry slot")
|
||||
@@ -41,7 +43,7 @@ func (s *Service) pruneExpiredExperimental() {
|
||||
numExpired := s.cfg.Cache.PruneBefore(expirySlot)
|
||||
s.updateMetricsExperimental(numExpired)
|
||||
case <-s.ctx.Done():
|
||||
log.Debug("Context closed, exiting routine")
|
||||
log.Debug("Context closed, exiting att pruning routine")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,8 +23,7 @@ func TestPruneExpired_Ticker(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
s, err := NewService(ctx, &Config{
|
||||
Pool: NewPool(),
|
||||
pruneInterval: 250 * time.Millisecond,
|
||||
Pool: NewPool(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
|
||||
lruwrpr "github.com/OffchainLabs/prysm/v7/cache/lru"
|
||||
"github.com/OffchainLabs/prysm/v7/config/features"
|
||||
"github.com/OffchainLabs/prysm/v7/config/params"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
)
|
||||
|
||||
@@ -31,7 +30,6 @@ type Service struct {
|
||||
type Config struct {
|
||||
Cache *cache.AttestationCache
|
||||
Pool Pool
|
||||
pruneInterval time.Duration
|
||||
InitialSyncComplete chan struct{}
|
||||
}
|
||||
|
||||
@@ -40,11 +38,6 @@ type Config struct {
|
||||
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
|
||||
cache := lruwrpr.New(forkChoiceProcessedAttsSize)
|
||||
|
||||
if cfg.pruneInterval == 0 {
|
||||
// Prune expired attestations from the pool every slot interval.
|
||||
cfg.pruneInterval = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
|
||||
@@ -282,10 +282,8 @@ func (s *Server) handleAttestationsElectra(
|
||||
if err = s.AttestationCache.Add(att); err != nil {
|
||||
log.WithError(err).Error("Could not save attestation")
|
||||
}
|
||||
} else {
|
||||
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
|
||||
log.WithError(err).Error("Could not save attestation")
|
||||
}
|
||||
} else if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
|
||||
log.WithError(err).Error("Could not save attestation")
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -384,10 +382,8 @@ func (s *Server) handleAttestations(
|
||||
if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
|
||||
log.WithError(err).Error("Could not save aggregated attestation")
|
||||
}
|
||||
} else {
|
||||
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
|
||||
log.WithError(err).Error("Could not save unaggregated attestation")
|
||||
}
|
||||
} else if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
|
||||
log.WithError(err).Error("Could not save unaggregated attestation")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -439,16 +439,12 @@ func (vs *Server) deleteAttsInPool(ctx context.Context, atts []ethpb.Att) error
|
||||
if err := vs.AttestationCache.DeleteCovered(att); err != nil {
|
||||
return errors.Wrap(err, "could not delete attestation")
|
||||
}
|
||||
} else {
|
||||
if att.IsAggregated() {
|
||||
if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if att.IsAggregated() {
|
||||
if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -26,11 +26,10 @@ func (s *Service) beaconAggregateProofSubscriber(_ context.Context, msg proto.Me
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
return s.cfg.attestationCache.Add(aggregate)
|
||||
} else {
|
||||
// An unaggregated attestation can make it here. It’s valid, the aggregator it just itself, although it means poor performance for the subnet.
|
||||
if !aggregate.IsAggregated() {
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(aggregate)
|
||||
}
|
||||
}
|
||||
if aggregate.IsAggregated() {
|
||||
return s.cfg.attPool.SaveAggregatedAttestation(aggregate)
|
||||
}
|
||||
// An unaggregated attestation can make it here. It’s valid, the aggregator it just itself, although it means poor performance for the subnet.
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(aggregate)
|
||||
}
|
||||
|
||||
@@ -23,16 +23,16 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
|
||||
|
||||
if features.Get().EnableExperimentalAttestationPool {
|
||||
return s.cfg.attestationCache.Add(a)
|
||||
} else {
|
||||
exists, err := s.cfg.attPool.HasAggregatedAttestation(a)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not determine if attestation pool has this attestation")
|
||||
}
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
|
||||
}
|
||||
exists, err := s.cfg.attPool.HasAggregatedAttestation(a)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not determine if attestation pool has this attestation")
|
||||
}
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
|
||||
|
||||
}
|
||||
|
||||
func persistentSubnetIndices() []uint64 {
|
||||
|
||||
3
changelog/radek_improve-att-cache.md
Normal file
3
changelog/radek_improve-att-cache.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Fixed
|
||||
|
||||
- Make experimental att cache fully functional.
|
||||
Reference in New Issue
Block a user