refactor next slot cache (#12233)

* refactor next slot cache

* fix test

* rename function

* fix spectests

* remove TODO comments

---------

Co-authored-by: Nishant Das <nishdas93@gmail.com>
This commit is contained in:
Potuz
2023-04-04 13:04:00 -03:00
committed by GitHub
parent 35e3eeddf9
commit fb65421678
5 changed files with 77 additions and 51 deletions

View File

@@ -671,9 +671,7 @@ func (s *Service) fillMissingPayloadIDRoutine(ctx context.Context, stateFeed *ev
for {
select {
case <-ticker.C():
if err := s.fillMissingBlockPayloadId(ctx); err != nil {
log.WithError(err).Error("Could not fill missing payload ID")
}
s.lateBlockTasks(ctx)
case <-ctx.Done():
log.Debug("Context closed, exiting routine")
@@ -683,11 +681,13 @@ func (s *Service) fillMissingPayloadIDRoutine(ctx context.Context, stateFeed *ev
}()
}
// fillMissingBlockPayloadId is called 4 seconds into the slot and calls FCU if we are proposing next slot
// and the cache has been missed
func (s *Service) fillMissingBlockPayloadId(ctx context.Context) error {
// lateBlockTasks is called 4 seconds into the slot and performs tasks
// related to late blocks. It emits a MissedSlot state feed event.
// It calls FCU and sets the right attributes if we are proposing the next slot.
// It also updates the next slot cache to deal with skipped slots.
func (s *Service) lateBlockTasks(ctx context.Context) {
if s.CurrentSlot() == s.HeadSlot() {
return nil
return
}
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.MissedSlot,
@@ -697,21 +697,31 @@ func (s *Service) fillMissingBlockPayloadId(ctx context.Context) error {
_, id, has := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(s.CurrentSlot()+1, [32]byte{} /* head root */)
// There exists a proposer for the next slot, but we haven't called FCU with a payload attribute yet.
if !has || id != [8]byte{} {
return nil
return
}
s.headLock.RLock()
headBlock, err := s.headBlock()
if err != nil {
s.headLock.RUnlock()
return err
log.WithError(err).Debug("could not perform late block tasks: failed to retrieve head block")
return
}
headState := s.headState(ctx)
headRoot := s.headRoot()
headState := s.headState(ctx)
s.headLock.RUnlock()
_, err = s.notifyForkchoiceUpdate(ctx, &notifyForkchoiceUpdateArg{
headState: headState,
headRoot: headRoot,
headBlock: headBlock.Block(),
})
return err
if err != nil {
log.WithError(err).Debug("could not perform late block tasks: failed to update forkchoice with engine")
}
lastRoot, lastState := transition.LastCachedState()
if lastState == nil {
lastRoot, lastState = headRoot[:], headState
}
if err = transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil {
log.WithError(err).Debug("could not update next slot state cache")
}
}

View File

@@ -2153,6 +2153,7 @@ func TestOnBlock_HandleBlockAttestations(t *testing.T) {
}
func TestFillMissingBlockPayloadId_DiffSlotExitEarly(t *testing.T) {
logHook := logTest.NewGlobal()
fc := doublylinkedtree.New()
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
@@ -2164,7 +2165,8 @@ func TestFillMissingBlockPayloadId_DiffSlotExitEarly(t *testing.T) {
service, err := NewService(ctx, opts...)
require.NoError(t, err)
require.NoError(t, service.fillMissingBlockPayloadId(ctx), 0)
service.lateBlockTasks(ctx)
require.LogsDoNotContain(t, logHook, "could not perform late block tasks")
}
// Helper function to simulate the block being on time or delayed for proposer

View File

@@ -5,15 +5,19 @@ import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
)
type nextSlotCache struct {
sync.RWMutex
root []byte
state state.BeaconState
sync.Mutex
prevRoot []byte
lastRoot []byte
prevState state.BeaconState
lastState state.BeaconState
}
var (
@@ -29,19 +33,22 @@ var (
})
)
// NextSlotState returns the saved state if the input root matches the root in `nextSlotCache`. Returns nil otherwise.
// This is useful to check before processing slots. With a cache hit, it will return last processed state with slot plus
// one advancement.
func NextSlotState(_ context.Context, root []byte) (state.BeaconState, error) {
nsc.RLock()
defer nsc.RUnlock()
if !bytes.Equal(root, nsc.root) || bytes.Equal(root, []byte{}) {
nextSlotCacheMiss.Inc()
return nil, nil
// NextSlotState returns the saved state for the given block root.
// It returns the last updated state if its root matches. Otherwise it returns the previously
// updated state if that root matches. If no root matches, it returns nil.
func NextSlotState(root []byte) state.BeaconState {
nsc.Lock()
defer nsc.Unlock()
if bytes.Equal(root, nsc.lastRoot) {
nextSlotCacheHit.Inc()
return nsc.lastState.Copy()
}
nextSlotCacheHit.Inc()
// Returning copied state.
return nsc.state.Copy(), nil
if bytes.Equal(root, nsc.prevRoot) {
nextSlotCacheHit.Inc()
return nsc.prevState.Copy()
}
nextSlotCacheMiss.Inc()
return nil
}
// UpdateNextSlotCache updates the `nextSlotCache`. It saves the input state after advancing the state slot by 1
@@ -52,13 +59,25 @@ func UpdateNextSlotCache(ctx context.Context, root []byte, state state.BeaconSta
copied := state.Copy()
copied, err := ProcessSlots(ctx, copied, copied.Slot()+1)
if err != nil {
return err
return errors.Wrap(err, "could not process slots")
}
nsc.Lock()
defer nsc.Unlock()
nsc.root = root
nsc.state = copied
nsc.prevRoot = nsc.lastRoot
nsc.prevState = nsc.lastState
nsc.lastRoot = bytesutil.SafeCopyBytes(root)
nsc.lastState = copied
return nil
}
// LastCachedState returns the last cached state and root in the cache
func LastCachedState() ([]byte, state.BeaconState) {
nsc.Lock()
defer nsc.Unlock()
if nsc.lastState == nil {
return nil, nil
}
return bytesutil.SafeCopyBytes(nsc.lastRoot), nsc.lastState.Copy()
}

View File

@@ -13,18 +13,23 @@ import (
func TestTrailingSlotState_RoundTrip(t *testing.T) {
ctx := context.Background()
r := []byte{'a'}
s, err := transition.NextSlotState(ctx, r)
require.NoError(t, err)
s := transition.NextSlotState(r)
require.Equal(t, nil, s)
s, _ = util.DeterministicGenesisState(t, 1)
require.NoError(t, transition.UpdateNextSlotCache(ctx, r, s))
s, err = transition.NextSlotState(ctx, r)
require.NoError(t, err)
s = transition.NextSlotState(r)
require.Equal(t, primitives.Slot(1), s.Slot())
lastRoot, lastState := transition.LastCachedState()
require.DeepEqual(t, r, lastRoot)
require.Equal(t, s.Slot(), lastState.Slot())
require.NoError(t, transition.UpdateNextSlotCache(ctx, r, s))
s, err = transition.NextSlotState(ctx, r)
require.NoError(t, err)
s = transition.NextSlotState(r)
require.Equal(t, primitives.Slot(2), s.Slot())
lastRoot, lastState = transition.LastCachedState()
require.DeepEqual(t, r, lastRoot)
require.Equal(t, s.Slot(), lastState.Slot())
}

View File

@@ -147,25 +147,15 @@ func ProcessSlotsUsingNextSlotCache(
ctx, span := trace.StartSpan(ctx, "core.state.ProcessSlotsUsingNextSlotCache")
defer span.End()
// Check whether the parent state has been advanced by 1 slot in next slot cache.
nextSlotState, err := NextSlotState(ctx, parentRoot)
if err != nil {
return nil, err
}
cachedStateExists := nextSlotState != nil && !nextSlotState.IsNil()
// If the next slot state is not nil (i.e. cache hit).
// We replace next slot state with parent state.
if cachedStateExists {
nextSlotState := NextSlotState(parentRoot)
if nextSlotState != nil {
parentState = nextSlotState
}
// In the event our cached state has advanced our
// state to the desired slot, we exit early.
if cachedStateExists && parentState.Slot() == slot {
if parentState.Slot() == slot {
return parentState, nil
}
// Since the next slot cache only advances the state by 1 slot,
// we check whether there are more slots that need to be processed.
var err error
parentState, err = ProcessSlots(ctx, parentState, slot)
if err != nil {
return nil, errors.Wrap(err, "could not process slots")