diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 5ca57898dd..43328e8491 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -101,6 +101,13 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock, return err } + // Updating next slot state cache can happen in the background. It shouldn't block rest of the process. + go func() { + if err := state.UpdateNextSlotCache(ctx, blockRoot[:], postState); err != nil { + log.WithError(err).Debug("could not update next slot state cache") + } + }() + // Update justified check point. if postState.CurrentJustifiedCheckpoint().Epoch > s.justifiedCheckpt.Epoch { if err := s.updateJustified(ctx, postState); err != nil { diff --git a/beacon-chain/core/state/BUILD.bazel b/beacon-chain/core/state/BUILD.bazel index 6302f47247..a89695c9da 100644 --- a/beacon-chain/core/state/BUILD.bazel +++ b/beacon-chain/core/state/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "log.go", "skip_slot_cache.go", "state.go", + "trailing_slot_state_cache.go", "transition.go", ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/core/state", @@ -34,12 +35,15 @@ go_library( "//proto/beacon/p2p/v1:go_default_library", "//shared/bls:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/featureconfig:go_default_library", "//shared/hashutil:go_default_library", "//shared/mathutil:go_default_library", "//shared/params:go_default_library", "//shared/traceutil:go_default_library", "//shared/trieutil:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@com_github_prometheus_client_golang//prometheus:go_default_library", + "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@io_opencensus_go//trace:go_default_library", @@ -54,6 +58,7 @@ go_test( "skip_slot_cache_test.go", "state_fuzz_test.go", "state_test.go", + "trailing_slot_state_cache_test.go", "transition_fuzz_test.go", "transition_test.go", ], diff --git a/beacon-chain/core/state/trailing_slot_state_cache.go b/beacon-chain/core/state/trailing_slot_state_cache.go new file mode 100644 index 0000000000..caefa369b8 --- /dev/null +++ b/beacon-chain/core/state/trailing_slot_state_cache.go @@ -0,0 +1,64 @@ +package state + +import ( + "bytes" + "context" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prysmaticlabs/prysm/beacon-chain/state" +) + +type nextSlotCache struct { + sync.RWMutex + root []byte + state *state.BeaconState +} + +var ( + nsc nextSlotCache + // Metrics for the validator cache. + nextSlotCacheHit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "next_slot_cache_hit", + Help: "The total number of cache hits on the next slot state cache.", + }) + nextSlotCacheMiss = promauto.NewCounter(prometheus.CounterOpts{ + Name: "next_slot_cache_miss", + Help: "The total number of cache misses on the next slot state cache.", + }) +) + +// NextSlotState returns the saved state if the input root matches the root in `nextSlotCache`. Returns nil otherwise. +// This is useful to check before processing slots. With a cache hit, it will return last processed state with slot plus +// one advancement. +func NextSlotState(ctx context.Context, root []byte) (*state.BeaconState, error) { + nsc.RLock() + defer nsc.RUnlock() + if !bytes.Equal(root, nsc.root) { + nextSlotCacheMiss.Inc() + return nil, nil + } + nextSlotCacheHit.Inc() + // Returning copied state. + return nsc.state.Copy(), nil +} + +// UpdateNextSlotCache updates the `nextSlotCache`. It saves the input state after advancing the state slot by 1 +// by calling `ProcessSlots`, it also saves the input root for later look up. +// This is useful to call after successfully processing a block. +func UpdateNextSlotCache(ctx context.Context, root []byte, state *state.BeaconState) error { + // Advancing one slot by using a copied state. + copied := state.Copy() + copied, err := ProcessSlots(ctx, copied, copied.Slot()+1) + if err != nil { + return err + } + + nsc.Lock() + defer nsc.Unlock() + + nsc.root = root + nsc.state = copied + return nil +} diff --git a/beacon-chain/core/state/trailing_slot_state_cache_test.go b/beacon-chain/core/state/trailing_slot_state_cache_test.go new file mode 100644 index 0000000000..434cd53aec --- /dev/null +++ b/beacon-chain/core/state/trailing_slot_state_cache_test.go @@ -0,0 +1,30 @@ +package state_test + +import ( + "context" + "testing" + + "github.com/prysmaticlabs/prysm/beacon-chain/core/state" + st "github.com/prysmaticlabs/prysm/beacon-chain/state" + "github.com/prysmaticlabs/prysm/shared/testutil" + "github.com/prysmaticlabs/prysm/shared/testutil/require" +) + +func TestTrailingSlotState_RoundTrip(t *testing.T) { + ctx := context.Background() + r := []byte{'a'} + s, err := state.NextSlotState(ctx, r) + require.NoError(t, err) + require.Equal(t, (*st.BeaconState)(nil), s) + + s, _ = testutil.DeterministicGenesisState(t, 1) + require.NoError(t, state.UpdateNextSlotCache(ctx, r, s)) + s, err = state.NextSlotState(ctx, r) + require.NoError(t, err) + require.Equal(t, uint64(1), s.Slot()) + + require.NoError(t, state.UpdateNextSlotCache(ctx, r, s)) + s, err = state.NextSlotState(ctx, r) + require.NoError(t, err) + require.Equal(t, uint64(2), s.Slot()) +} diff --git a/beacon-chain/core/state/transition.go b/beacon-chain/core/state/transition.go index 331ecebc94..365a8b048a 100644 --- a/beacon-chain/core/state/transition.go +++ b/beacon-chain/core/state/transition.go @@ -18,6 +18,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" "github.com/prysmaticlabs/prysm/shared/bls" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/mathutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/traceutil" @@ -130,10 +131,16 @@ func ExecuteStateTransitionNoVerifyAnySig( defer span.End() var err error - // Execute per slots transition. - state, err = ProcessSlots(ctx, state, signed.Block.Slot) - if err != nil { - return nil, nil, errors.Wrap(err, "could not process slot") + if featureconfig.Get().EnableNextSlotStateCache { + state, err = ProcessSlotsUsingNextSlotCache(ctx, state, signed.Block.ParentRoot, signed.Block.Slot) + if err != nil { + return nil, nil, errors.Wrap(err, "could not process slots") + } + } else { + state, err = ProcessSlots(ctx, state, signed.Block.Slot) + if err != nil { + return nil, nil, errors.Wrap(err, "could not process slot") + } } // Execute per block transition. @@ -183,9 +190,17 @@ func CalculateStateRoot( state = state.Copy() // Execute per slots transition. - state, err := ProcessSlots(ctx, state, signed.Block.Slot) - if err != nil { - return [32]byte{}, errors.Wrap(err, "could not process slot") + var err error + if featureconfig.Get().EnableNextSlotStateCache { + state, err = ProcessSlotsUsingNextSlotCache(ctx, state, signed.Block.ParentRoot, signed.Block.Slot) + if err != nil { + return [32]byte{}, errors.Wrap(err, "could not process slots") + } + } else { + state, err = ProcessSlots(ctx, state, signed.Block.Slot) + if err != nil { + return [32]byte{}, errors.Wrap(err, "could not process slot") + } } // Execute per block transition. @@ -253,6 +268,37 @@ func ProcessSlot(ctx context.Context, state *stateTrie.BeaconState) (*stateTrie. return state, nil } +// ProcessSlotsUsingNextSlotCache processes slots by using next slot cache for higher efficiency. +func ProcessSlotsUsingNextSlotCache( + ctx context.Context, + parentState *stateTrie.BeaconState, + parentRoot []byte, + slot uint64) (*stateTrie.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "beacon-chain.ChainService.ProcessSlotsUsingNextSlotCache") + defer span.End() + + // Check whether the parent state has been advanced by 1 slot in next slot cache. + nextSlotState, err := NextSlotState(ctx, parentRoot) + if err != nil { + return nil, err + } + // If the next slot state is not nil (i.e. cache hit). + // We replace next slot state with parent state. + if nextSlotState != nil { + parentState = nextSlotState + } + + // Since next slot cache only advances state by 1 slot, + // we check if there's more slots that need to process. + if slot > parentState.Slot() { + parentState, err = ProcessSlots(ctx, parentState, slot) + if err != nil { + return nil, errors.Wrap(err, "could not process slots") + } + } + return parentState, nil +} + // ProcessSlots process through skip slots and apply epoch transition when it's needed // // Spec pseudocode definition: diff --git a/beacon-chain/core/state/transition_test.go b/beacon-chain/core/state/transition_test.go index b207783ccc..3a47857c4a 100644 --- a/beacon-chain/core/state/transition_test.go +++ b/beacon-chain/core/state/transition_test.go @@ -890,3 +890,12 @@ func TestProcessSlots_LowerSlotAsParentState(t *testing.T) { _, err = state.ProcessSlots(context.Background(), parentState, slot-1) assert.ErrorContains(t, "expected state.slot 2 < slot 1", err) } + +func TestProcessSlotsUsingNextSlotCache(t *testing.T) { + ctx := context.Background() + s, _ := testutil.DeterministicGenesisState(t, 1) + r := []byte{'a'} + s, err := state.ProcessSlotsUsingNextSlotCache(ctx, s, r, 5) + require.NoError(t, err) + require.Equal(t, uint64(5), s.Slot()) +} diff --git a/beacon-chain/rpc/validator/proposer.go b/beacon-chain/rpc/validator/proposer.go index a0d23c6102..8f03ddd80d 100644 --- a/beacon-chain/rpc/validator/proposer.go +++ b/beacon-chain/rpc/validator/proposer.go @@ -68,9 +68,17 @@ func (vs *Server) GetBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb if err != nil { return nil, status.Errorf(codes.Internal, "Could not get head state %v", err) } - head, err = state.ProcessSlots(ctx, head, req.Slot) - if err != nil { - return nil, status.Errorf(codes.Internal, "Could not advance slot to calculate proposer index: %v", err) + + if featureconfig.Get().EnableNextSlotStateCache { + head, err = state.ProcessSlotsUsingNextSlotCache(ctx, head, parentRoot, req.Slot) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not advance slots to calculate proposer index: %v", err) + } + } else { + head, err = state.ProcessSlots(ctx, head, req.Slot) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not advance slot to calculate proposer index: %v", err) + } } var eth1Data *ethpb.Eth1Data diff --git a/shared/featureconfig/config.go b/shared/featureconfig/config.go index 89c86f7e74..ffeb1b7c92 100644 --- a/shared/featureconfig/config.go +++ b/shared/featureconfig/config.go @@ -61,17 +61,18 @@ type Flags struct { DisableBroadcastSlashings bool // DisableBroadcastSlashings disables p2p broadcasting of proposer and attester slashings. // Cache toggles. - EnableSSZCache bool // EnableSSZCache see https://github.com/prysmaticlabs/prysm/pull/4558. - EnableEth1DataVoteCache bool // EnableEth1DataVoteCache; see https://github.com/prysmaticlabs/prysm/issues/3106. - EnableSlasherConnection bool // EnableSlasher enable retrieval of slashing events from a slasher instance. - UseCheckPointInfoCache bool // UseCheckPointInfoCache uses check point info cache to efficiently verify attestation signatures. - - KafkaBootstrapServers string // KafkaBootstrapServers to find kafka servers to stream blocks, attestations, etc. - AttestationAggregationStrategy string // AttestationAggregationStrategy defines aggregation strategy to be used when aggregating. + EnableSSZCache bool // EnableSSZCache see https://github.com/prysmaticlabs/prysm/pull/4558. + EnableEth1DataVoteCache bool // EnableEth1DataVoteCache; see https://github.com/prysmaticlabs/prysm/issues/3106. + EnableSlasherConnection bool // EnableSlasher enable retrieval of slashing events from a slasher instance. + UseCheckPointInfoCache bool // UseCheckPointInfoCache uses check point info cache to efficiently verify attestation signatures. + EnableNextSlotStateCache bool // EnableNextSlotStateCache enables next slot state cache to improve validator performance. // Bug fixes related flags. AttestTimely bool // AttestTimely fixes #8185. It is gated behind a flag to ensure beacon node's fix can safely roll out first. We'll invert this in v1.1.0. + KafkaBootstrapServers string // KafkaBootstrapServers to find kafka servers to stream blocks, attestations, etc. + AttestationAggregationStrategy string // AttestationAggregationStrategy defines aggregation strategy to be used when aggregating. + // KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have // changed on disk. This feature is for advanced use cases only. KeystoreImportDebounceInterval time.Duration @@ -195,6 +196,10 @@ func ConfigureBeaconChain(ctx *cli.Context) { log.Warn("Disabling slashing broadcasting to p2p network") cfg.DisableBroadcastSlashings = true } + if ctx.Bool(enableNextSlotStateCache.Name) { + log.Warn("Enabling next slot state cache") + cfg.EnableNextSlotStateCache = true + } Init(cfg) } diff --git a/shared/featureconfig/flags.go b/shared/featureconfig/flags.go index 4ee54c5f62..e13e07c5ba 100644 --- a/shared/featureconfig/flags.go +++ b/shared/featureconfig/flags.go @@ -110,11 +110,16 @@ var ( Name: "attest-timely", Usage: "Fixes validator can attest timely after current block processes. See #8185 for more details", } + enableNextSlotStateCache = &cli.BoolFlag{ + Name: "enable-next-slot-state-cache", + Usage: "Improves attesting and proposing efficiency by caching the next slot state at the end of the current slot", + } ) // devModeFlags holds list of flags that are set when development mode is on. var devModeFlags = []cli.Flag{ enableLargerGossipHistory, + enableNextSlotStateCache, } // ValidatorFlags contains a list of all the feature flags that apply to the validator client. @@ -160,6 +165,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{ disablePruningDepositProofs, disableSyncBacktracking, disableBroadcastSlashingFlag, + enableNextSlotStateCache, }...) // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.