mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-05-02 03:02:54 -04:00
Release --initial-sync-cache-state (#4938)
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
@@ -76,18 +75,8 @@ func (s *Service) saveHead(ctx context.Context, headRoot [32]byte) error {
|
||||
// Get the new head state from cached state or DB.
|
||||
var newHeadState *state.BeaconState
|
||||
var exists bool
|
||||
if featureconfig.Get().InitSyncCacheState {
|
||||
newHeadState, exists = s.initSyncState[headRoot]
|
||||
if !exists {
|
||||
newHeadState, err = s.beaconDB.State(ctx, headRoot)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not retrieve head state in DB")
|
||||
}
|
||||
if newHeadState == nil {
|
||||
return errors.New("cannot save nil head state")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
newHeadState, exists = s.initSyncState[headRoot]
|
||||
if !exists {
|
||||
newHeadState, err = s.beaconDB.State(ctx, headRoot)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not retrieve head state in DB")
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/shared/attestationutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
@@ -197,14 +196,8 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
|
||||
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
|
||||
}
|
||||
|
||||
if featureconfig.Get().InitSyncCacheState {
|
||||
s.initSyncState[root] = postState.Copy()
|
||||
s.filterBoundaryCandidates(ctx, root, postState)
|
||||
} else {
|
||||
if err := s.beaconDB.SaveState(ctx, postState, root); err != nil {
|
||||
return errors.Wrap(err, "could not save state")
|
||||
}
|
||||
}
|
||||
s.initSyncState[root] = postState.Copy()
|
||||
s.filterBoundaryCandidates(ctx, root, postState)
|
||||
|
||||
if flags.Get().EnableArchive {
|
||||
atts := signed.Block.Body.Attestations
|
||||
@@ -259,17 +252,15 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
|
||||
return errors.Wrap(err, "could not save finalized checkpoint")
|
||||
}
|
||||
|
||||
if featureconfig.Get().InitSyncCacheState {
|
||||
numOfStates := len(s.boundaryRoots)
|
||||
if numOfStates > initialSyncCacheSize {
|
||||
if err = s.persistCachedStates(ctx, numOfStates); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.initSyncState) > maxCacheSize {
|
||||
s.pruneOldNonFinalizedStates()
|
||||
numOfStates := len(s.boundaryRoots)
|
||||
if numOfStates > initialSyncCacheSize {
|
||||
if err = s.persistCachedStates(ctx, numOfStates); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.initSyncState) > maxCacheSize {
|
||||
s.pruneOldNonFinalizedStates()
|
||||
}
|
||||
|
||||
// Epoch boundary bookkeeping such as logging epoch summaries.
|
||||
if postState.Slot() >= s.nextEpochBoundarySlot {
|
||||
@@ -284,11 +275,9 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
|
||||
return err
|
||||
}
|
||||
|
||||
if featureconfig.Get().InitSyncCacheState {
|
||||
if helpers.IsEpochStart(postState.Slot()) {
|
||||
if err := s.beaconDB.SaveState(ctx, postState, root); err != nil {
|
||||
return errors.Wrap(err, "could not save state")
|
||||
}
|
||||
if helpers.IsEpochStart(postState.Slot()) {
|
||||
if err := s.beaconDB.SaveState(ctx, postState, root); err != nil {
|
||||
return errors.Wrap(err, "could not save state")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -58,36 +57,25 @@ func (s *Service) getBlockPreState(ctx context.Context, b *ethpb.BeaconBlock) (*
|
||||
|
||||
// verifyBlkPreState validates input block has a valid pre-state.
|
||||
func (s *Service) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) (*stateTrie.BeaconState, error) {
|
||||
if featureconfig.Get().InitSyncCacheState {
|
||||
preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)]
|
||||
var err error
|
||||
if preState == nil {
|
||||
preState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
|
||||
}
|
||||
if preState == nil {
|
||||
if bytes.Equal(s.finalizedCheckpt.Root, b.ParentRoot) {
|
||||
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
|
||||
}
|
||||
preState, err = s.generateState(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root), bytesutil.ToBytes32(b.ParentRoot))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return preState, nil // No copy needed from newly hydrated DB object.
|
||||
}
|
||||
return preState.Copy(), nil
|
||||
}
|
||||
|
||||
preState, err := s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
|
||||
}
|
||||
preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)]
|
||||
var err error
|
||||
if preState == nil {
|
||||
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
|
||||
preState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
|
||||
}
|
||||
if preState == nil {
|
||||
if bytes.Equal(s.finalizedCheckpt.Root, b.ParentRoot) {
|
||||
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
|
||||
}
|
||||
preState, err = s.generateState(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root), bytesutil.ToBytes32(b.ParentRoot))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return preState, nil // No copy needed from newly hydrated DB object.
|
||||
}
|
||||
return preState, nil
|
||||
return preState.Copy(), nil
|
||||
}
|
||||
|
||||
// verifyBlkDescendant validates input block root is a descendant of the
|
||||
@@ -260,26 +248,24 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
|
||||
s.justifiedCheckpt = cpt
|
||||
}
|
||||
|
||||
if featureconfig.Get().InitSyncCacheState {
|
||||
justifiedRoot := bytesutil.ToBytes32(cpt.Root)
|
||||
justifiedRoot := bytesutil.ToBytes32(cpt.Root)
|
||||
|
||||
justifiedState := s.initSyncState[justifiedRoot]
|
||||
// If justified state is nil, resume back to normal syncing process and save
|
||||
// justified check point.
|
||||
if justifiedState == nil {
|
||||
if s.beaconDB.HasState(ctx, justifiedRoot) {
|
||||
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
|
||||
}
|
||||
justifiedState, err = s.generateState(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root), justifiedRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
|
||||
}
|
||||
justifiedState := s.initSyncState[justifiedRoot]
|
||||
// If justified state is nil, resume back to normal syncing process and save
|
||||
// justified check point.
|
||||
if justifiedState == nil {
|
||||
if s.beaconDB.HasState(ctx, justifiedRoot) {
|
||||
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
|
||||
}
|
||||
if err := s.beaconDB.SaveState(ctx, justifiedState, justifiedRoot); err != nil {
|
||||
return errors.Wrap(err, "could not save justified state")
|
||||
justifiedState, err = s.generateState(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root), justifiedRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
|
||||
}
|
||||
}
|
||||
if err := s.beaconDB.SaveState(ctx, justifiedState, justifiedRoot); err != nil {
|
||||
return errors.Wrap(err, "could not save justified state")
|
||||
}
|
||||
|
||||
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
|
||||
}
|
||||
@@ -287,9 +273,6 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
|
||||
// This saves every finalized state in DB during initial sync, needed as part of optimization to
|
||||
// use cache state during initial sync in case of restart.
|
||||
func (s *Service) saveInitState(ctx context.Context, state *stateTrie.BeaconState) error {
|
||||
if !featureconfig.Get().InitSyncCacheState {
|
||||
return nil
|
||||
}
|
||||
cpt := state.FinalizedCheckpoint()
|
||||
finalizedRoot := bytesutil.ToBytes32(cpt.Root)
|
||||
fs := s.initSyncState[finalizedRoot]
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
)
|
||||
@@ -80,7 +79,7 @@ func TestStore_OnBlock(t *testing.T) {
|
||||
name: "parent block root does not have a state",
|
||||
blk: ðpb.BeaconBlock{},
|
||||
s: st.Copy(),
|
||||
wantErrString: "pre state of slot 0 does not exist",
|
||||
wantErrString: "provided block root does not have block saved in the db",
|
||||
},
|
||||
{
|
||||
name: "block is from the feature",
|
||||
@@ -324,32 +323,6 @@ func TestCachedPreState_CanGetFromCache(t *testing.T) {
|
||||
b := ðpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
|
||||
service.initSyncState[r] = s
|
||||
|
||||
wanted := "pre state of slot 1 does not exist"
|
||||
if _, err := service.verifyBlkPreState(ctx, b); !strings.Contains(err.Error(), wanted) {
|
||||
t.Fatal("Not expected error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCachedPreState_CanGetFromCacheWithFeature(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
db := testDB.SetupDB(t)
|
||||
defer testDB.TeardownDB(t, db)
|
||||
config := &featureconfig.Flags{
|
||||
InitSyncCacheState: true,
|
||||
}
|
||||
featureconfig.Init(config)
|
||||
|
||||
cfg := &Config{BeaconDB: db}
|
||||
service, err := NewService(ctx, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, _ := stateTrie.InitializeFromProto(&pb.BeaconState{Slot: 1})
|
||||
r := [32]byte{'A'}
|
||||
b := ðpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
|
||||
service.initSyncState[r] = s
|
||||
|
||||
received, err := service.verifyBlkPreState(ctx, b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -403,11 +376,6 @@ func TestSaveInitState_CanSaveDelete(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
config := &featureconfig.Flags{
|
||||
InitSyncCacheState: true,
|
||||
}
|
||||
featureconfig.Init(config)
|
||||
|
||||
for i := uint64(0); i < 64; i++ {
|
||||
b := ðpb.BeaconBlock{Slot: i}
|
||||
s, _ := stateTrie.InitializeFromProto(&pb.BeaconState{Slot: i})
|
||||
|
||||
@@ -202,21 +202,11 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
|
||||
return errors.Wrap(err, "could not get head root from cache")
|
||||
}
|
||||
|
||||
if featureconfig.Get().InitSyncCacheState {
|
||||
if !bytes.Equal(root[:], cachedHeadRoot) {
|
||||
if err := s.saveHeadNoDB(ctx, blockCopy, root); err != nil {
|
||||
err := errors.Wrap(err, "could not save head")
|
||||
traceutil.AnnotateError(span, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !bytes.Equal(root[:], cachedHeadRoot) {
|
||||
if err := s.saveHead(ctx, root); err != nil {
|
||||
err := errors.Wrap(err, "could not save head")
|
||||
traceutil.AnnotateError(span, err)
|
||||
return err
|
||||
}
|
||||
if !bytes.Equal(root[:], cachedHeadRoot) {
|
||||
if err := s.saveHeadNoDB(ctx, blockCopy, root); err != nil {
|
||||
err := errors.Wrap(err, "could not save head")
|
||||
traceutil.AnnotateError(span, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,6 @@ import (
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/stategen"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
@@ -122,16 +121,14 @@ func (s *Service) Start() {
|
||||
// For running initial sync with state cache, in an event of restart, we use
|
||||
// last finalized check point as start point to sync instead of head
|
||||
// state. This is because we no longer save state every slot during sync.
|
||||
if featureconfig.Get().InitSyncCacheState {
|
||||
cp, err := s.beaconDB.FinalizedCheckpoint(ctx)
|
||||
cp, err := s.beaconDB.FinalizedCheckpoint(ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not fetch finalized cp: %v", err)
|
||||
}
|
||||
if beaconState == nil {
|
||||
beaconState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(cp.Root))
|
||||
if err != nil {
|
||||
log.Fatalf("Could not fetch finalized cp: %v", err)
|
||||
}
|
||||
if beaconState == nil {
|
||||
beaconState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(cp.Root))
|
||||
if err != nil {
|
||||
log.Fatalf("Could not fetch beacon state: %v", err)
|
||||
}
|
||||
log.Fatalf("Could not fetch beacon state: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,6 @@ type Flags struct {
|
||||
EnableBackupWebhook bool // EnableBackupWebhook to allow database backups to trigger from monitoring port /db/backup.
|
||||
PruneEpochBoundaryStates bool // PruneEpochBoundaryStates prunes the epoch boundary state before last finalized check point.
|
||||
EnableSnappyDBCompression bool // EnableSnappyDBCompression in the database.
|
||||
InitSyncCacheState bool // InitSyncCacheState caches state during initial sync.
|
||||
KafkaBootstrapServers string // KafkaBootstrapServers to find kafka servers to stream blocks, attestations, etc.
|
||||
ProtectProposer bool // ProtectProposer prevents the validator client from signing any proposals that would be considered a slashable offense.
|
||||
ProtectAttester bool // ProtectAttester prevents the validator client from signing any attestations that would be considered a slashable offense.
|
||||
@@ -125,10 +124,6 @@ func ConfigureBeaconChain(ctx *cli.Context) {
|
||||
log.Warn("Enabling experimental kafka streaming.")
|
||||
cfg.KafkaBootstrapServers = ctx.GlobalString(kafkaBootstrapServersFlag.Name)
|
||||
}
|
||||
if ctx.GlobalBool(initSyncCacheStateFlag.Name) {
|
||||
log.Warn("Enabled initial sync cache state mode.")
|
||||
cfg.InitSyncCacheState = true
|
||||
}
|
||||
if ctx.GlobalBool(enableSlasherFlag.Name) {
|
||||
log.Warn("Enable slasher connection.")
|
||||
cfg.EnableSlasherConnection = true
|
||||
|
||||
@@ -54,12 +54,6 @@ var (
|
||||
"and attestation's aggregated signatures. Without this flag, only the proposer " +
|
||||
"signature is verified until the node reaches the end of the finalized chain.",
|
||||
}
|
||||
initSyncCacheStateFlag = cli.BoolFlag{
|
||||
Name: "initial-sync-cache-state",
|
||||
Usage: "Save state in cache during initial sync. We currently save state in the DB during " +
|
||||
"initial sync and disk-IO is one of the biggest bottleneck. This still saves finalized state in DB " +
|
||||
"and start syncing from there",
|
||||
}
|
||||
enableSlasherFlag = cli.BoolFlag{
|
||||
Name: "enable-slasher",
|
||||
Usage: "Enables connection to a slasher service in order to retrieve slashable events. Slasher is connected to the beacon node using gRPC and " +
|
||||
@@ -210,6 +204,11 @@ var (
|
||||
Usage: deprecatedUsage,
|
||||
Hidden: true,
|
||||
}
|
||||
deprecatedInitSyncCacheStateFlag = cli.BoolFlag{
|
||||
Name: "initial-sync-cache-state",
|
||||
Usage: deprecatedUsage,
|
||||
Hidden: true,
|
||||
}
|
||||
)
|
||||
|
||||
var deprecatedFlags = []cli.Flag{
|
||||
@@ -233,6 +232,7 @@ var deprecatedFlags = []cli.Flag{
|
||||
deprecatedprotoArrayForkChoice,
|
||||
deprecatedForkchoiceAggregateAttestations,
|
||||
deprecatedEnableAttestationCacheFlag,
|
||||
deprecatedInitSyncCacheStateFlag,
|
||||
}
|
||||
|
||||
// ValidatorFlags contains a list of all the feature flags that apply to the validator client.
|
||||
@@ -259,7 +259,6 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
|
||||
enableSSZCache,
|
||||
enableEth1DataVoteCacheFlag,
|
||||
initSyncVerifyEverythingFlag,
|
||||
initSyncCacheStateFlag,
|
||||
skipBLSVerifyFlag,
|
||||
kafkaBootstrapServersFlag,
|
||||
enableBackupWebhookFlag,
|
||||
@@ -278,7 +277,6 @@ var E2EBeaconChainFlags = []string{
|
||||
"--cache-filtered-block-tree",
|
||||
"--enable-skip-slots-cache",
|
||||
"--enable-eth1-data-vote-cache",
|
||||
"--initial-sync-cache-state",
|
||||
"--proto-array-forkchoice",
|
||||
"--enable-byte-mempool",
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user