Next slot state caching (#8357)

* Add and use trailing slot state cache

* Remove comment

* Update comments

* Update comments

* Move it to background

* Warn is more appropriate

* Raul's comment

* Same strategy for proposer

* Own helper

* Minor touchups

* Tests

* Feature flag

* Gaz
This commit is contained in:
terence tsao
2021-01-29 08:52:43 -08:00
committed by GitHub
parent e592cd7a80
commit 3aaa98decf
9 changed files with 197 additions and 17 deletions

View File

@@ -101,6 +101,13 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock,
return err
}
// Updating next slot state cache can happen in the background. It shouldn't block rest of the process.
go func() {
if err := state.UpdateNextSlotCache(ctx, blockRoot[:], postState); err != nil {
log.WithError(err).Debug("could not update next slot state cache")
}
}()
// Update justified check point.
if postState.CurrentJustifiedCheckpoint().Epoch > s.justifiedCheckpt.Epoch {
if err := s.updateJustified(ctx, postState); err != nil {

View File

@@ -9,6 +9,7 @@ go_library(
"log.go",
"skip_slot_cache.go",
"state.go",
"trailing_slot_state_cache.go",
"transition.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/core/state",
@@ -34,12 +35,15 @@ go_library(
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/mathutil:go_default_library",
"//shared/params:go_default_library",
"//shared/traceutil:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
@@ -54,6 +58,7 @@ go_test(
"skip_slot_cache_test.go",
"state_fuzz_test.go",
"state_test.go",
"trailing_slot_state_cache_test.go",
"transition_fuzz_test.go",
"transition_test.go",
],

View File

@@ -0,0 +1,64 @@
package state
import (
"bytes"
"context"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
)
// nextSlotCache holds a single root and state pair, guarded by the embedded read-write mutex.
type nextSlotCache struct {
sync.RWMutex
// root is the key that lookups in NextSlotState are compared against.
root []byte
// state is the state stored alongside root. UpdateNextSlotCache sets it to its input state
// after advancing that state by one slot.
state *state.BeaconState
}
var (
// nsc is the package-level cache instance read by NextSlotState and written by UpdateNextSlotCache.
nsc nextSlotCache
// Metrics for the next slot state cache.
// nextSlotCacheHit is incremented by NextSlotState when the requested root matches the cached root.
nextSlotCacheHit = promauto.NewCounter(prometheus.CounterOpts{
Name: "next_slot_cache_hit",
Help: "The total number of cache hits on the next slot state cache.",
})
// nextSlotCacheMiss is incremented by NextSlotState when the requested root does not match the cached root.
nextSlotCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
Name: "next_slot_cache_miss",
Help: "The total number of cache misses on the next slot state cache.",
})
)
// NextSlotState returns a copy of the cached state when the input root matches the root stored in
// `nextSlotCache`, and nil otherwise. On a hit the returned state is the one saved by
// `UpdateNextSlotCache`, i.e. the last processed state advanced by one slot, which makes this a
// useful check to run before processing slots.
//
// An empty or nil root is always reported as a miss. `bytes.Equal` treats nil and empty slices as
// equal, so without this guard an empty root would match the zero-value cache before anything has
// been stored, be counted as a hit, and call Copy on a nil state.
//
// The returned error is currently always nil.
func NextSlotState(ctx context.Context, root []byte) (*state.BeaconState, error) {
	nsc.RLock()
	defer nsc.RUnlock()
	if len(root) == 0 || !bytes.Equal(root, nsc.root) {
		nextSlotCacheMiss.Inc()
		return nil, nil
	}
	nextSlotCacheHit.Inc()
	// Return a copy rather than the cached state itself.
	return nsc.state.Copy(), nil
}
// UpdateNextSlotCache updates the `nextSlotCache`. It copies the input state, advances the copy by
// one slot via `ProcessSlots`, and stores the result together with the input root for later look up
// through `NextSlotState`. The input state itself is not the object passed to `ProcessSlots`.
// This is useful to call after successfully processing a block.
//
// The root is stored as a private copy so that the cache key does not alias memory owned by the
// caller. If `ProcessSlots` fails, its error is returned and the cache is left untouched.
func UpdateNextSlotCache(ctx context.Context, root []byte, state *state.BeaconState) error {
	// Advance one slot on a copy so the caller's state is not the one being processed.
	copied := state.Copy()
	copied, err := ProcessSlots(ctx, copied, copied.Slot()+1)
	if err != nil {
		return err
	}

	// Detach the key from the caller's backing array before publishing it.
	rootCopy := make([]byte, len(root))
	copy(rootCopy, root)

	nsc.Lock()
	defer nsc.Unlock()
	nsc.root = rootCopy
	nsc.state = copied
	return nil
}

View File

@@ -0,0 +1,30 @@
package state_test
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
st "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
// TestTrailingSlotState_RoundTrip exercises a miss followed by two update/lookup cycles on the
// next slot cache and checks the slot of the state returned after each update.
func TestTrailingSlotState_RoundTrip(t *testing.T) {
	ctx := context.Background()
	root := []byte{'a'}

	// The first lookup is expected to miss and yield a nil state.
	cached, err := state.NextSlotState(ctx, root)
	require.NoError(t, err)
	require.Equal(t, (*st.BeaconState)(nil), cached)

	// After caching the genesis state, the lookup is expected to return a state at slot 1.
	cached, _ = testutil.DeterministicGenesisState(t, 1)
	require.NoError(t, state.UpdateNextSlotCache(ctx, root, cached))
	cached, err = state.NextSlotState(ctx, root)
	require.NoError(t, err)
	require.Equal(t, uint64(1), cached.Slot())

	// Feeding the returned state back in is expected to move the cached state to slot 2.
	require.NoError(t, state.UpdateNextSlotCache(ctx, root, cached))
	cached, err = state.NextSlotState(ctx, root)
	require.NoError(t, err)
	require.Equal(t, uint64(2), cached.Slot())
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/traceutil"
@@ -130,10 +131,16 @@ func ExecuteStateTransitionNoVerifyAnySig(
defer span.End()
var err error
// Execute per slots transition.
state, err = ProcessSlots(ctx, state, signed.Block.Slot)
if err != nil {
return nil, nil, errors.Wrap(err, "could not process slot")
if featureconfig.Get().EnableNextSlotStateCache {
state, err = ProcessSlotsUsingNextSlotCache(ctx, state, signed.Block.ParentRoot, signed.Block.Slot)
if err != nil {
return nil, nil, errors.Wrap(err, "could not process slots")
}
} else {
state, err = ProcessSlots(ctx, state, signed.Block.Slot)
if err != nil {
return nil, nil, errors.Wrap(err, "could not process slot")
}
}
// Execute per block transition.
@@ -183,9 +190,17 @@ func CalculateStateRoot(
state = state.Copy()
// Execute per slots transition.
state, err := ProcessSlots(ctx, state, signed.Block.Slot)
if err != nil {
return [32]byte{}, errors.Wrap(err, "could not process slot")
var err error
if featureconfig.Get().EnableNextSlotStateCache {
state, err = ProcessSlotsUsingNextSlotCache(ctx, state, signed.Block.ParentRoot, signed.Block.Slot)
if err != nil {
return [32]byte{}, errors.Wrap(err, "could not process slots")
}
} else {
state, err = ProcessSlots(ctx, state, signed.Block.Slot)
if err != nil {
return [32]byte{}, errors.Wrap(err, "could not process slot")
}
}
// Execute per block transition.
@@ -253,6 +268,37 @@ func ProcessSlot(ctx context.Context, state *stateTrie.BeaconState) (*stateTrie.
return state, nil
}
// ProcessSlotsUsingNextSlotCache advances a state to `slot`, starting from the next slot cache
// entry for `parentRoot` when there is one and from `parentState` otherwise.
//
// The cache only holds a state advanced by a single slot, so any remaining slots are processed
// with `ProcessSlots`. If the starting state is already at or past `slot`, it is returned as is.
func ProcessSlotsUsingNextSlotCache(
	ctx context.Context,
	parentState *stateTrie.BeaconState,
	parentRoot []byte,
	slot uint64) (*stateTrie.BeaconState, error) {
	ctx, span := trace.StartSpan(ctx, "beacon-chain.ChainService.ProcessSlotsUsingNextSlotCache")
	defer span.End()

	// Look up whether the parent state was already advanced by one slot.
	cached, err := NextSlotState(ctx, parentRoot)
	if err != nil {
		return nil, err
	}

	// A non-nil result is a cache hit and takes the place of the parent state.
	base := parentState
	if cached != nil {
		base = cached
	}

	// Nothing left to process once the starting state has reached the target slot.
	if slot <= base.Slot() {
		return base, nil
	}

	advanced, err := ProcessSlots(ctx, base, slot)
	if err != nil {
		return nil, errors.Wrap(err, "could not process slots")
	}
	return advanced, nil
}
// ProcessSlots process through skip slots and apply epoch transition when it's needed
//
// Spec pseudocode definition:

View File

@@ -890,3 +890,12 @@ func TestProcessSlots_LowerSlotAsParentState(t *testing.T) {
_, err = state.ProcessSlots(context.Background(), parentState, slot-1)
assert.ErrorContains(t, "expected state.slot 2 < slot 1", err)
}
// TestProcessSlotsUsingNextSlotCache checks that a genesis state processed up to slot 5 through
// ProcessSlotsUsingNextSlotCache reports slot 5.
func TestProcessSlotsUsingNextSlotCache(t *testing.T) {
	ctx := context.Background()
	root := []byte{'a'}
	genesis, _ := testutil.DeterministicGenesisState(t, 1)

	advanced, err := state.ProcessSlotsUsingNextSlotCache(ctx, genesis, root, 5)
	require.NoError(t, err)
	require.Equal(t, uint64(5), advanced.Slot())
}

View File

@@ -68,9 +68,17 @@ func (vs *Server) GetBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get head state %v", err)
}
head, err = state.ProcessSlots(ctx, head, req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not advance slot to calculate proposer index: %v", err)
if featureconfig.Get().EnableNextSlotStateCache {
head, err = state.ProcessSlotsUsingNextSlotCache(ctx, head, parentRoot, req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not advance slots to calculate proposer index: %v", err)
}
} else {
head, err = state.ProcessSlots(ctx, head, req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not advance slot to calculate proposer index: %v", err)
}
}
var eth1Data *ethpb.Eth1Data

View File

@@ -61,17 +61,18 @@ type Flags struct {
DisableBroadcastSlashings bool // DisableBroadcastSlashings disables p2p broadcasting of proposer and attester slashings.
// Cache toggles.
EnableSSZCache bool // EnableSSZCache see https://github.com/prysmaticlabs/prysm/pull/4558.
EnableEth1DataVoteCache bool // EnableEth1DataVoteCache; see https://github.com/prysmaticlabs/prysm/issues/3106.
EnableSlasherConnection bool // EnableSlasher enable retrieval of slashing events from a slasher instance.
UseCheckPointInfoCache bool // UseCheckPointInfoCache uses check point info cache to efficiently verify attestation signatures.
KafkaBootstrapServers string // KafkaBootstrapServers to find kafka servers to stream blocks, attestations, etc.
AttestationAggregationStrategy string // AttestationAggregationStrategy defines aggregation strategy to be used when aggregating.
EnableSSZCache bool // EnableSSZCache see https://github.com/prysmaticlabs/prysm/pull/4558.
EnableEth1DataVoteCache bool // EnableEth1DataVoteCache; see https://github.com/prysmaticlabs/prysm/issues/3106.
EnableSlasherConnection bool // EnableSlasher enable retrieval of slashing events from a slasher instance.
UseCheckPointInfoCache bool // UseCheckPointInfoCache uses check point info cache to efficiently verify attestation signatures.
EnableNextSlotStateCache bool // EnableNextSlotStateCache enables next slot state cache to improve validator performance.
// Bug fixes related flags.
AttestTimely bool // AttestTimely fixes #8185. It is gated behind a flag to ensure beacon node's fix can safely roll out first. We'll invert this in v1.1.0.
KafkaBootstrapServers string // KafkaBootstrapServers to find kafka servers to stream blocks, attestations, etc.
AttestationAggregationStrategy string // AttestationAggregationStrategy defines aggregation strategy to be used when aggregating.
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration
@@ -195,6 +196,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
log.Warn("Disabling slashing broadcasting to p2p network")
cfg.DisableBroadcastSlashings = true
}
if ctx.Bool(enableNextSlotStateCache.Name) {
log.Warn("Enabling next slot state cache")
cfg.EnableNextSlotStateCache = true
}
Init(cfg)
}

View File

@@ -110,11 +110,16 @@ var (
Name: "attest-timely",
Usage: "Fixes validator can attest timely after current block processes. See #8185 for more details",
}
enableNextSlotStateCache = &cli.BoolFlag{
Name: "enable-next-slot-state-cache",
Usage: "Improves attesting and proposing efficiency by caching the next slot state at the end of the current slot",
}
)
// devModeFlags holds the list of flags that are set when development mode is on.
var devModeFlags = []cli.Flag{
enableLargerGossipHistory,
enableNextSlotStateCache,
}
// ValidatorFlags contains a list of all the feature flags that apply to the validator client.
@@ -160,6 +165,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
disablePruningDepositProofs,
disableSyncBacktracking,
disableBroadcastSlashingFlag,
enableNextSlotStateCache,
}...)
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.