Compare commits

...

1 Commits

Author SHA1 Message Date
terence tsao
7e9a82ff47 Update LRU cache to faster read by batch evict 2025-01-03 14:59:19 -08:00
2 changed files with 261 additions and 101 deletions

View File

@@ -1,5 +1,4 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
// Package lru implements an LRU cache optimized for concurrent reads
package nonblocking
import (
@@ -10,137 +9,171 @@ import (
// EvictCallback is used to get a callback when a cache entry is evicted.
// It receives the evicted key and its value.
type EvictCallback[K comparable, V any] func(key K, value V)
// LRU implements a non-thread safe fixed size LRU cache
// LRU implements a thread-safe fixed size LRU cache optimized for reads
type LRU[K comparable, V any] struct {
itemsLock sync.RWMutex
evictListLock sync.RWMutex
size int
evictList *lruList[K, V]
items map[K]*entry[K, V]
onEvict EvictCallback[K, V]
getChan chan *entry[K, V]
size int
lock sync.RWMutex // Main lock for structural changes
items map[K]*entry[K, V]
evictList *lruList[K, V]
onEvict EvictCallback[K, V]
// Buffered updates for list operations
updateBatch []*entry[K, V]
batchLock sync.Mutex
batchSize int
updateSignal chan struct{}
done chan struct{}
}
// NewLRU constructs an LRU of the given size. onEvict may be nil; when
// non-nil it is invoked for every entry removed by eviction, Resize, or
// capacity pressure in Add.
//
// A background goroutine is started to apply batched recency updates;
// callers must invoke Close to stop it.
func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V]) (*LRU[K, V], error) {
	if size <= 0 {
		return nil, errors.New("must provide a positive size")
	}
	batchSize := size / 20 // 5% of cache size for batch operations
	if batchSize < 10 {
		batchSize = 10
	}
	c := &LRU[K, V]{
		size:         size,
		evictList:    newList[K, V](),
		items:        make(map[K]*entry[K, V], size),
		onEvict:      onEvict,
		updateBatch:  make([]*entry[K, V], 0, batchSize),
		batchSize:    batchSize,
		updateSignal: make(chan struct{}, 1),
		done:         make(chan struct{}),
	}
	// Spin off a separate goroutine to apply queued recency updates in
	// batches, keeping Get's fast path read-only.
	go c.processBatchUpdates()
	return c, nil
}
// Add adds a value to the cache. Returns true if an eviction occurred.
func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
// Check for existing item
c.itemsLock.RLock()
if ent, ok := c.items[key]; ok {
c.itemsLock.RUnlock()
// Get looks up a key's value from the cache and marks it as recently used.
// The hot path takes only a read lock; the move-to-front is queued for the
// background batch processor so concurrent readers do not serialize on the
// eviction list.
//
// The stray `c.evictListLock.Lock()` left over from the removed
// implementation is dropped: that field no longer exists and the lock was
// never released.
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
	// Fast path: read-only lookup.
	c.lock.RLock()
	if ent, exists := c.items[key]; exists {
		// Copy the value out while still holding the read lock.
		value = ent.value
		c.lock.RUnlock()
		// Queue the recency update for batch processing instead of
		// mutating the list inline.
		c.queueUpdate(ent)
		return value, true
	}
	c.lock.RUnlock()
	return value, false
}
// queueUpdate records ent as recently used by appending it to the pending
// batch. Once the batch reaches batchSize, a non-blocking send wakes the
// background processor; a signal already in flight is sufficient, so a
// full channel is simply skipped.
func (c *LRU[K, V]) queueUpdate(ent *entry[K, V]) {
	c.batchLock.Lock()
	defer c.batchLock.Unlock()

	c.updateBatch = append(c.updateBatch, ent)
	if len(c.updateBatch) < c.batchSize {
		return
	}
	// Batch is full: nudge the worker without blocking.
	select {
	case c.updateSignal <- struct{}{}:
	default:
	}
}
// processBatchUpdates is the background worker loop: it blocks until either
// the cache is closed or a full batch is signalled, then applies the queued
// recency updates in one pass.
//
// NOTE(review): a partially filled batch is only applied on the next
// full-batch signal, so queued updates can sit indefinitely on a
// read-quiet cache — confirm this staleness is acceptable for eviction
// accuracy.
func (c *LRU[K, V]) processBatchUpdates() {
	for {
		select {
		case <-c.done:
			// Close was called; exit the goroutine.
			return
		case <-c.updateSignal:
			c.processBatch()
		}
	}
}
// processBatch swaps out the pending update queue under batchLock, then
// replays each queued entry to the front of the eviction list under the
// main lock, so readers never block on list maintenance.
func (c *LRU[K, V]) processBatch() {
	c.batchLock.Lock()
	batch := c.updateBatch
	// Replace the slice rather than reusing it so readers can keep
	// appending while this batch is applied.
	c.updateBatch = make([]*entry[K, V], 0, c.batchSize)
	c.batchLock.Unlock()
	if len(batch) == 0 {
		return
	}
	c.lock.Lock()
	for _, ent := range batch {
		// NOTE(review): ent may have been evicted or removed after it was
		// queued; moveToFront on a detached entry must be safe in lruList
		// — confirm against the list implementation.
		c.evictList.moveToFront(ent)
	}
	c.lock.Unlock()
}
// Add adds a value to the cache
func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
c.lock.Lock()
defer c.lock.Unlock()
// Check for existing item
if ent, ok := c.items[key]; ok {
c.evictList.moveToFront(ent)
c.evictListLock.Unlock()
ent.value = value
return false
}
c.itemsLock.RUnlock()
// Add new item
c.evictListLock.Lock()
ent := c.evictList.pushFront(key, value)
c.evictListLock.Unlock()
c.itemsLock.Lock()
c.items[key] = ent
c.itemsLock.Unlock()
c.evictListLock.RLock()
evict := c.evictList.length() > c.size
c.evictListLock.RUnlock()
// Verify size not exceeded
if evict {
c.removeOldest()
// Remove oldest if needed
if c.evictList.length() > c.size {
oldest := c.evictList.back()
if oldest != nil {
c.removeElement(oldest)
return true
}
}
return evict
return false
}
// Get looks up a key's value from the cache.
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
c.itemsLock.RLock()
if ent, ok := c.items[key]; ok {
c.itemsLock.RUnlock()
// Make this get function non-blocking for multiple readers.
c.getChan <- ent
return ent.value, true
}
c.itemsLock.RUnlock()
return
}
// Len returns the number of items in the cache.
func (c *LRU[K, V]) Len() int {
c.evictListLock.RLock()
defer c.evictListLock.RUnlock()
return c.evictList.length()
}
// Resize changes the cache size.
func (c *LRU[K, V]) Resize(size int) (evicted int) {
diff := c.Len() - size
if diff < 0 {
diff = 0
}
for i := 0; i < diff; i++ {
c.removeOldest()
}
c.size = size
return diff
}
// removeOldest removes the oldest item from the cache.
func (c *LRU[K, V]) removeOldest() {
c.evictListLock.RLock()
if ent := c.evictList.back(); ent != nil {
c.evictListLock.RUnlock()
c.removeElement(ent)
return
}
c.evictListLock.RUnlock()
}
// removeElement removes an element from both the eviction list and the
// items map, invoking the eviction callback if one is set.
//
// The caller must hold c.lock; the per-field lock calls left over from the
// removed implementation are dropped — those fields no longer exist, and
// taking locks here would deadlock since Add and Resize call this while
// already holding the main lock.
//
// NOTE(review): onEvict runs while c.lock is held — a callback that calls
// back into this cache will deadlock. Confirm callers' callbacks are
// lock-free.
func (c *LRU[K, V]) removeElement(e *entry[K, V]) {
	c.evictList.remove(e)
	delete(c.items, e.key)
	if c.onEvict != nil {
		c.onEvict(e.key, e.value)
	}
}
func (c *LRU[K, V]) handleGetRequests() {
for {
entry := <-c.getChan
c.evictListLock.Lock()
c.evictList.moveToFront(entry)
c.evictListLock.Unlock()
}
// Len reports how many entries the cache currently holds.
func (c *LRU[K, V]) Len() int {
	c.lock.RLock()
	n := c.evictList.length()
	c.lock.RUnlock()
	return n
}
// Resize changes the maximum number of entries the cache may hold,
// evicting the oldest entries if the cache currently exceeds the new size.
// It returns the number of entries actually evicted.
//
// Fix: the previous version returned the precomputed length-size
// difference even when evictList.back() came up nil and nothing was
// removed (and returned a bogus positive count for non-positive sizes);
// count the removals that actually happen instead.
func (c *LRU[K, V]) Resize(size int) (evicted int) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.size = size
	// Evict from the back (least recently used) until we fit.
	for c.evictList.length() > size {
		ent := c.evictList.back()
		if ent == nil {
			break
		}
		c.removeElement(ent)
		evicted++
	}
	return evicted
}
// Close stops the background batch-update goroutine and flushes any queued
// recency updates so they are not silently dropped.
//
// Fix: the previous version only closed done, abandoning whatever sat in
// updateBatch. Draining here is safe even if the worker is mid-batch:
// processBatch swaps the queue under batchLock before touching the list.
//
// Close must be called exactly once; a second call panics on the already
// closed channel.
func (c *LRU[K, V]) Close() {
	close(c.done)
	// Apply whatever the worker did not get to before it observed done.
	c.processBatch()
}

View File

@@ -5,6 +5,7 @@ package nonblocking
import (
"context"
"fmt"
"testing"
"time"
)
@@ -113,3 +114,129 @@ func TestLRU_Resize(t *testing.T) {
t.Errorf("Cache should have contained 2 elements")
}
}
// BenchmarkLRU_Add measures Add throughput (including evictions once the
// cache fills) across several cache sizes.
func BenchmarkLRU_Add(b *testing.B) {
	sizes := []int{100, 1000, 10000}
	for _, size := range sizes {
		b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
			cache, err := NewLRU[int, int](size, nil)
			if err != nil {
				b.Fatal(err)
			}
			// Stop the background goroutine so it doesn't leak into
			// later sub-benchmarks.
			defer cache.Close()
			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				cache.Add(i, i)
			}
		})
	}
}
// BenchmarkLRU_Get measures read throughput on a pre-populated cache for
// several cache sizes; every lookup is a hit.
func BenchmarkLRU_Get(b *testing.B) {
	sizes := []int{100, 1000, 10000}
	for _, size := range sizes {
		b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
			cache, err := NewLRU[int, int](size, nil)
			if err != nil {
				b.Fatal(err)
			}
			defer cache.Close() // don't leak the background goroutine
			// Pre-populate the cache so every Get hits.
			for i := 0; i < size; i++ {
				cache.Add(i, i)
			}
			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				cache.Get(i % size)
			}
		})
	}
}
// BenchmarkLRU_AddWithEviction measures Add when the cache starts full, so
// every insert evicts the oldest entry.
func BenchmarkLRU_AddWithEviction(b *testing.B) {
	sizes := []int{100, 1000, 10000}
	for _, size := range sizes {
		b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
			cache, err := NewLRU[int, int](size, nil)
			if err != nil {
				b.Fatal(err)
			}
			defer cache.Close() // don't leak the background goroutine
			// Fill the cache so the timed Adds all force evictions.
			for i := 0; i < size; i++ {
				cache.Add(i, i)
			}
			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				cache.Add(size+i, i)
			}
		})
	}
}
// BenchmarkLRU_MixedOperations measures an alternating 50/50 mix of Add
// and Get operations across several cache sizes.
func BenchmarkLRU_MixedOperations(b *testing.B) {
	sizes := []int{100, 1000, 10000}
	for _, size := range sizes {
		b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
			cache, err := NewLRU[int, int](size, nil)
			if err != nil {
				b.Fatal(err)
			}
			defer cache.Close() // don't leak the background goroutine
			// Pre-populate half the cache so Gets can hit.
			for i := 0; i < size/2; i++ {
				cache.Add(i, i)
			}
			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				if i%2 == 0 {
					cache.Add(i, i)
				} else {
					cache.Get(i % (size / 2))
				}
			}
		})
	}
}
// BenchmarkLRU_ParallelGet measures concurrent Get throughput on a
// pre-populated cache — the read fast path this cache is optimized for.
func BenchmarkLRU_ParallelGet(b *testing.B) {
	sizes := []int{100, 1000, 10000}
	for _, size := range sizes {
		b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
			cache, err := NewLRU[int, int](size, nil)
			if err != nil {
				b.Fatal(err)
			}
			defer cache.Close() // don't leak the background goroutine
			// Pre-populate the cache so every Get hits.
			for i := 0; i < size; i++ {
				cache.Add(i, i)
			}
			b.ReportAllocs()
			b.ResetTimer()
			b.RunParallel(func(pb *testing.PB) {
				i := 0
				for pb.Next() {
					cache.Get(i % size)
					i++
				}
			})
		})
	}
}
// BenchmarkLRU_ParallelAddGet measures concurrent mixed Add/Get traffic;
// each worker goroutine alternates between writes and reads.
func BenchmarkLRU_ParallelAddGet(b *testing.B) {
	sizes := []int{100, 1000, 10000}
	for _, size := range sizes {
		b.Run(fmt.Sprintf("size-%d", size), func(b *testing.B) {
			cache, err := NewLRU[int, int](size, nil)
			if err != nil {
				b.Fatal(err)
			}
			defer cache.Close() // don't leak the background goroutine
			// Pre-populate half the cache so Gets can hit.
			for i := 0; i < size/2; i++ {
				cache.Add(i, i)
			}
			b.ReportAllocs()
			b.ResetTimer()
			b.RunParallel(func(pb *testing.PB) {
				i := 0
				for pb.Next() {
					if i%2 == 0 {
						cache.Add(i, i)
					} else {
						cache.Get(i % (size / 2))
					}
					i++
				}
			})
		})
	}
}