optimize data column seen cache memory usage with slot-aware pruning (#15477)

* optimize data column seen cache memory usage with slot-aware pruning

* Manu's feedback

* Fix lint
terence
2025-07-15 09:00:36 -07:00
committed by GitHub
parent 499d27b6ae
commit 6f5ff03b42
8 changed files with 247 additions and 9 deletions
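At a high level, this change wraps the LRU of seen data column sidecars in a structure that also remembers which slot each key belongs to, so everything older than the finalized slot can be dropped eagerly instead of lingering until LRU eviction. The toy model below is only an orientation sketch of that idea, not the committed code (the real slotAwareCache in slot_aware_cache.go further bounds the LRU size and the number of tracked slots):

// Minimal, self-contained model of slot-aware pruning: remember seen
// (slot, key) pairs and drop everything below the finalized slot.
package main

import "fmt"

type seenBySlot map[uint64]map[string]bool

func (s seenBySlot) add(slot uint64, key string) {
	if s[slot] == nil {
		s[slot] = make(map[string]bool)
	}
	s[slot][key] = true
}

func (s seenBySlot) pruneBefore(finalized uint64) int {
	pruned := 0
	for slot, keys := range s {
		if slot < finalized {
			pruned += len(keys)
			delete(s, slot)
		}
	}
	return pruned
}

func main() {
	seen := seenBySlot{}
	seen.add(10, "col-10-1")
	seen.add(20, "col-20-7")
	fmt.Println(seen.pruneBefore(15)) // 1: the slot-10 entry is dropped, slot 20 survives
}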

View File

@@ -35,6 +35,7 @@ go_library(
"rpc_send_request.go",
"rpc_status.go",
"service.go",
"slot_aware_cache.go",
"subscriber.go",
"subscriber_beacon_aggregate_proof.go",
"subscriber_beacon_attestation.go",
@@ -187,6 +188,7 @@ go_test(
"rpc_status_test.go",
"rpc_test.go",
"service_test.go",
"slot_aware_cache_test.go",
"subscriber_beacon_aggregate_proof_test.go",
"subscriber_beacon_blocks_test.go",
"subscriber_test.go",

View File

@@ -133,7 +133,7 @@ func TestBroadcastMissingDataColumnSidecars(t *testing.T) {
for _, index := range [...]uint64{1, 17, 19, 42, 75, 87, 102, 117} {
key := computeCacheKey(slot, proposerIndex, index)
- service.seenDataColumnCache.Add(key, true)
+ service.seenDataColumnCache.Add(slot, key, true)
}
err := service.broadcastMissingDataColumnSidecars(slot, proposerIndex, root, timeIntoSlot)
@@ -164,7 +164,7 @@ func TestBroadcastMissingDataColumnSidecars(t *testing.T) {
for _, index := range [...]uint64{1, 17, 19, 102, 117} { // 42, 75 and 87 are missing
key := computeCacheKey(slot, proposerIndex, index)
- service.seenDataColumnCache.Add(key, true)
+ service.seenDataColumnCache.Add(slot, key, true)
}
for _, index := range [...]uint64{42, 75, 87} {

View File

@@ -50,6 +50,7 @@ import (
"github.com/OffchainLabs/prysm/v6/runtime"
prysmTime "github.com/OffchainLabs/prysm/v6/time"
"github.com/OffchainLabs/prysm/v6/time/slots"
"github.com/sirupsen/logrus"
)
var _ runtime.Service = (*Service)(nil)
@@ -145,7 +146,7 @@ type Service struct {
seenBlockCache *lru.Cache
seenBlobLock sync.RWMutex
seenBlobCache *lru.Cache
- seenDataColumnCache *lru.Cache
+ seenDataColumnCache *slotAwareCache
seenAggregatedAttestationLock sync.RWMutex
seenAggregatedAttestationCache *lru.Cache
seenUnAggregatedAttestationLock sync.RWMutex
@@ -269,6 +270,9 @@ func (s *Service) Start() {
// Update sync metrics.
async.RunEvery(s.ctx, syncMetricsInterval, s.updateMetrics)
// Periodically prune data column cache entries older than the finalized slot.
async.RunEvery(s.ctx, 30*time.Second, s.pruneDataColumnCache)
}
// Stop the regular sync service.
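The Start() hunk above schedules the new pruning pass with async.RunEvery every 30 seconds. As a hedged illustration of that scheduling pattern only (Prysm's actual async.RunEvery helper is not shown in this diff), a ticker-plus-context loop looks like this:

// Illustrative periodic runner: call fn on every tick until ctx is cancelled.
package main

import (
	"context"
	"fmt"
	"time"
)

func runEvery(ctx context.Context, period time.Duration, fn func()) {
	go func() {
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fn() // e.g. s.pruneDataColumnCache
			case <-ctx.Done():
				return
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	runEvery(ctx, 100*time.Millisecond, func() { fmt.Println("prune tick") })
	<-ctx.Done()
}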
@@ -306,7 +310,7 @@ func (s *Service) Status() error {
func (s *Service) initCaches() {
s.seenBlockCache = lruwrpr.New(seenBlockSize)
s.seenBlobCache = lruwrpr.New(seenBlockSize * params.BeaconConfig().DeprecatedMaxBlobsPerBlockElectra)
- s.seenDataColumnCache = lruwrpr.New(seenDataColumnSize)
+ s.seenDataColumnCache = newSlotAwareCache(seenDataColumnSize)
s.seenAggregatedAttestationCache = lruwrpr.New(seenAggregatedAttSize)
s.seenUnAggregatedAttestationCache = lruwrpr.New(seenUnaggregatedAttSize)
s.seenSyncMessageCache = lruwrpr.New(seenSyncMsgSize)
@@ -321,7 +325,7 @@ func (s *Service) initCaches() {
func (s *Service) waitForChainStart() {
clock, err := s.clockWaiter.WaitForClock(s.ctx)
if err != nil {
log.WithError(err).Error("sync service failed to receive genesis data")
log.WithError(err).Error("Sync service failed to receive genesis data")
return
}
s.cfg.clock = clock
@@ -333,7 +337,7 @@ func (s *Service) waitForChainStart() {
log.
WithError(err).
WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
Error("sync service failed to initialize context version map")
Error("Sync service failed to initialize context version map")
return
}
s.ctxMap = ctxMap
@@ -394,6 +398,24 @@ func (s *Service) markForChainStart() {
s.chainStarted.Set()
}
// pruneDataColumnCache removes entries from the data column cache that are older than the finalized slot.
func (s *Service) pruneDataColumnCache() {
finalizedCheckpoint := s.cfg.chain.FinalizedCheckpt()
finalizedSlot, err := slots.EpochStart(finalizedCheckpoint.Epoch)
if err != nil {
log.WithError(err).Error("Could not calculate finalized slot for cache pruning")
return
}
pruned := s.seenDataColumnCache.pruneSlotsBefore(finalizedSlot)
if pruned > 0 {
log.WithFields(logrus.Fields{
"finalizedSlot": finalizedSlot,
"prunedEntries": pruned,
}).Debug("Pruned data column cache entries before finalized slot")
}
}
func (s *Service) chainIsStarted() bool {
return s.chainStarted.IsSet()
}

View File

@@ -0,0 +1,91 @@
package sync
import (
"slices"
"sync"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
lru "github.com/hashicorp/golang-lru"
)
const maxSlots = 1000 // Maximum number of slots to track before pruning oldest
// slotAwareCache is a cache that tracks which keys belong to which slot
// to enable slot-based pruning when blocks are finalized.
type slotAwareCache struct {
cache *lru.Cache
slotToKeys map[primitives.Slot][]string
mu sync.RWMutex
}
// newSlotAwareCache creates a new slot-aware cache with the given size.
func newSlotAwareCache(size int) *slotAwareCache {
cache, _ := lru.New(size)
return &slotAwareCache{
cache: cache,
slotToKeys: make(map[primitives.Slot][]string),
}
}
// Get retrieves a value from the cache.
func (c *slotAwareCache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cache.Get(key)
}
// Add adds a value to the cache associated with a specific slot.
func (c *slotAwareCache) Add(slot primitives.Slot, key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
// Add to cache
c.cache.Add(key, value)
// Track slot association
c.slotToKeys[slot] = append(c.slotToKeys[slot], key)
c.pruneOldestSlots()
}
// pruneSlotsBefore removes all entries with slots less than the given slot.
// This should be called when a new finalized checkpoint is reached.
func (c *slotAwareCache) pruneSlotsBefore(finalizedSlot primitives.Slot) int {
c.mu.Lock()
defer c.mu.Unlock()
pruned := 0
for slot, keys := range c.slotToKeys {
if slot < finalizedSlot {
for _, key := range keys {
c.cache.Remove(key)
pruned++
}
delete(c.slotToKeys, slot)
}
}
return pruned
}
// pruneOldestSlots removes the oldest slots when we exceed maxSlots.
// This ensures bounded memory usage even during long finalization delays.
// Note: This function assumes the mutex is already held.
func (c *slotAwareCache) pruneOldestSlots() {
if len(c.slotToKeys) <= maxSlots {
return
}
// Get all slots and sort them to find the oldest
slots := make([]primitives.Slot, 0, len(c.slotToKeys))
for slot := range c.slotToKeys {
slots = append(slots, slot)
}
slices.Sort(slots)
// Remove oldest slots until we're back under the limit
slotsToRemove := len(c.slotToKeys) - maxSlots
for i := range slotsToRemove {
slot := slots[i]
delete(c.slotToKeys, slot)
}
}
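Two limits keep this wrapper small: the embedded LRU caps the number of cached keys at the size passed to newSlotAwareCache, and maxSlots caps how many slots the tracking map can reference even if finalization stalls. Assuming roughly 128 data columns gossiped per slot (an illustrative figure, not taken from this diff), the worst-case tracking footprint is easy to bound:

// Back-of-the-envelope bound on the slot-to-keys tracking map.
package main

import "fmt"

func main() {
	const maxSlots = 1000      // cap on tracked slots, from slot_aware_cache.go
	const columnsPerSlot = 128 // assumed number of data columns gossiped per slot
	fmt.Println(maxSlots*columnsPerSlot, "tracked keys at most") // 128000
}

Note that pruneOldestSlots drops only the slot-to-keys tracking for the oldest slots; the corresponding LRU entries remain until the LRU evicts them or pruneSlotsBefore removes them explicitly.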

View File

@@ -0,0 +1,121 @@
package sync
import (
"fmt"
"testing"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/testing/require"
)
func TestSlotAwareCache(t *testing.T) {
cache := newSlotAwareCache(100)
t.Run("basic operations", func(t *testing.T) {
// Add entries for different slots
cache.Add(primitives.Slot(10), "key1", "value1")
cache.Add(primitives.Slot(20), "key2", "value2")
cache.Add(primitives.Slot(30), "key3", "value3")
// Check they exist
val, exists := cache.Get("key1")
require.Equal(t, true, exists)
require.Equal(t, "value1", val)
val, exists = cache.Get("key2")
require.Equal(t, true, exists)
require.Equal(t, "value2", val)
val, exists = cache.Get("key3")
require.Equal(t, true, exists)
require.Equal(t, "value3", val)
// Check cache stats
totalEntries, slotsTracked := cache.cache.Len(), len(cache.slotToKeys)
require.Equal(t, 3, totalEntries)
require.Equal(t, 3, slotsTracked)
})
// Test slot-based pruning
t.Run("slot-based pruning", func(t *testing.T) {
cache := newSlotAwareCache(100)
// Add entries for different slots
cache.Add(primitives.Slot(10), "key10", "value10")
cache.Add(primitives.Slot(20), "key20", "value20")
cache.Add(primitives.Slot(30), "key30", "value30")
cache.Add(primitives.Slot(40), "key40", "value40")
pruned := cache.pruneSlotsBefore(primitives.Slot(25))
require.Equal(t, 2, pruned) // Should prune entries from slots 10 and 20
// Check that entries from slots 10 and 20 are gone
_, exists := cache.Get("key10")
require.Equal(t, false, exists)
_, exists = cache.Get("key20")
require.Equal(t, false, exists)
// Check that entries from slots 30 and 40 still exist
val, exists := cache.Get("key30")
require.Equal(t, true, exists)
require.Equal(t, "value30", val)
val, exists = cache.Get("key40")
require.Equal(t, true, exists)
require.Equal(t, "value40", val)
// Check cache stats
totalEntries, slotsTracked := cache.cache.Len(), len(cache.slotToKeys)
require.Equal(t, 2, totalEntries)
require.Equal(t, 2, slotsTracked)
})
t.Run("multiple keys per slot", func(t *testing.T) {
cache := newSlotAwareCache(100)
// Add multiple entries for the same slot
cache.Add(primitives.Slot(10), "key1", "value1")
cache.Add(primitives.Slot(10), "key2", "value2")
cache.Add(primitives.Slot(20), "key3", "value3")
// Check they exist
val, exists := cache.Get("key1")
require.Equal(t, true, exists)
require.Equal(t, "value1", val)
val, exists = cache.Get("key2")
require.Equal(t, true, exists)
require.Equal(t, "value2", val)
// Prune slot 10
pruned := cache.pruneSlotsBefore(primitives.Slot(15))
require.Equal(t, 2, pruned) // Should prune both keys from slot 10
// Check that both keys from slot 10 are gone
_, exists = cache.Get("key1")
require.Equal(t, false, exists)
_, exists = cache.Get("key2")
require.Equal(t, false, exists)
// Check that key from slot 20 still exists
val, exists = cache.Get("key3")
require.Equal(t, true, exists)
require.Equal(t, "value3", val)
})
t.Run("bounded slot tracking", func(t *testing.T) {
cache := newSlotAwareCache(200000) // Large cache to avoid LRU eviction
// Add entries for 1005 slots, each with one key
for i := 0; i < 1005; i++ {
slot := primitives.Slot(i)
key := fmt.Sprintf("key%d", i)
cache.Add(slot, key, fmt.Sprintf("value%d", i))
}
// Should only track 1000 slots (oldest 5 slots pruned)
require.Equal(t, 1000, len(cache.slotToKeys))
})
}

View File

@@ -232,7 +232,7 @@ func (s *Service) hasSeenDataColumnIndex(slot primitives.Slot, proposerIndex pri
// Sets the data column with the same slot, proposer index, and data column index as seen.
func (s *Service) setSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) {
key := computeCacheKey(slot, proposerIndex, index)
- s.seenDataColumnCache.Add(key, true)
+ s.seenDataColumnCache.Add(slot, key, true)
}
func computeCacheKey(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) string {
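For context, the read path hasSeenDataColumnIndex (whose signature appears in the hunk header above) pairs with setSeenDataColumnIndex by computing the same key and querying the cache. The sketch below is an assumption about that pairing rather than the actual Prysm code: keyOf stands in for computeCacheKey, whose body is not shown in this diff, and a plain map stands in for the slot-aware cache.

// Hedged sketch of the seen-tracking read/write pair; key format and storage
// are illustrative only.
package main

import "fmt"

type store struct{ seen map[string]bool }

func keyOf(slot, proposer, column uint64) string {
	return fmt.Sprintf("%d-%d-%d", slot, proposer, column) // illustrative key format
}

func (s *store) setSeen(slot, proposer, column uint64) {
	s.seen[keyOf(slot, proposer, column)] = true
}

func (s *store) hasSeen(slot, proposer, column uint64) bool {
	return s.seen[keyOf(slot, proposer, column)]
}

func main() {
	s := &store{seen: map[string]bool{}}
	s.setSeen(100, 7, 42)
	fmt.Println(s.hasSeen(100, 7, 42), s.hasSeen(100, 7, 43)) // true false
}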

View File

@@ -13,7 +13,6 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/verification"
lruwrpr "github.com/OffchainLabs/prysm/v6/cache/lru"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
@@ -66,7 +65,7 @@ func TestValidateDataColumn(t *testing.T) {
service := &Service{
cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: clock, chain: chainService},
newColumnsVerifier: newDataColumnsVerifier,
- seenDataColumnCache: lruwrpr.New(seenDataColumnSize),
+ seenDataColumnCache: newSlotAwareCache(seenDataColumnSize),
}
// Encode a `beaconBlock` message instead of expected.

changelog/tt_beans.md Normal file
View File

@@ -0,0 +1,3 @@
### Added
- Slot-aware cache for seen data column sidecars in p2p gossip to reduce memory usage.