use different keys for the proposer indices cache (#13272)

* use different keys for the proposer indices cache

* Add a way to get the proposer indices from a checkpoint

* fix fuzzing tests

* use htr instead of body root

* move comment
This commit is contained in:
Potuz
2023-12-19 10:14:55 -03:00
committed by GitHub
parent e9d26c61d7
commit 9efaa832cd
8 changed files with 369 additions and 171 deletions

View File

@@ -7,15 +7,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"k8s.io/client-go/tools/cache"
)
var (
// maxProposerIndicesCacheSize defines the max number of proposer indices on per block root basis can cache.
// Due to reorgs and long finality, it's good to keep the old cache around for quickly switch over.
maxProposerIndicesCacheSize = uint64(8)
// ProposerIndicesCacheMiss tracks the number of proposerIndices requests that aren't present in the cache.
ProposerIndicesCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
Name: "proposer_indices_cache_miss",
@@ -28,85 +25,128 @@ var (
})
)
// ProposerIndicesCache is a struct with 1 queue for looking up proposer indices by root.
// ProposerIndicesCache keeps track of the proposer indices in the next two
// epochs. It is keyed by the state root of the last epoch before. That is, for
// blocks during epoch 2, for example slot 65, it will be keyed by the state
// root of slot 63 (last slot in epoch 1).
// The cache keeps two sets of indices computed, the "safe" set is computed
// right before the epoch transition into the current epoch. For example for
// epoch 2 we will compute this list after importing block 63. The "unsafe"
// version is computed an epoch in advance, for example for epoch 3, it will be
// computed after importing block 63.
//
// The cache also keeps a map from checkpoints to state roots so that one is
// able to access the proposer indices list from a checkpoint instead. The
// checkpoint is the checkpoint for the epoch previous to the requested
// proposer indices. That is, for a slot in epoch 2 (eg. 65), the checkpoint
// root would be for slot 32 if present.
type ProposerIndicesCache struct {
proposerIndicesCache *cache.FIFO
lock sync.RWMutex
sync.Mutex
indices map[primitives.Epoch]map[[32]byte][fieldparams.SlotsPerEpoch]primitives.ValidatorIndex
unsafeIndices map[primitives.Epoch]map[[32]byte][fieldparams.SlotsPerEpoch]primitives.ValidatorIndex
rootMap map[forkchoicetypes.Checkpoint][32]byte // A map from checkpoint root to state root
}
// proposerIndicesKeyFn takes the block root as the key to retrieve proposer indices in a given epoch.
func proposerIndicesKeyFn(obj interface{}) (string, error) {
info, ok := obj.(*ProposerIndices)
if !ok {
return "", ErrNotProposerIndices
}
return key(info.BlockRoot), nil
}
// NewProposerIndicesCache creates a new proposer indices cache for storing/accessing proposer index assignments of an epoch.
// NewProposerIndicesCache returns a newly created cache
func NewProposerIndicesCache() *ProposerIndicesCache {
c := &ProposerIndicesCache{}
c.Clear()
return c
}
// Clear resets the ProposerIndicesCache to its initial state
func (c *ProposerIndicesCache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
c.proposerIndicesCache = cache.NewFIFO(proposerIndicesKeyFn)
}
// AddProposerIndices adds ProposerIndices object to the cache.
// This method also trims the least recently list if the cache size has ready the max cache size limit.
func (c *ProposerIndicesCache) AddProposerIndices(p *ProposerIndices) error {
c.lock.Lock()
defer c.lock.Unlock()
if err := c.proposerIndicesCache.AddIfNotPresent(p); err != nil {
return err
return &ProposerIndicesCache{
indices: make(map[primitives.Epoch]map[[32]byte][fieldparams.SlotsPerEpoch]primitives.ValidatorIndex),
unsafeIndices: make(map[primitives.Epoch]map[[32]byte][fieldparams.SlotsPerEpoch]primitives.ValidatorIndex),
rootMap: make(map[forkchoicetypes.Checkpoint][32]byte),
}
trim(c.proposerIndicesCache, maxProposerIndicesCacheSize)
return nil
}
// HasProposerIndices returns the proposer indices of a block root seed.
func (c *ProposerIndicesCache) HasProposerIndices(r [32]byte) (bool, error) {
c.lock.RLock()
defer c.lock.RUnlock()
_, exists, err := c.proposerIndicesCache.GetByKey(key(r))
if err != nil {
return false, err
// ProposerIndices returns the proposer indices (safe) for the given root
func (p *ProposerIndicesCache) ProposerIndices(epoch primitives.Epoch, root [32]byte) ([fieldparams.SlotsPerEpoch]primitives.ValidatorIndex, bool) {
p.Lock()
defer p.Unlock()
inner, ok := p.indices[epoch]
if !ok {
ProposerIndicesCacheMiss.Inc()
return [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}, false
}
return exists, nil
}
// ProposerIndices returns the proposer indices of a block root seed.
func (c *ProposerIndicesCache) ProposerIndices(r [32]byte) ([]primitives.ValidatorIndex, error) {
c.lock.RLock()
defer c.lock.RUnlock()
obj, exists, err := c.proposerIndicesCache.GetByKey(key(r))
if err != nil {
return nil, err
}
indices, exists := inner[root]
if exists {
ProposerIndicesCacheHit.Inc()
} else {
ProposerIndicesCacheMiss.Inc()
return nil, nil
}
return indices, exists
}
item, ok := obj.(*ProposerIndices)
// UnsafeProposerIndices returns the proposer indices (unsafe) for the given root
func (p *ProposerIndicesCache) UnsafeProposerIndices(epoch primitives.Epoch, root [32]byte) ([fieldparams.SlotsPerEpoch]primitives.ValidatorIndex, bool) {
p.Lock()
defer p.Unlock()
inner, ok := p.unsafeIndices[epoch]
if !ok {
return nil, ErrNotProposerIndices
return [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}, false
}
return item.ProposerIndices, nil
indices, exists := inner[root]
return indices, exists
}
// Len returns the number of keys in the underlying cache.
func (c *ProposerIndicesCache) Len() int {
return len(c.proposerIndicesCache.ListKeys())
// Prune resets the ProposerIndicesCache to its initial state
func (p *ProposerIndicesCache) Prune(epoch primitives.Epoch) {
p.Lock()
defer p.Unlock()
for key := range p.indices {
if key < epoch {
delete(p.indices, key)
}
}
for key := range p.unsafeIndices {
if key < epoch {
delete(p.unsafeIndices, key)
}
}
for key := range p.rootMap {
if key.Epoch+1 < epoch {
delete(p.rootMap, key)
}
}
}
// Set sets the proposer indices for the given root as key
func (p *ProposerIndicesCache) Set(epoch primitives.Epoch, root [32]byte, indices [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex) {
p.Lock()
defer p.Unlock()
inner, ok := p.indices[epoch]
if !ok {
inner = make(map[[32]byte][fieldparams.SlotsPerEpoch]primitives.ValidatorIndex)
p.indices[epoch] = inner
}
inner[root] = indices
}
// SetUnsafe sets the unsafe proposer indices for the given root as key
func (p *ProposerIndicesCache) SetUnsafe(epoch primitives.Epoch, root [32]byte, indices [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex) {
p.Lock()
defer p.Unlock()
inner, ok := p.unsafeIndices[epoch]
if !ok {
inner = make(map[[32]byte][fieldparams.SlotsPerEpoch]primitives.ValidatorIndex)
p.unsafeIndices[epoch] = inner
}
inner[root] = indices
}
// SetCheckpoint updates the map from checkpoints to state roots
func (p *ProposerIndicesCache) SetCheckpoint(c forkchoicetypes.Checkpoint, root [32]byte) {
p.Lock()
defer p.Unlock()
p.rootMap[c] = root
}
// IndicesFromCheckpoint returns the proposer indices from a checkpoint rather than the state root
func (p *ProposerIndicesCache) IndicesFromCheckpoint(c forkchoicetypes.Checkpoint) ([fieldparams.SlotsPerEpoch]primitives.ValidatorIndex, bool) {
p.Lock()
emptyIndices := [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}
root, ok := p.rootMap[c]
p.Unlock()
if !ok {
return emptyIndices, ok
}
return p.ProposerIndices(c.Epoch+1, root)
}

View File

@@ -3,7 +3,11 @@
// This file is used in fuzzer builds to bypass proposer indices caches.
package cache
import "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
import (
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
)
// FakeProposerIndicesCache is a struct with 1 queue for looking up proposer indices by root.
type FakeProposerIndicesCache struct {
@@ -14,26 +18,31 @@ func NewProposerIndicesCache() *FakeProposerIndicesCache {
return &FakeProposerIndicesCache{}
}
// AddProposerIndices adds ProposerIndices object to the cache.
// This method also trims the least recently list if the cache size has ready the max cache size limit.
func (c *FakeProposerIndicesCache) AddProposerIndices(p *ProposerIndices) error {
return nil
// ProposerIndices is a stub.
func (c *FakeProposerIndicesCache) ProposerIndices(_ primitives.Epoch, _ [32]byte) ([fieldparams.SlotsPerEpoch]primitives.ValidatorIndex, bool) {
return [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}, false
}
// ProposerIndices returns the proposer indices of a block root seed.
func (c *FakeProposerIndicesCache) ProposerIndices(r [32]byte) ([]primitives.ValidatorIndex, error) {
return nil, nil
// UnsafeProposerIndices is a stub.
func (c *FakeProposerIndicesCache) UnsafeProposerIndices(_ primitives.Epoch, _ [32]byte) ([fieldparams.SlotsPerEpoch]primitives.ValidatorIndex, bool) {
return [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}, false
}
// HasProposerIndices returns the proposer indices of a block root seed.
func (c *FakeProposerIndicesCache) HasProposerIndices(r [32]byte) (bool, error) {
return false, nil
// Prune is a stub.
func (p *FakeProposerIndicesCache) Prune(epoch primitives.Epoch) {}
// Set is a stub.
func (p *FakeProposerIndicesCache) Set(epoch primitives.Epoch, root [32]byte, indices [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex) {
}
func (c *FakeProposerIndicesCache) Len() int {
return 0
// SetUnsafe is a stub.
func (p *FakeProposerIndicesCache) SetUnsafe(epoch primitives.Epoch, root [32]byte, indices [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex) {
}
// Clear is a stub.
func (c *FakeProposerIndicesCache) Clear() {
// SetCheckpoint is a stub.
func (p *FakeProposerIndicesCache) SetCheckpoint(c forkchoicetypes.Checkpoint, root [32]byte) {}
// IndicesFromCheckpoint is a stub.
func (p *FakeProposerIndicesCache) IndicesFromCheckpoint(_ forkchoicetypes.Checkpoint) ([fieldparams.SlotsPerEpoch]primitives.ValidatorIndex, bool) {
return [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}, false
}

View File

@@ -3,72 +3,152 @@
package cache
import (
"strconv"
"testing"
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)
func TestProposerKeyFn_OK(t *testing.T) {
item := &ProposerIndices{
BlockRoot: [32]byte{'A'},
ProposerIndices: []primitives.ValidatorIndex{1, 2, 3, 4, 5},
}
k, err := proposerIndicesKeyFn(item)
require.NoError(t, err)
assert.Equal(t, key(item.BlockRoot), k)
}
func TestProposerKeyFn_InvalidObj(t *testing.T) {
_, err := proposerIndicesKeyFn("bad")
assert.Equal(t, ErrNotProposerIndices, err)
}
func TestProposerCache_AddProposerIndicesList(t *testing.T) {
func TestProposerCache_Set(t *testing.T) {
cache := NewProposerIndicesCache()
bRoot := [32]byte{'A'}
indices, err := cache.ProposerIndices(bRoot)
require.NoError(t, err)
if indices != nil {
t.Error("Expected committee count not to exist in empty cache")
}
has, err := cache.HasProposerIndices(bRoot)
require.NoError(t, err)
assert.Equal(t, false, has)
require.NoError(t, cache.AddProposerIndices(&ProposerIndices{
ProposerIndices: indices,
BlockRoot: bRoot,
}))
indices, ok := cache.ProposerIndices(0, bRoot)
require.Equal(t, false, ok)
emptyIndices := [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}
require.Equal(t, indices, emptyIndices, "Expected committee count not to exist in empty cache")
emptyIndices[0] = 1
cache.Set(0, bRoot, emptyIndices)
received, err := cache.ProposerIndices(bRoot)
require.NoError(t, err)
assert.DeepEqual(t, received, indices)
has, err = cache.HasProposerIndices(bRoot)
require.NoError(t, err)
assert.Equal(t, true, has)
received, ok := cache.ProposerIndices(0, bRoot)
require.Equal(t, true, ok)
require.Equal(t, received, emptyIndices)
item := &ProposerIndices{BlockRoot: [32]byte{'B'}, ProposerIndices: []primitives.ValidatorIndex{1, 2, 3, 4, 5, 6}}
require.NoError(t, cache.AddProposerIndices(item))
received, err = cache.ProposerIndices(item.BlockRoot)
require.NoError(t, err)
assert.DeepEqual(t, item.ProposerIndices, received)
has, err = cache.HasProposerIndices(bRoot)
require.NoError(t, err)
assert.Equal(t, true, has)
newRoot := [32]byte{'B'}
copy(emptyIndices[3:], []primitives.ValidatorIndex{1, 2, 3, 4, 5, 6})
cache.Set(0, newRoot, emptyIndices)
received, ok = cache.ProposerIndices(0, newRoot)
require.Equal(t, true, ok)
require.Equal(t, emptyIndices, received)
}
func TestProposerCache_CanRotate(t *testing.T) {
func TestProposerCache_SetUnsafe(t *testing.T) {
cache := NewProposerIndicesCache()
for i := 0; i < int(maxProposerIndicesCacheSize)+1; i++ {
s := []byte(strconv.Itoa(i))
item := &ProposerIndices{BlockRoot: bytesutil.ToBytes32(s)}
require.NoError(t, cache.AddProposerIndices(item))
}
assert.Equal(t, int(maxProposerIndicesCacheSize), cache.Len())
bRoot := [32]byte{'A'}
indices, ok := cache.UnsafeProposerIndices(0, bRoot)
require.Equal(t, false, ok)
emptyIndices := [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}
require.Equal(t, indices, emptyIndices, "Expected committee count not to exist in empty cache")
emptyIndices[0] = 1
cache.SetUnsafe(0, bRoot, emptyIndices)
received, ok := cache.UnsafeProposerIndices(0, bRoot)
require.Equal(t, true, ok)
require.Equal(t, received, emptyIndices)
newRoot := [32]byte{'B'}
copy(emptyIndices[3:], []primitives.ValidatorIndex{1, 2, 3, 4, 5, 6})
cache.SetUnsafe(0, newRoot, emptyIndices)
received, ok = cache.UnsafeProposerIndices(0, newRoot)
require.Equal(t, true, ok)
require.Equal(t, emptyIndices, received)
}
func TestProposerCache_CheckpointAndPrune(t *testing.T) {
cache := NewProposerIndicesCache()
indices := [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}
root := [32]byte{'a'}
cpRoot := [32]byte{'b'}
copy(indices[3:], []primitives.ValidatorIndex{1, 2, 3, 4, 5, 6})
for i := 1; i < 10; i++ {
cache.Set(primitives.Epoch(i), root, indices)
cache.SetUnsafe(primitives.Epoch(i), root, indices)
cache.SetCheckpoint(forkchoicetypes.Checkpoint{Epoch: primitives.Epoch(i - 1), Root: cpRoot}, root)
}
received, ok := cache.ProposerIndices(1, root)
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.ProposerIndices(4, root)
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.ProposerIndices(9, root)
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.UnsafeProposerIndices(1, root)
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.UnsafeProposerIndices(4, root)
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.UnsafeProposerIndices(9, root)
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 0, Root: cpRoot})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 3, Root: cpRoot})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 4, Root: cpRoot})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 8, Root: cpRoot})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
cache.Prune(5)
emptyIndices := [fieldparams.SlotsPerEpoch]primitives.ValidatorIndex{}
received, ok = cache.ProposerIndices(1, root)
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
received, ok = cache.ProposerIndices(4, root)
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
received, ok = cache.ProposerIndices(9, root)
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.UnsafeProposerIndices(1, root)
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
received, ok = cache.UnsafeProposerIndices(4, root)
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
received, ok = cache.UnsafeProposerIndices(9, root)
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 0, Root: cpRoot})
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 3, Root: cpRoot})
require.Equal(t, false, ok)
require.Equal(t, emptyIndices, received)
received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 4, Root: cpRoot})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
received, ok = cache.IndicesFromCheckpoint(forkchoicetypes.Checkpoint{Epoch: 8, Root: cpRoot})
require.Equal(t, true, ok)
require.Equal(t, indices, received)
}