Fix committee test race (#12338)

* fix races in committee cache tests

* lint

* gratuitous defer ClearCache if ClearCache

* log warning to avoid failed block processing

* gaz

* add Clear to cache stubs

* fix Clear mistakes

* last fake cache fix

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
kasey
2023-04-26 19:50:04 -05:00
committed by GitHub
parent 34b6c8f333
commit f962d13407
16 changed files with 116 additions and 35 deletions

View File

@@ -47,6 +47,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_client_go//tools/cache:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],

View File

@@ -42,9 +42,16 @@ type BalanceCache struct {
// NewEffectiveBalanceCache creates a new effective balance cache for storing/accessing total balance by epoch.
func NewEffectiveBalanceCache() *BalanceCache {
return &BalanceCache{
cache: lruwrpr.New(maxBalanceCacheSize),
}
c := &BalanceCache{}
c.Clear()
return c
}
// Clear resets the SyncCommitteeCache to its initial state
func (c *BalanceCache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
c.cache = lruwrpr.New(maxBalanceCacheSize)
}
// AddTotalEffectiveBalance adds a new total effective balance entry for current balance for state `st` into the cache.

View File

@@ -3,16 +3,11 @@
package cache
import (
"sync"
lru "github.com/hashicorp/golang-lru"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
)
// FakeBalanceCache is a fake struct with 1 LRU cache for looking up balance by epoch.
type FakeBalanceCache struct {
cache *lru.Cache
lock sync.RWMutex
}
// NewEffectiveBalanceCache creates a new effective balance cache for storing/accessing total balance by epoch.
@@ -29,3 +24,8 @@ func (c *FakeBalanceCache) AddTotalEffectiveBalance(st state.ReadOnlyBeaconState
func (c *FakeBalanceCache) Get(st state.ReadOnlyBeaconState) (uint64, error) {
return 0, nil
}
// Clear is a stub.
func (c *FakeBalanceCache) Clear() {
return
}

View File

@@ -56,10 +56,17 @@ func committeeKeyFn(obj interface{}) (string, error) {
// NewCommitteesCache creates a new committee cache for storing/accessing shuffled indices of a committee.
func NewCommitteesCache() *CommitteeCache {
return &CommitteeCache{
CommitteeCache: lruwrpr.New(maxCommitteesCacheSize),
inProgress: make(map[string]bool),
}
cc := &CommitteeCache{}
cc.Clear()
return cc
}
// Clear resets the CommitteeCache to its initial state
func (c *CommitteeCache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
c.CommitteeCache = lruwrpr.New(maxCommitteesCacheSize)
c.inProgress = make(map[string]bool)
}
// Committee fetches the shuffled indices by slot and committee index. Every list of indices

View File

@@ -69,3 +69,8 @@ func (c *FakeCommitteeCache) MarkInProgress(seed [32]byte) error {
func (c *FakeCommitteeCache) MarkNotInProgress(seed [32]byte) error {
return nil
}
// Clear is a stub.
func (c *FakeCommitteeCache) Clear() {
return
}

View File

@@ -46,9 +46,16 @@ func proposerIndicesKeyFn(obj interface{}) (string, error) {
// NewProposerIndicesCache creates a new proposer indices cache for storing/accessing proposer index assignments of an epoch.
func NewProposerIndicesCache() *ProposerIndicesCache {
return &ProposerIndicesCache{
proposerIndicesCache: cache.NewFIFO(proposerIndicesKeyFn),
}
c := &ProposerIndicesCache{}
c.Clear()
return c
}
// Clear resets the ProposerIndicesCache to its initial state
func (c *ProposerIndicesCache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
c.proposerIndicesCache = cache.NewFIFO(proposerIndicesKeyFn)
}
// AddProposerIndices adds ProposerIndices object to the cache.

View File

@@ -33,3 +33,7 @@ func (c *FakeProposerIndicesCache) HasProposerIndices(r [32]byte) (bool, error)
func (c *FakeProposerIndicesCache) Len() int {
return 0
}
// Clear is a stub.
func (c *FakeProposerIndicesCache) Clear() {
}

View File

@@ -4,12 +4,14 @@ package cache
import (
"sync"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
)
@@ -31,8 +33,9 @@ var (
// SyncCommitteeCache utilizes a FIFO cache to sufficiently cache validator position within sync committee.
// It is thread safe with concurrent read write.
type SyncCommitteeCache struct {
cache *cache.FIFO
lock sync.RWMutex
cache *cache.FIFO
lock sync.RWMutex
cleared *atomic.Uint64
}
// Index position of all validators in sync committee where `currentSyncCommitteeRoot` is the
@@ -51,9 +54,17 @@ type positionInCommittee struct {
// NewSyncCommittee initializes and returns a new SyncCommitteeCache.
func NewSyncCommittee() *SyncCommitteeCache {
return &SyncCommitteeCache{
cache: cache.NewFIFO(keyFn),
}
c := &SyncCommitteeCache{cleared: &atomic.Uint64{}}
c.Clear()
return c
}
// Clear resets the SyncCommitteeCache to its initial state
func (s *SyncCommitteeCache) Clear() {
s.lock.Lock()
defer s.lock.Unlock()
s.cleared.Add(1)
s.cache = cache.NewFIFO(keyFn)
}
// CurrentPeriodIndexPosition returns current period index position of a validator index with respect with
@@ -123,6 +134,10 @@ func (s *SyncCommitteeCache) idxPositionInCommittee(
// current epoch and next epoch. This should be called when `current_sync_committee` and `next_sync_committee`
// change and that happens every `EPOCHS_PER_SYNC_COMMITTEE_PERIOD`.
func (s *SyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoot [32]byte, st state.BeaconState) error {
// since we call UpdatePositionsInCommittee asynchronously, keep track of the cache value
// seen at the beginning of the routine and compare at the end before updating. If the underlying value has been
// cycled (new address), don't update it.
clearCount := s.cleared.Load()
csc, err := st.CurrentSyncCommittee()
if err != nil {
return err
@@ -162,6 +177,10 @@ func (s *SyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoo
s.lock.Lock()
defer s.lock.Unlock()
if clearCount != s.cleared.Load() {
log.Warn("cache rotated during async committee update operation - abandoning cache update")
return nil
}
if err := s.cache.Add(&syncCommitteeIndexPosition{
currentSyncCommitteeRoot: syncCommitteeBoundaryRoot,

View File

@@ -30,3 +30,8 @@ func (s *FakeSyncCommitteeCache) NextPeriodIndexPosition(root [32]byte, valIdx p
func (s *FakeSyncCommitteeCache) UpdatePositionsInCommittee(syncCommitteeBoundaryRoot [32]byte, state state.BeaconState) error {
return nil
}
// Clear -- fake.
func (s *FakeSyncCommitteeCache) Clear() {
return
}