Threadsafe LRU With Non-Blocking Reads for Concurrent Readers (#12476)

* add nonblocking simple lru

* method

* add in missing tests, fix panic
This commit is contained in:
Raul Jordan
2023-07-12 05:57:52 -04:00
committed by GitHub
parent 0266609bf6
commit 402799a584
4 changed files with 390 additions and 0 deletions

17
cache/nonblocking/BUILD.bazel vendored Normal file
View File

@@ -0,0 +1,17 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"list.go",
"lru.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/cache/nonblocking",
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["lru_test.go"],
embed = [":go_default_library"],
)

123
cache/nonblocking/list.go vendored Normal file
View File

@@ -0,0 +1,123 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE_list file.
package nonblocking
// entry is an LRU entry
type entry[K comparable, V any] struct {
// Next and previous pointers in the doubly-linked list of elements.
// To simplify the implementation, internally a list l is implemented
// as a ring, such that &l.root is both the next element of the last
// list element (l.Back()) and the previous element of the first list
// element (l.Front()).
next, prev *entry[K, V]
// The list to which this element belongs.
list *lruList[K, V]
// The LRU key of this element.
key K
// The value stored with this element.
value V
}
// lruList represents a doubly linked list.
// The zero value for lruList is an empty list ready to use.
type lruList[K comparable, V any] struct {
root entry[K, V] // sentinel list element, only &root, root.prev, and root.next are used
len int // current list length excluding (this) sentinel element
}
// init initializes or clears list l.
func (l *lruList[K, V]) init() *lruList[K, V] {
l.root.next = &l.root
l.root.prev = &l.root
l.len = 0
return l
}
// newList returns an initialized list.
func newList[K comparable, V any]() *lruList[K, V] { return new(lruList[K, V]).init() }
// length returns the number of elements of list l.
// The complexity is O(1).
func (l *lruList[K, V]) length() int { return l.len }
// back returns the last element of list l or nil if the list is empty.
func (l *lruList[K, V]) back() *entry[K, V] {
if l.len == 0 {
return nil
}
return l.root.prev
}
// lazyInit lazily initializes a zero List value.
func (l *lruList[K, V]) lazyInit() {
if l.root.next == nil {
l.init()
}
}
// insert inserts e after at, increments l.len, and returns e.
func (l *lruList[K, V]) insert(e, at *entry[K, V]) *entry[K, V] {
e.prev = at
e.next = at.next
e.prev.next = e
e.next.prev = e
e.list = l
l.len++
return e
}
// insertValue is a convenience wrapper for insert(&Element{Value: v}, at).
func (l *lruList[K, V]) insertValue(k K, v V, at *entry[K, V]) *entry[K, V] {
return l.insert(&entry[K, V]{value: v, key: k}, at)
}
// remove removes e from its list, decrements l.len
func (l *lruList[K, V]) remove(e *entry[K, V]) V {
// If already removed, do nothing.
if e.prev == nil && e.next == nil {
return e.value
}
e.prev.next = e.next
e.next.prev = e.prev
e.next = nil // avoid memory leaks
e.prev = nil // avoid memory leaks
e.list = nil
l.len--
return e.value
}
// move moves e to next to at.
func (*lruList[K, V]) move(e, at *entry[K, V]) {
if e == at {
return
}
e.prev.next = e.next
e.next.prev = e.prev
e.prev = at
e.next = at.next
e.prev.next = e
e.next.prev = e
}
// pushFront inserts a new element e with value v at the front of list l and returns e.
func (l *lruList[K, V]) pushFront(k K, v V) *entry[K, V] {
l.lazyInit()
return l.insertValue(k, v, &l.root)
}
// moveToFront moves element e to the front of list l.
// If e is not an element of l, the list is not modified.
// The element must not be nil.
func (l *lruList[K, V]) moveToFront(e *entry[K, V]) {
if e.list != l || l.root.next == e {
return
}
// see comment in List.Remove about initialization of l
l.move(e, &l.root)
}

135
cache/nonblocking/lru.go vendored Normal file
View File

@@ -0,0 +1,135 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package nonblocking
import (
"errors"
"sync"
)
// EvictCallback is used to get a callback when a cache entry is evicted
type EvictCallback[K comparable, V any] func(key K, value V)
// LRU implements a non-thread safe fixed size LRU cache
type LRU[K comparable, V any] struct {
itemsLock sync.RWMutex
evictListLock sync.RWMutex
size int
evictList *lruList[K, V]
items map[K]*entry[K, V]
onEvict EvictCallback[K, V]
}
// NewLRU constructs an LRU of the given size
func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V]) (*LRU[K, V], error) {
if size <= 0 {
return nil, errors.New("must provide a positive size")
}
c := &LRU[K, V]{
size: size,
evictList: newList[K, V](),
items: make(map[K]*entry[K, V]),
onEvict: onEvict,
}
return c, nil
}
// Add adds a value to the cache. Returns true if an eviction occurred.
func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
// Check for existing item
c.itemsLock.RLock()
if ent, ok := c.items[key]; ok {
c.itemsLock.RUnlock()
c.evictListLock.Lock()
c.evictList.moveToFront(ent)
c.evictListLock.Unlock()
ent.value = value
return false
}
c.itemsLock.RUnlock()
// Add new item
c.evictListLock.Lock()
ent := c.evictList.pushFront(key, value)
c.evictListLock.Unlock()
c.itemsLock.Lock()
c.items[key] = ent
c.itemsLock.Unlock()
c.evictListLock.RLock()
evict := c.evictList.length() > c.size
c.evictListLock.RUnlock()
// Verify size not exceeded
if evict {
c.removeOldest()
}
return evict
}
// Get looks up a key's value from the cache.
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
c.itemsLock.RLock()
if ent, ok := c.items[key]; ok {
c.itemsLock.RUnlock()
// Make this get function non-blocking for multiple readers.
go func() {
c.evictListLock.Lock()
c.evictList.moveToFront(ent)
c.evictListLock.Unlock()
}()
return ent.value, true
}
c.itemsLock.RUnlock()
return
}
// Len returns the number of items in the cache.
func (c *LRU[K, V]) Len() int {
c.evictListLock.RLock()
defer c.evictListLock.RUnlock()
return c.evictList.length()
}
// Resize changes the cache size.
func (c *LRU[K, V]) Resize(size int) (evicted int) {
diff := c.Len() - size
if diff < 0 {
diff = 0
}
for i := 0; i < diff; i++ {
c.removeOldest()
}
c.size = size
return diff
}
// removeOldest removes the oldest item from the cache.
func (c *LRU[K, V]) removeOldest() {
c.evictListLock.RLock()
if ent := c.evictList.back(); ent != nil {
c.evictListLock.RUnlock()
c.removeElement(ent)
return
}
c.evictListLock.RUnlock()
}
// removeElement is used to remove a given list element from the cache
func (c *LRU[K, V]) removeElement(e *entry[K, V]) {
c.evictListLock.Lock()
c.evictList.remove(e)
c.evictListLock.Unlock()
c.itemsLock.Lock()
delete(c.items, e.key)
c.itemsLock.Unlock()
if c.onEvict != nil {
c.onEvict(e.key, e.value)
}
}

115
cache/nonblocking/lru_test.go vendored Normal file
View File

@@ -0,0 +1,115 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package nonblocking
import (
"context"
"testing"
"time"
)
func TestLRU_Concurrency(t *testing.T) {
onEvicted := func(_ int, _ int) {}
size := 20
cache, err := NewLRU(size, onEvicted)
if err != nil {
t.Fatalf("err: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
for i := 0; i < 100; i++ {
go func(j int) {
for {
if ctx.Err() != nil {
return
}
cache.Add(j, j)
cache.Get(j)
time.Sleep(time.Millisecond * 50)
}
}(i)
}
<-ctx.Done()
}
func TestLRU_Eviction(t *testing.T) {
evictCounter := 0
onEvicted := func(_ int, _ int) {
evictCounter++
}
size := 20
cache, err := NewLRU(size, onEvicted)
if err != nil {
t.Fatalf("err: %v", err)
}
for i := 0; i < 20; i++ {
cache.Add(i, i)
cache.Get(i)
}
cache.Add(20, 20)
if evictCounter != 1 {
t.Fatalf("should have evicted 1 element: %d", evictCounter)
}
}
// Test that Add returns true/false if an eviction occurred
func TestLRU_Add(t *testing.T) {
evictCounter := 0
onEvicted := func(_ int, _ int) {
evictCounter++
}
l, err := NewLRU(1, onEvicted)
if err != nil {
t.Fatalf("err: %v", err)
}
if l.Add(1, 1) == true || evictCounter != 0 {
t.Errorf("should not have an eviction")
}
if l.Add(2, 2) == false || evictCounter != 1 {
t.Errorf("should have an eviction")
}
}
// Test that Resize can upsize and downsize
func TestLRU_Resize(t *testing.T) {
onEvictCounter := 0
onEvicted := func(k int, v int) {
onEvictCounter++
}
l, err := NewLRU(2, onEvicted)
if err != nil {
t.Fatalf("err: %v", err)
}
// Downsize
l.Add(1, 1)
l.Add(2, 2)
evicted := l.Resize(1)
if evicted != 1 {
t.Errorf("1 element should have been evicted: %v", evicted)
}
if onEvictCounter != 1 {
t.Errorf("onEvicted should have been called 1 time: %v", onEvictCounter)
}
l.Add(3, 3)
if _, ok := l.Get(1); ok {
t.Errorf("Element 1 should have been evicted")
}
// Upsize
evicted = l.Resize(2)
if evicted != 0 {
t.Errorf("0 elements should have been evicted: %v", evicted)
}
l.Add(4, 4)
if _, ok := l.Get(3); !ok {
t.Errorf("Cache should have contained 2 elements")
}
if _, ok := l.Get(4); !ok {
t.Errorf("Cache should have contained 2 elements")
}
}