mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Compare commits
1 Commits
d929e1dcaa
...
update-lru
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e9a82ff47 |
235
cache/nonblocking/lru.go
vendored
235
cache/nonblocking/lru.go
vendored
@@ -1,5 +1,4 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
// Package lru implements an LRU cache optimized for concurrent reads
|
||||
package nonblocking
|
||||
|
||||
import (
|
||||
@@ -10,137 +9,171 @@ import (
|
||||
// EvictCallback is used to get a callback when a cache entry is evicted
|
||||
type EvictCallback[K comparable, V any] func(key K, value V)
|
||||
|
||||
// LRU implements a non-thread safe fixed size LRU cache
|
||||
// LRU implements a thread-safe fixed size LRU cache optimized for reads
|
||||
type LRU[K comparable, V any] struct {
|
||||
itemsLock sync.RWMutex
|
||||
evictListLock sync.RWMutex
|
||||
size int
|
||||
evictList *lruList[K, V]
|
||||
items map[K]*entry[K, V]
|
||||
onEvict EvictCallback[K, V]
|
||||
getChan chan *entry[K, V]
|
||||
size int
|
||||
lock sync.RWMutex // Main lock for structural changes
|
||||
items map[K]*entry[K, V]
|
||||
evictList *lruList[K, V]
|
||||
onEvict EvictCallback[K, V]
|
||||
|
||||
// Buffered updates for list operations
|
||||
updateBatch []*entry[K, V]
|
||||
batchLock sync.Mutex
|
||||
batchSize int
|
||||
updateSignal chan struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// NewLRU constructs an LRU of the given size
|
||||
func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V]) (*LRU[K, V], error) {
|
||||
if size <= 0 {
|
||||
return nil, errors.New("must provide a positive size")
|
||||
}
|
||||
// Initialize the channel buffer size as being 10% of the cache size.
|
||||
chanSize := size / 10
|
||||
|
||||
batchSize := size / 20 // 5% of cache size for batch operations
|
||||
if batchSize < 10 {
|
||||
batchSize = 10
|
||||
}
|
||||
|
||||
c := &LRU[K, V]{
|
||||
size: size,
|
||||
evictList: newList[K, V](),
|
||||
items: make(map[K]*entry[K, V]),
|
||||
onEvict: onEvict,
|
||||
getChan: make(chan *entry[K, V], chanSize),
|
||||
size: size,
|
||||
evictList: newList[K, V](),
|
||||
items: make(map[K]*entry[K, V], size),
|
||||
onEvict: onEvict,
|
||||
updateBatch: make([]*entry[K, V], 0, batchSize),
|
||||
batchSize: batchSize,
|
||||
updateSignal: make(chan struct{}, 1),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
// Spin off separate go-routine to handle evict list
|
||||
// operations.
|
||||
go c.handleGetRequests()
|
||||
|
||||
go c.processBatchUpdates()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Add adds a value to the cache. Returns true if an eviction occurred.
|
||||
func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
|
||||
// Check for existing item
|
||||
c.itemsLock.RLock()
|
||||
if ent, ok := c.items[key]; ok {
|
||||
c.itemsLock.RUnlock()
|
||||
// Get looks up a key's value from the cache
|
||||
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
|
||||
// Fast path: Read-only lookup
|
||||
c.lock.RLock()
|
||||
if ent, exists := c.items[key]; exists {
|
||||
value = ent.value // Direct value access
|
||||
c.lock.RUnlock()
|
||||
|
||||
c.evictListLock.Lock()
|
||||
// Queue update for batch processing
|
||||
c.queueUpdate(ent)
|
||||
return value, true
|
||||
}
|
||||
c.lock.RUnlock()
|
||||
return value, false
|
||||
}
|
||||
|
||||
// queueUpdate adds an entry to the batch update queue
|
||||
func (c *LRU[K, V]) queueUpdate(ent *entry[K, V]) {
|
||||
c.batchLock.Lock()
|
||||
c.updateBatch = append(c.updateBatch, ent)
|
||||
|
||||
// Signal if batch is full
|
||||
if len(c.updateBatch) >= c.batchSize {
|
||||
select {
|
||||
case c.updateSignal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
c.batchLock.Unlock()
|
||||
}
|
||||
|
||||
// processBatchUpdates handles batched LRU list updates
|
||||
func (c *LRU[K, V]) processBatchUpdates() {
|
||||
for {
|
||||
select {
|
||||
case <-c.done:
|
||||
return
|
||||
case <-c.updateSignal:
|
||||
c.processBatch()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LRU[K, V]) processBatch() {
|
||||
c.batchLock.Lock()
|
||||
batch := c.updateBatch
|
||||
c.updateBatch = make([]*entry[K, V], 0, c.batchSize)
|
||||
c.batchLock.Unlock()
|
||||
|
||||
if len(batch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
c.lock.Lock()
|
||||
for _, ent := range batch {
|
||||
c.evictList.moveToFront(ent)
|
||||
}
|
||||
c.lock.Unlock()
|
||||
}
|
||||
|
||||
// Add adds a value to the cache
|
||||
func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
// Check for existing item
|
||||
if ent, ok := c.items[key]; ok {
|
||||
c.evictList.moveToFront(ent)
|
||||
c.evictListLock.Unlock()
|
||||
ent.value = value
|
||||
return false
|
||||
}
|
||||
c.itemsLock.RUnlock()
|
||||
|
||||
// Add new item
|
||||
c.evictListLock.Lock()
|
||||
ent := c.evictList.pushFront(key, value)
|
||||
c.evictListLock.Unlock()
|
||||
|
||||
c.itemsLock.Lock()
|
||||
c.items[key] = ent
|
||||
c.itemsLock.Unlock()
|
||||
|
||||
c.evictListLock.RLock()
|
||||
evict := c.evictList.length() > c.size
|
||||
c.evictListLock.RUnlock()
|
||||
|
||||
// Verify size not exceeded
|
||||
if evict {
|
||||
c.removeOldest()
|
||||
// Remove oldest if needed
|
||||
if c.evictList.length() > c.size {
|
||||
oldest := c.evictList.back()
|
||||
if oldest != nil {
|
||||
c.removeElement(oldest)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return evict
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Get looks up a key's value from the cache.
|
||||
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
|
||||
c.itemsLock.RLock()
|
||||
if ent, ok := c.items[key]; ok {
|
||||
c.itemsLock.RUnlock()
|
||||
|
||||
// Make this get function non-blocking for multiple readers.
|
||||
c.getChan <- ent
|
||||
return ent.value, true
|
||||
}
|
||||
c.itemsLock.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Len returns the number of items in the cache.
|
||||
func (c *LRU[K, V]) Len() int {
|
||||
c.evictListLock.RLock()
|
||||
defer c.evictListLock.RUnlock()
|
||||
return c.evictList.length()
|
||||
}
|
||||
|
||||
// Resize changes the cache size.
|
||||
func (c *LRU[K, V]) Resize(size int) (evicted int) {
|
||||
diff := c.Len() - size
|
||||
if diff < 0 {
|
||||
diff = 0
|
||||
}
|
||||
for i := 0; i < diff; i++ {
|
||||
c.removeOldest()
|
||||
}
|
||||
c.size = size
|
||||
return diff
|
||||
}
|
||||
|
||||
// removeOldest removes the oldest item from the cache.
|
||||
func (c *LRU[K, V]) removeOldest() {
|
||||
c.evictListLock.RLock()
|
||||
if ent := c.evictList.back(); ent != nil {
|
||||
c.evictListLock.RUnlock()
|
||||
c.removeElement(ent)
|
||||
return
|
||||
}
|
||||
c.evictListLock.RUnlock()
|
||||
}
|
||||
|
||||
// removeElement is used to remove a given list element from the cache
|
||||
// removeElement removes an element from the cache
|
||||
func (c *LRU[K, V]) removeElement(e *entry[K, V]) {
|
||||
c.evictListLock.Lock()
|
||||
c.evictList.remove(e)
|
||||
c.evictListLock.Unlock()
|
||||
|
||||
c.itemsLock.Lock()
|
||||
delete(c.items, e.key)
|
||||
c.itemsLock.Unlock()
|
||||
if c.onEvict != nil {
|
||||
c.onEvict(e.key, e.value)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LRU[K, V]) handleGetRequests() {
|
||||
for {
|
||||
entry := <-c.getChan
|
||||
c.evictListLock.Lock()
|
||||
c.evictList.moveToFront(entry)
|
||||
c.evictListLock.Unlock()
|
||||
}
|
||||
// Len returns the number of items in the cache
|
||||
func (c *LRU[K, V]) Len() int {
|
||||
c.lock.RLock()
|
||||
defer c.lock.RUnlock()
|
||||
return c.evictList.length()
|
||||
}
|
||||
|
||||
// Resize changes the cache size
|
||||
func (c *LRU[K, V]) Resize(size int) (evicted int) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
c.size = size
|
||||
diff := c.evictList.length() - size
|
||||
if diff < 0 {
|
||||
diff = 0
|
||||
}
|
||||
|
||||
for i := 0; i < diff; i++ {
|
||||
if ent := c.evictList.back(); ent != nil {
|
||||
c.removeElement(ent)
|
||||
}
|
||||
}
|
||||
return diff
|
||||
}
|
||||
|
||||
// Close stops the background goroutine
|
||||
func (c *LRU[K, V]) Close() {
|
||||
close(c.done)
|
||||
}
|
||||
|
||||
127
cache/nonblocking/lru_test.go
vendored
127
cache/nonblocking/lru_test.go
vendored
@@ -5,6 +5,7 @@ package nonblocking
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@@ -113,3 +114,129 @@ func TestLRU_Resize(t *testing.T) {
|
||||
t.Errorf("Cache should have contained 2 elements")
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkLRU_Add benchmarks the Add operation with different cache sizes
|
||||
func BenchmarkLRU_Add(b *testing.B) {
|
||||
sizes := []int{100, 1000, 10000}
|
||||
for _, size := range sizes {
|
||||
b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
|
||||
cache, _ := NewLRU[int, int](size, nil)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
cache.Add(i, i)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkLRU_Get benchmarks the Get operation with different cache sizes
|
||||
func BenchmarkLRU_Get(b *testing.B) {
|
||||
sizes := []int{100, 1000, 10000}
|
||||
for _, size := range sizes {
|
||||
b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
|
||||
cache, _ := NewLRU[int, int](size, nil)
|
||||
// Pre-populate cache
|
||||
for i := 0; i < size; i++ {
|
||||
cache.Add(i, i)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
cache.Get(i % size)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkLRU_AddWithEviction benchmarks Add operation when cache is full
|
||||
func BenchmarkLRU_AddWithEviction(b *testing.B) {
|
||||
sizes := []int{100, 1000, 10000}
|
||||
for _, size := range sizes {
|
||||
b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
|
||||
cache, _ := NewLRU[int, int](size, nil)
|
||||
// Pre-populate cache to force evictions
|
||||
for i := 0; i < size; i++ {
|
||||
cache.Add(i, i)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
cache.Add(size+i, i)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkLRU_MixedOperations benchmarks a mix of Add and Get operations
|
||||
func BenchmarkLRU_MixedOperations(b *testing.B) {
|
||||
sizes := []int{100, 1000, 10000}
|
||||
for _, size := range sizes {
|
||||
b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
|
||||
cache, _ := NewLRU[int, int](size, nil)
|
||||
// Pre-populate half the cache
|
||||
for i := 0; i < size/2; i++ {
|
||||
cache.Add(i, i)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if i%2 == 0 {
|
||||
cache.Add(i, i)
|
||||
} else {
|
||||
cache.Get(i % (size / 2))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkLRU_ParallelGet benchmarks concurrent Get operations
|
||||
func BenchmarkLRU_ParallelGet(b *testing.B) {
|
||||
sizes := []int{100, 1000, 10000}
|
||||
for _, size := range sizes {
|
||||
b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
|
||||
cache, _ := NewLRU[int, int](size, nil)
|
||||
// Pre-populate cache
|
||||
for i := 0; i < size; i++ {
|
||||
cache.Add(i, i)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
i := 0
|
||||
for pb.Next() {
|
||||
cache.Get(i % size)
|
||||
i++
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkLRU_ParallelAddGet benchmarks concurrent Add and Get operations
|
||||
func BenchmarkLRU_ParallelAddGet(b *testing.B) {
|
||||
sizes := []int{100, 1000, 10000}
|
||||
for _, size := range sizes {
|
||||
b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
|
||||
cache, _ := NewLRU[int, int](size, nil)
|
||||
// Pre-populate half the cache
|
||||
for i := 0; i < size/2; i++ {
|
||||
cache.Add(i, i)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
i := 0
|
||||
for pb.Next() {
|
||||
if i%2 == 0 {
|
||||
cache.Add(i, i)
|
||||
} else {
|
||||
cache.Get(i % (size / 2))
|
||||
}
|
||||
i++
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user