Compare commits

...

4 Commits

Author SHA1 Message Date
rkapka
781b5b63a2 test copy 2025-12-19 15:39:10 +01:00
rkapka
9ca8dbc93a changelog <3 2025-12-19 15:22:03 +01:00
Radosław Kapka
4df8f2c931 Merge branch 'develop' into improve-att-cache 2025-12-19 15:20:51 +01:00
rkapka
605dd872df Make experimental att cache fully functional 2025-12-19 15:18:26 +01:00
12 changed files with 80 additions and 51 deletions

View File

@@ -460,6 +460,9 @@ func (s *Service) pruneAttsFromPool(ctx context.Context, headState state.BeaconS
func (s *Service) pruneCoveredAttsFromPool(ctx context.Context, headState state.BeaconState, att ethpb.Att) error {
switch {
case !att.IsAggregated():
if features.Get().EnableExperimentalAttestationPool {
return errors.Wrap(s.cfg.AttestationCache.DeleteCovered(att), "could not delete covered attestation")
}
return s.cfg.AttPool.DeleteUnaggregatedAttestation(att)
case att.Version() == version.Phase0:
if features.Get().EnableExperimentalAttestationPool {
@@ -528,12 +531,12 @@ func (s *Service) pruneCoveredElectraAttsFromPool(ctx context.Context, headState
if err = s.cfg.AttestationCache.DeleteCovered(a); err != nil {
return errors.Wrap(err, "could not delete covered attestation")
}
} else if !a.IsAggregated() {
if err = s.cfg.AttPool.DeleteUnaggregatedAttestation(a); err != nil {
return errors.Wrap(err, "could not delete unaggregated attestation")
} else if a.IsAggregated() {
if err = s.cfg.AttPool.DeleteAggregatedAttestation(a); err != nil {
return errors.Wrap(err, "could not delete aggregated attestation")
}
} else if err = s.cfg.AttPool.DeleteAggregatedAttestation(a); err != nil {
return errors.Wrap(err, "could not delete aggregated attestation")
} else if err = s.cfg.AttPool.DeleteUnaggregatedAttestation(a); err != nil {
return errors.Wrap(err, "could not delete unaggregated attestation")
}
offset += uint64(len(c))

View File

@@ -76,13 +76,22 @@ func (c *AttestationCache) Add(att ethpb.Att) error {
if group == nil {
group = &attGroup{
slot: att.GetData().Slot,
atts: []ethpb.Att{att},
atts: []ethpb.Att{att.Clone()},
}
c.atts[id] = group
return nil
}
if att.IsAggregated() {
// Ensure that this attestation is not already fully contained in an existing attestation.
for _, a := range group.atts {
if c, err := a.GetAggregationBits().Contains(att.GetAggregationBits()); err != nil {
return err
} else if c {
return nil
}
}
group.atts = append(group.atts, att.Clone())
return nil
}

View File

@@ -151,6 +151,32 @@ func TestAdd(t *testing.T) {
require.Equal(t, 1, len(group.atts))
assert.DeepEqual(t, []int{0, 1}, group.atts[0].GetAggregationBits().BitIndices())
})
t.Run("added attestation is copied", func(t *testing.T) {
// We want to make sure that the running aggregate is based on a copy of the original attestation,
// to avoid mutating the original attestation.
c := NewAttestationCache()
ab := bitfield.NewBitlist(8)
ab.SetBitAt(0, true)
original := &ethpb.Attestation{
Data: &ethpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: &ethpb.Checkpoint{Root: make([]byte, 32)}, Target: &ethpb.Checkpoint{Root: make([]byte, 32)}},
AggregationBits: ab,
Signature: sig.Marshal(),
}
require.NoError(t, c.Add(original))
ab = bitfield.NewBitlist(8)
ab.SetBitAt(1, true)
a := &ethpb.Attestation{
Data: &ethpb.AttestationData{Slot: 123, BeaconBlockRoot: make([]byte, 32), Source: &ethpb.Checkpoint{Root: make([]byte, 32)}, Target: &ethpb.Checkpoint{Root: make([]byte, 32)}},
AggregationBits: ab,
Signature: sig.Marshal(),
}
require.NoError(t, c.Add(a))
// Assert that the bit of the second attestation was not added to the original attestation.
assert.Equal(t, uint64(1), original.AggregationBits.Count())
})
}
func TestGetAll(t *testing.T) {

View File

@@ -120,6 +120,9 @@ func (s *Service) aggregateAndSaveForkChoiceAtts(atts []ethpb.Att) error {
return err
}
if features.Get().EnableExperimentalAttestationPool {
return s.cfg.Cache.SaveForkchoiceAttestations(aggregatedAtts)
}
return s.cfg.Pool.SaveForkchoiceAttestations(aggregatedAtts)
}

View File

@@ -20,7 +20,7 @@ func (s *Service) pruneExpired() {
s.pruneExpiredAtts()
s.updateMetrics()
case <-s.ctx.Done():
log.Debug("Context closed, exiting routine")
log.Debug("Context closed, exiting att pruning routine")
return
}
}
@@ -28,11 +28,13 @@ func (s *Service) pruneExpired() {
// pruneExpiredExperimental prunes attestations on every prune interval.
func (s *Service) pruneExpiredExperimental() {
ticker := time.NewTicker(s.cfg.pruneInterval)
defer ticker.Stop()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
offset := time.Duration(secondsPerSlot-1) * time.Second
slotTicker := slots.NewSlotTickerWithOffset(s.genesisTime, offset, secondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-ticker.C:
case <-slotTicker.C():
expirySlot, err := s.expirySlot()
if err != nil {
log.WithError(err).Error("Could not get expiry slot")
@@ -41,7 +43,7 @@ func (s *Service) pruneExpiredExperimental() {
numExpired := s.cfg.Cache.PruneBefore(expirySlot)
s.updateMetricsExperimental(numExpired)
case <-s.ctx.Done():
log.Debug("Context closed, exiting routine")
log.Debug("Context closed, exiting att pruning routine")
return
}
}

View File

@@ -23,8 +23,7 @@ func TestPruneExpired_Ticker(t *testing.T) {
defer cancel()
s, err := NewService(ctx, &Config{
Pool: NewPool(),
pruneInterval: 250 * time.Millisecond,
Pool: NewPool(),
})
require.NoError(t, err)

View File

@@ -11,7 +11,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
lruwrpr "github.com/OffchainLabs/prysm/v7/cache/lru"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/config/params"
lru "github.com/hashicorp/golang-lru"
)
@@ -31,7 +30,6 @@ type Service struct {
type Config struct {
Cache *cache.AttestationCache
Pool Pool
pruneInterval time.Duration
InitialSyncComplete chan struct{}
}
@@ -40,11 +38,6 @@ type Config struct {
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
cache := lruwrpr.New(forkChoiceProcessedAttsSize)
if cfg.pruneInterval == 0 {
// Prune expired attestations from the pool every slot interval.
cfg.pruneInterval = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
ctx, cancel := context.WithCancel(ctx)
return &Service{
cfg: cfg,

View File

@@ -282,10 +282,8 @@ func (s *Server) handleAttestationsElectra(
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
} else if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save attestation")
}
}
}()
@@ -384,10 +382,8 @@ func (s *Server) handleAttestations(
if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save aggregated attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save unaggregated attestation")
}
} else if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not save unaggregated attestation")
}
}

View File

@@ -439,16 +439,12 @@ func (vs *Server) deleteAttsInPool(ctx context.Context, atts []ethpb.Att) error
if err := vs.AttestationCache.DeleteCovered(att); err != nil {
return errors.Wrap(err, "could not delete attestation")
}
} else {
if att.IsAggregated() {
if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil {
return err
}
} else {
if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil {
return err
}
} else if att.IsAggregated() {
if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil {
return err
}
} else if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil {
return err
}
}
return nil

View File

@@ -26,11 +26,10 @@ func (s *Service) beaconAggregateProofSubscriber(_ context.Context, msg proto.Me
if features.Get().EnableExperimentalAttestationPool {
return s.cfg.attestationCache.Add(aggregate)
} else {
// An unaggregated attestation can make it here. Its valid, the aggregator it just itself, although it means poor performance for the subnet.
if !aggregate.IsAggregated() {
return s.cfg.attPool.SaveUnaggregatedAttestation(aggregate)
}
}
if aggregate.IsAggregated() {
return s.cfg.attPool.SaveAggregatedAttestation(aggregate)
}
// An unaggregated attestation can make it here. Its valid, the aggregator it just itself, although it means poor performance for the subnet.
return s.cfg.attPool.SaveUnaggregatedAttestation(aggregate)
}

View File

@@ -23,16 +23,16 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m
if features.Get().EnableExperimentalAttestationPool {
return s.cfg.attestationCache.Add(a)
} else {
exists, err := s.cfg.attPool.HasAggregatedAttestation(a)
if err != nil {
return errors.Wrap(err, "could not determine if attestation pool has this attestation")
}
if exists {
return nil
}
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
}
exists, err := s.cfg.attPool.HasAggregatedAttestation(a)
if err != nil {
return errors.Wrap(err, "could not determine if attestation pool has this attestation")
}
if exists {
return nil
}
return s.cfg.attPool.SaveUnaggregatedAttestation(a)
}
func persistentSubnetIndices() []uint64 {

View File

@@ -0,0 +1,3 @@
### Fixed
- Make experimental att cache fully functional.