Revert "Revert "Add In Progress Checker For Checkpoint Cache"" (#7690)

This reverts commit 46c04b98d9.
This commit is contained in:
Raul Jordan
2020-10-31 10:59:50 -05:00
committed by GitHub
parent df762bbfee
commit b3155a04f5
4 changed files with 145 additions and 23 deletions

View File

@@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
@@ -18,7 +19,7 @@ import (
// getAttPreState retrieves the attestation pre-state for the given checkpoint, either from the cache or from the DB.
func (s *Service) getAttPreState(ctx context.Context, c *ethpb.Checkpoint) (*stateTrie.BeaconState, error) {
cachedState, err := s.checkpointStateCache.StateByCheckpoint(c)
cachedState, err := s.checkpointStateCache.StateByCheckpoint(ctx, c)
if err != nil {
return nil, errors.Wrap(err, "could not get cached checkpoint state")
}
@@ -26,17 +27,33 @@ func (s *Service) getAttPreState(ctx context.Context, c *ethpb.Checkpoint) (*sta
return cachedState, nil
}
if err := s.checkpointStateCache.MarkInProgress(c); err != nil {
if errors.Is(err, cache.ErrAlreadyInProgress) {
cachedState, err = s.checkpointStateCache.StateByCheckpoint(ctx, c)
if err != nil {
return nil, errors.Wrap(err, "could not get cached checkpoint state")
}
if cachedState != nil {
return cachedState, nil
}
} else {
return nil, err
}
}
defer func() {
if err := s.checkpointStateCache.MarkNotInProgress(c); err != nil {
log.WithError(err).Error("Failed to mark cache not in progress")
}
}()
baseState, err := s.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(c.Root))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for epoch %d", c.Epoch)
}
epochStartSlot, err := helpers.StartSlot(c.Epoch)
if err != nil {
return nil, err
}
if epochStartSlot > baseState.Slot() {
baseState = baseState.Copy()
baseState, err = state.ProcessSlots(ctx, baseState, epochStartSlot)
if err != nil {
return nil, errors.Wrapf(err, "could not process slots up to epoch %d", c.Epoch)
@@ -46,7 +63,6 @@ func (s *Service) getAttPreState(ctx context.Context, c *ethpb.Checkpoint) (*sta
}
return baseState, nil
}
has, err := s.stateGen.HasState(ctx, bytesutil.ToBytes32(c.Root))
if err != nil {
return nil, err
@@ -57,7 +73,6 @@ func (s *Service) getAttPreState(ctx context.Context, c *ethpb.Checkpoint) (*sta
}
}
return baseState, nil
}
// verifyAttTargetEpoch validates attestation is from the current or previous epoch.

View File

@@ -2,7 +2,9 @@ package blockchain
import (
"context"
"sync"
"testing"
"time"
"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@@ -178,11 +180,11 @@ func TestStore_SaveCheckpointState(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 1*params.BeaconConfig().SlotsPerEpoch, s1.Slot(), "Unexpected state slot")
s1, err = service.checkpointStateCache.StateByCheckpoint(cp1)
s1, err = service.checkpointStateCache.StateByCheckpoint(ctx, cp1)
require.NoError(t, err)
assert.Equal(t, 1*params.BeaconConfig().SlotsPerEpoch, s1.Slot(), "Unexpected state slot")
s2, err = service.checkpointStateCache.StateByCheckpoint(cp2)
s2, err = service.checkpointStateCache.StateByCheckpoint(ctx, cp2)
require.NoError(t, err)
assert.Equal(t, 2*params.BeaconConfig().SlotsPerEpoch, s2.Slot(), "Unexpected state slot")
@@ -218,7 +220,7 @@ func TestStore_UpdateCheckpointState(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, returned.Slot(), checkpoint.Epoch*params.BeaconConfig().SlotsPerEpoch, "Incorrectly returned base state")
cached, err := service.checkpointStateCache.StateByCheckpoint(checkpoint)
cached, err := service.checkpointStateCache.StateByCheckpoint(ctx, checkpoint)
require.NoError(t, err)
assert.Equal(t, returned.Slot(), cached.Slot(), "State should have been cached")
@@ -233,13 +235,53 @@ func TestStore_UpdateCheckpointState(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, returned.Slot(), baseState.Slot(), "Incorrectly returned base state")
cached, err = service.checkpointStateCache.StateByCheckpoint(newCheckpoint)
cached, err = service.checkpointStateCache.StateByCheckpoint(ctx, newCheckpoint)
require.NoError(t, err)
if !proto.Equal(returned.InnerStateUnsafe(), cached.InnerStateUnsafe()) {
t.Error("Incorrectly cached base state")
}
}
// TestStore_CheckpointState_InProgress calls getAttPreState for a checkpoint
// that has already been marked in progress, while a second goroutine adds the
// state to the checkpoint cache and clears the in-progress marker. The state
// returned by getAttPreState must equal the saved base state.
func TestStore_CheckpointState_InProgress(t *testing.T) {
	ctx := context.Background()
	db, sc := testDB.SetupDB(t)
	cfg := &Config{
		BeaconDB: db,
		StateGen: stategen.New(db, sc),
	}
	service, err := NewService(ctx, cfg)
	require.NoError(t, err)

	epoch := uint64(1)
	baseState, _ := testutil.DeterministicGenesisState(t, 1)
	checkpoint := &ethpb.Checkpoint{Epoch: epoch, Root: bytesutil.PadTo([]byte("hi"), 32)}
	require.NoError(t, service.beaconDB.SaveState(ctx, baseState, bytesutil.ToBytes32(checkpoint.Root)))
	require.NoError(t, service.checkpointStateCache.MarkInProgress(checkpoint))

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		response, err := service.getAttPreState(ctx, checkpoint)
		// Fatal-style helpers must only be called from the goroutine running
		// the test, so report the failure with Errorf and return instead.
		// NOTE(review): assumes require.NoError aborts via FailNow — confirm.
		if err != nil {
			t.Errorf("Unexpected error from getAttPreState: %v", err)
			return
		}
		if !proto.Equal(baseState.InnerStateUnsafe(), response.InnerStateUnsafe()) {
			t.Error("Expected equal responses from cache")
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		assert.NoError(t, service.checkpointStateCache.AddCheckpointState(checkpoint, baseState))
		assert.NoError(t, service.checkpointStateCache.MarkNotInProgress(checkpoint))
	}()

	// Fail loudly if the goroutines never finish; otherwise a stuck waiter
	// would let the test pass without any assertion having run.
	// NOTE(review): assumes WaitTimeout returns true when the timeout
	// elapses — confirm against testutil.
	if testutil.WaitTimeout(&wg, time.Second*2) {
		t.Fatal("Timed out waiting for in progress checkpoint state requests to resolve")
	}
}
func TestAttEpoch_MatchPrevEpoch(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)

View File

@@ -1,7 +1,10 @@
package cache
import (
"context"
"math"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
@@ -30,8 +33,9 @@ var (
// CheckpointStateCache is a struct with 1 queue for looking up state by checkpoint.
type CheckpointStateCache struct {
cache *lru.Cache
lock sync.RWMutex
cache *lru.Cache
lock sync.RWMutex
inProgress map[string]bool
}
// NewCheckpointStateCache creates a new checkpoint state cache for storing/accessing processed state.
@@ -41,22 +45,45 @@ func NewCheckpointStateCache() *CheckpointStateCache {
panic(err)
}
return &CheckpointStateCache{
cache: cache,
cache: cache,
inProgress: map[string]bool{},
}
}
// StateByCheckpoint fetches state by checkpoint. Returns the cached state if it
// exists; otherwise returns nil. While the checkpoint is marked in progress, the
// call waits until the mark is cleared or ctx is done.
func (c *CheckpointStateCache) StateByCheckpoint(cp *ethpb.Checkpoint) (*stateTrie.BeaconState, error) {
c.lock.RLock()
defer c.lock.RUnlock()
h, err := hashutil.HashProto(cp)
func (c *CheckpointStateCache) StateByCheckpoint(ctx context.Context, cp *ethpb.Checkpoint) (*stateTrie.BeaconState, error) {
k, err := checkpointKey(cp)
if err != nil {
return nil, err
}
item, exists := c.cache.Get(h)
delay := minDelay
// Another identical request may be in progress already. Let's wait until
// any in progress request resolves or our timeout is exceeded.
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
c.lock.RLock()
if !c.inProgress[k] {
c.lock.RUnlock()
break
}
c.lock.RUnlock()
// This increasing backoff is to decrease the CPU cycles while waiting
// for the in progress boolean to flip to false.
time.Sleep(time.Duration(delay) * time.Nanosecond)
delay *= delayFactor
delay = math.Min(delay, maxDelay)
}
c.lock.RLock()
defer c.lock.RUnlock()
item, exists := c.cache.Get(k)
if exists && item != nil {
checkpointStateHit.Inc()
// Copy here is unnecessary since the return will only be used to verify attestation signature.
@@ -72,10 +99,47 @@ func (c *CheckpointStateCache) StateByCheckpoint(cp *ethpb.Checkpoint) (*stateTr
func (c *CheckpointStateCache) AddCheckpointState(cp *ethpb.Checkpoint, s *stateTrie.BeaconState) error {
c.lock.Lock()
defer c.lock.Unlock()
h, err := hashutil.HashProto(cp)
k, err := checkpointKey(cp)
if err != nil {
return err
}
c.cache.Add(h, s)
c.cache.ContainsOrAdd(k, s)
return nil
}
// MarkInProgress flags the given checkpoint as having a request in flight.
// It returns ErrAlreadyInProgress if the checkpoint is already flagged, or the
// key-derivation error if the checkpoint cannot be hashed. The flag stays set
// until MarkNotInProgress is called for the same checkpoint.
func (c *CheckpointStateCache) MarkInProgress(cp *ethpb.Checkpoint) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	key, keyErr := checkpointKey(cp)
	switch {
	case keyErr != nil:
		return keyErr
	case c.inProgress[key]:
		// Someone else already owns this checkpoint's request.
		return ErrAlreadyInProgress
	}

	c.inProgress[key] = true
	return nil
}
// MarkNotInProgress clears the in-progress flag for the given checkpoint.
// Call it once the request that set the flag has finished, i.e. after the
// state has been added to the cache. It returns the key-derivation error if
// the checkpoint cannot be hashed, in which case nothing is cleared.
func (c *CheckpointStateCache) MarkNotInProgress(cp *ethpb.Checkpoint) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	key, keyErr := checkpointKey(cp)
	if keyErr == nil {
		delete(c.inProgress, key)
	}
	return keyErr
}
// checkpointKey derives the string key used for a checkpoint: the bytes of
// its proto hash converted to a string. On a hashing failure it returns the
// empty string together with the error.
func checkpointKey(cp *ethpb.Checkpoint) (string, error) {
	digest, hashErr := hashutil.HashProto(cp)
	if hashErr != nil {
		return "", hashErr
	}
	key := string(digest[:])
	return key, nil
}

View File

@@ -1,6 +1,7 @@
package cache
import (
"context"
"testing"
"github.com/gogo/protobuf/proto"
@@ -23,13 +24,13 @@ func TestCheckpointStateCache_StateByCheckpoint(t *testing.T) {
})
require.NoError(t, err)
state, err := cache.StateByCheckpoint(cp1)
state, err := cache.StateByCheckpoint(context.Background(), cp1)
require.NoError(t, err)
assert.Equal(t, (*stateTrie.BeaconState)(nil), state, "Expected state not to exist in empty cache")
require.NoError(t, cache.AddCheckpointState(cp1, st))
state, err = cache.StateByCheckpoint(cp1)
state, err = cache.StateByCheckpoint(context.Background(), cp1)
require.NoError(t, err)
if !proto.Equal(state.InnerStateUnsafe(), st.InnerStateUnsafe()) {
@@ -43,11 +44,11 @@ func TestCheckpointStateCache_StateByCheckpoint(t *testing.T) {
require.NoError(t, err)
require.NoError(t, cache.AddCheckpointState(cp2, st2))
state, err = cache.StateByCheckpoint(cp2)
state, err = cache.StateByCheckpoint(context.Background(), cp2)
require.NoError(t, err)
assert.DeepEqual(t, st2.CloneInnerState(), state.CloneInnerState(), "incorrectly cached state")
state, err = cache.StateByCheckpoint(cp1)
state, err = cache.StateByCheckpoint(context.Background(), cp1)
require.NoError(t, err)
assert.DeepEqual(t, st.CloneInnerState(), state.CloneInnerState(), "incorrectly cached state")
}