diff --git a/beacon-chain/blockchain/init_sync_process_block_test.go b/beacon-chain/blockchain/init_sync_process_block_test.go index ae870403b5..fdf0f7fcde 100644 --- a/beacon-chain/blockchain/init_sync_process_block_test.go +++ b/beacon-chain/blockchain/init_sync_process_block_test.go @@ -6,6 +6,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" @@ -210,7 +211,7 @@ func TestPruneNonBoundary_CanPrune(t *testing.T) { func TestGenerateState_CorrectlyGenerated(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - cfg := &Config{BeaconDB: db, StateGen: stategen.New(db)} + cfg := &Config{BeaconDB: db, StateGen: stategen.New(db, cache.NewStateSummaryCache())} service, err := NewService(context.Background(), cfg) if err != nil { t.Fatal(err) diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index cff7ce5830..81e766e786 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -140,12 +140,13 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) } if featureconfig.Get().NewStateMgmt { - finalizedState, err := s.stateGen.StateByRoot(ctx, fRoot) + fRoot := bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root) + fBlock, err := s.beaconDB.Block(ctx, fRoot) if err != nil { - return nil, err + return nil, errors.Wrap(err, "could not get finalized block to migrate") } - if err := s.stateGen.MigrateToCold(ctx, finalizedState, fRoot); err != nil { - return nil, err + if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil { + return nil, errors.Wrap(err, "could not migrate to cold") } } } @@ -300,13 +301,12 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed if featureconfig.Get().NewStateMgmt { fRoot := bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root) - finalizedState, err := s.stateGen.StateByRoot(ctx, fRoot) + fBlock, err := s.beaconDB.Block(ctx, fRoot) if err != nil { - return errors.Wrap(err, "could not get state by root for migration") + return errors.Wrap(err, "could not get finalized block to migrate") } - if err := s.stateGen.MigrateToCold(ctx, finalizedState, fRoot); err != nil { - return errors.Wrap(err, "could not migrate with new finalized root") - + if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil { + return errors.Wrap(err, "could not migrate to cold") } } } diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index feed8782ca..9b508498b3 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -11,11 +11,16 @@ go_library( "eth1_data.go", "hot_state_cache.go", "skip_slot_cache.go", + "state_summary.go", ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/cache", - visibility = ["//beacon-chain:__subpackages__"], + visibility = [ + "//beacon-chain:__subpackages__", + "//tools:__subpackages__", + ], deps = [ "//beacon-chain/state:go_default_library", + "//proto/beacon/p2p/v1:go_default_library", "//shared/featureconfig:go_default_library", "//shared/hashutil:go_default_library", "//shared/params:go_default_library", diff --git a/beacon-chain/cache/state_summary.go b/beacon-chain/cache/state_summary.go new file mode 100644 index 0000000000..a7cda4f07d --- /dev/null +++ b/beacon-chain/cache/state_summary.go @@ -0,0 +1,65 @@ +package cache + +import ( + "sync" + + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" +) + +// StateSummaryCache caches state summary object. +type StateSummaryCache struct { + initSyncStateSummaries map[[32]byte]*pb.StateSummary + initSyncStateSummariesLock sync.RWMutex +} + +// NewStateSummaryCache creates a new state summary cache. +func NewStateSummaryCache() *StateSummaryCache { + return &StateSummaryCache{ + initSyncStateSummaries: make(map[[32]byte]*pb.StateSummary), + } +} + +// Put saves a state summary to the initial sync state summaries cache. +func (s *StateSummaryCache) Put(r [32]byte, b *pb.StateSummary) { + s.initSyncStateSummariesLock.Lock() + defer s.initSyncStateSummariesLock.Unlock() + s.initSyncStateSummaries[r] = b +} + +// Has checks if a state summary exists in the initial sync state summaries cache using the root +// of the block. +func (s *StateSummaryCache) Has(r [32]byte) bool { + s.initSyncStateSummariesLock.RLock() + defer s.initSyncStateSummariesLock.RUnlock() + _, ok := s.initSyncStateSummaries[r] + return ok +} + +// Get retrieves a state summary from the initial sync state summaries cache using the root of +// the block. +func (s *StateSummaryCache) Get(r [32]byte) *pb.StateSummary { + s.initSyncStateSummariesLock.RLock() + defer s.initSyncStateSummariesLock.RUnlock() + b := s.initSyncStateSummaries[r] + return b +} + +// GetAll retrieves all the beacon state summaries from the initial sync state summaries cache, the returned +// state summaries are unordered. +func (s *StateSummaryCache) GetAll() []*pb.StateSummary { + s.initSyncStateSummariesLock.RLock() + defer s.initSyncStateSummariesLock.RUnlock() + + blks := make([]*pb.StateSummary, 0, len(s.initSyncStateSummaries)) + for _, b := range s.initSyncStateSummaries { + blks = append(blks, b) + } + return blks +} + +// Clear clears out the initial sync state summaries cache. +func (s *StateSummaryCache) Clear() { + s.initSyncStateSummariesLock.Lock() + defer s.initSyncStateSummariesLock.Unlock() + s.initSyncStateSummaries = make(map[[32]byte]*pb.StateSummary) +} diff --git a/beacon-chain/db/BUILD.bazel b/beacon-chain/db/BUILD.bazel index 11c0ee81f7..dbebc3618e 100644 --- a/beacon-chain/db/BUILD.bazel +++ b/beacon-chain/db/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//tools:__subpackages__", ], deps = [ + "//beacon-chain/cache:go_default_library", "//beacon-chain/db/iface:go_default_library", "//beacon-chain/db/kv:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/beacon-chain/db/db.go b/beacon-chain/db/db.go index 1d0ccf7fe4..95b19a6050 100644 --- a/beacon-chain/db/db.go +++ b/beacon-chain/db/db.go @@ -1,8 +1,11 @@ package db -import "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" +import ( + "github.com/prysmaticlabs/prysm/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" +) // NewDB initializes a new DB. -func NewDB(dirPath string) (Database, error) { - return kv.NewKVStore(dirPath) +func NewDB(dirPath string, stateSummaryCache *cache.StateSummaryCache) (Database, error) { + return kv.NewKVStore(dirPath, stateSummaryCache) } diff --git a/beacon-chain/db/db_kafka_wrapped.go b/beacon-chain/db/db_kafka_wrapped.go index ee59b093cb..d13faa91c1 100644 --- a/beacon-chain/db/db_kafka_wrapped.go +++ b/beacon-chain/db/db_kafka_wrapped.go @@ -1,13 +1,14 @@ package db import ( + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/db/kafka" "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" ) // NewDB initializes a new DB with kafka wrapper. -func NewDB(dirPath string) (Database, error) { - db, err := kv.NewKVStore(dirPath) +func NewDB(dirPath string, stateSummaryCache *cache.StateSummaryCache) (Database, error) { + db, err := kv.NewKVStore(dirPath, stateSummaryCache) if err != nil { return nil, err } diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go index 3267645fa4..37f00aa628 100644 --- a/beacon-chain/db/iface/interface.go +++ b/beacon-chain/db/iface/interface.go @@ -90,6 +90,7 @@ type NoHeadAccessDatabase interface { DeleteState(ctx context.Context, blockRoot [32]byte) error DeleteStates(ctx context.Context, blockRoots [][32]byte) error SaveStateSummary(ctx context.Context, summary *ethereum_beacon_p2p_v1.StateSummary) error + SaveStateSummaries(ctx context.Context, summaries []*ethereum_beacon_p2p_v1.StateSummary) error // Slashing operations. SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error SaveAttesterSlashing(ctx context.Context, slashing *eth.AttesterSlashing) error diff --git a/beacon-chain/db/kafka/passthrough.go b/beacon-chain/db/kafka/passthrough.go index 9796e6bc91..431e9ac8f4 100644 --- a/beacon-chain/db/kafka/passthrough.go +++ b/beacon-chain/db/kafka/passthrough.go @@ -238,6 +238,11 @@ func (e Exporter) SaveStateSummary(ctx context.Context, summary *pb.StateSummary return e.db.SaveStateSummary(ctx, summary) } +// SaveStateSummaries -- passthrough. +func (e Exporter) SaveStateSummaries(ctx context.Context, summaries []*pb.StateSummary) error { + return e.db.SaveStateSummaries(ctx, summaries) +} + // SaveStates -- passthrough. func (e Exporter) SaveStates(ctx context.Context, states []*state.BeaconState, blockRoots [][32]byte) error { return e.db.SaveStates(ctx, states, blockRoots) diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index 7e87c34525..3028065ad7 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -26,6 +26,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kv", visibility = ["//beacon-chain:__subpackages__"], deps = [ + "//beacon-chain/cache:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db/filters:go_default_library", "//beacon-chain/db/iface:go_default_library", @@ -74,6 +75,7 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//beacon-chain/cache:go_default_library", "//beacon-chain/db/filters:go_default_library", "//beacon-chain/state:go_default_library", "//proto/beacon/p2p/v1:go_default_library", diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index a07a0f1371..7cd956a85f 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -263,7 +263,7 @@ func (k *Store) SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error defer span.End() return k.db.Update(func(tx *bolt.Tx) error { if featureconfig.Get().NewStateMgmt { - if tx.Bucket(stateSummaryBucket).Get(blockRoot[:]) == nil { + if tx.Bucket(stateSummaryBucket).Get(blockRoot[:]) == nil && !k.stateSummaryCache.Has(blockRoot) { return errors.New("no state summary found with head block root") } } else { diff --git a/beacon-chain/db/kv/checkpoint.go b/beacon-chain/db/kv/checkpoint.go index 0018d6ad01..0b91a8d5ce 100644 --- a/beacon-chain/db/kv/checkpoint.go +++ b/beacon-chain/db/kv/checkpoint.go @@ -5,6 +5,7 @@ import ( "errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/traceutil" bolt "go.etcd.io/bbolt" @@ -65,7 +66,7 @@ func (k *Store) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.C return k.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(checkpointBucket) if featureconfig.Get().NewStateMgmt { - if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil { + if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil && !k.stateSummaryCache.Has(bytesutil.ToBytes32(checkpoint.Root)) { return errors.New("missing state summary for finalized root") } } else { @@ -93,7 +94,7 @@ func (k *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C return k.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(checkpointBucket) if featureconfig.Get().NewStateMgmt { - if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil { + if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil && !k.stateSummaryCache.Has(bytesutil.ToBytes32(checkpoint.Root)) { return errors.New("missing state summary for finalized root") } } else { diff --git a/beacon-chain/db/kv/kv.go b/beacon-chain/db/kv/kv.go index 030c5cad40..4dff30d297 100644 --- a/beacon-chain/db/kv/kv.go +++ b/beacon-chain/db/kv/kv.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" prombolt "github.com/prysmaticlabs/prombbolt" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/db/iface" bolt "go.etcd.io/bbolt" ) @@ -38,12 +39,13 @@ type Store struct { validatorIndexCache *ristretto.Cache stateSlotBitLock sync.Mutex blockSlotBitLock sync.Mutex + stateSummaryCache *cache.StateSummaryCache } // NewKVStore initializes a new boltDB key-value store at the directory // path specified, creates the kv-buckets based on the schema, and stores // an open connection db object as a property of the Store struct. -func NewKVStore(dirPath string) (*Store, error) { +func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*Store, error) { if err := os.MkdirAll(dirPath, 0700); err != nil { return nil, err } @@ -79,6 +81,7 @@ func NewKVStore(dirPath string) (*Store, error) { databasePath: dirPath, blockCache: blockCache, validatorIndexCache: validatorCache, + stateSummaryCache: stateSummaryCache, } if err := kv.db.Update(func(tx *bolt.Tx) error { diff --git a/beacon-chain/db/kv/kv_test.go b/beacon-chain/db/kv/kv_test.go index d08cfa6c3e..dd719fbb8c 100644 --- a/beacon-chain/db/kv/kv_test.go +++ b/beacon-chain/db/kv/kv_test.go @@ -8,6 +8,7 @@ import ( "path" "testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/shared/testutil" ) @@ -21,7 +22,7 @@ func setupDB(t testing.TB) *Store { if err := os.RemoveAll(path); err != nil { t.Fatalf("Failed to remove directory: %v", err) } - db, err := NewKVStore(path) + db, err := NewKVStore(path, cache.NewStateSummaryCache()) if err != nil { t.Fatalf("Failed to instantiate DB: %v", err) } diff --git a/beacon-chain/db/kv/state_summary.go b/beacon-chain/db/kv/state_summary.go index a4b6317832..2f9f2ffa5e 100644 --- a/beacon-chain/db/kv/state_summary.go +++ b/beacon-chain/db/kv/state_summary.go @@ -23,6 +23,26 @@ func (k *Store) SaveStateSummary(ctx context.Context, summary *pb.StateSummary) }) } +// SaveStateSummaries saves state summary objects to the DB. +func (k *Store) SaveStateSummaries(ctx context.Context, summaries []*pb.StateSummary) error { + ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStateSummaries") + defer span.End() + + return k.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(stateSummaryBucket) + for _, summary := range summaries { + enc, err := encode(summary) + if err != nil { + return err + } + if err := bucket.Put(summary.Root, enc); err != nil { + return err + } + } + return nil + }) +} + // StateSummary returns the state summary object from the db using input block root. func (k *Store) StateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.StateSummary") diff --git a/beacon-chain/db/testing/BUILD.bazel b/beacon-chain/db/testing/BUILD.bazel index 2ca28958de..3df156edb9 100644 --- a/beacon-chain/db/testing/BUILD.bazel +++ b/beacon-chain/db/testing/BUILD.bazel @@ -7,6 +7,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/testing", visibility = ["//beacon-chain:__subpackages__"], deps = [ + "//beacon-chain/cache:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/kv:go_default_library", "//shared/testutil:go_default_library", diff --git a/beacon-chain/db/testing/setup_db.go b/beacon-chain/db/testing/setup_db.go index 8e0c347b9e..c1450767a8 100644 --- a/beacon-chain/db/testing/setup_db.go +++ b/beacon-chain/db/testing/setup_db.go @@ -8,6 +8,7 @@ import ( "path" "testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" "github.com/prysmaticlabs/prysm/shared/testutil" @@ -23,7 +24,7 @@ func SetupDB(t testing.TB) db.Database { if err := os.RemoveAll(p); err != nil { t.Fatalf("failed to remove directory: %v", err) } - s, err := kv.NewKVStore(p) + s, err := kv.NewKVStore(p, cache.NewStateSummaryCache()) if err != nil { t.Fatal(err) } diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index 4c4a78a334..ee8ce766ce 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//beacon-chain/archiver:go_default_library", "//beacon-chain/blockchain:go_default_library", + "//beacon-chain/cache:go_default_library", "//beacon-chain/cache/depositcache:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/flags:go_default_library", diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index b730fd823f..f09af7a3d7 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -18,6 +18,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/archiver" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/flags" @@ -58,20 +59,21 @@ const testSkipPowFlag = "test-skip-pow" // full PoS node. It handles the lifecycle of the entire system and registers // services to a service registry. type BeaconNode struct { - ctx *cli.Context - services *shared.ServiceRegistry - lock sync.RWMutex - stop chan struct{} // Channel to wait for termination notifications. - db db.Database - attestationPool attestations.Pool - exitPool *voluntaryexits.Pool - slashingsPool *slashings.Pool - depositCache *depositcache.DepositCache - stateFeed *event.Feed - blockFeed *event.Feed - opFeed *event.Feed - forkChoiceStore forkchoice.ForkChoicer - stateGen *stategen.State + ctx *cli.Context + services *shared.ServiceRegistry + lock sync.RWMutex + stop chan struct{} // Channel to wait for termination notifications. + db db.Database + stateSummaryCache *cache.StateSummaryCache + attestationPool attestations.Pool + exitPool *voluntaryexits.Pool + slashingsPool *slashings.Pool + depositCache *depositcache.DepositCache + stateFeed *event.Feed + blockFeed *event.Feed + opFeed *event.Feed + forkChoiceStore forkchoice.ForkChoicer + stateGen *stategen.State } // NewBeaconNode creates a new node instance, sets up configuration options, and registers @@ -92,15 +94,16 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { registry := shared.NewServiceRegistry() beacon := &BeaconNode{ - ctx: ctx, - services: registry, - stop: make(chan struct{}), - stateFeed: new(event.Feed), - blockFeed: new(event.Feed), - opFeed: new(event.Feed), - attestationPool: attestations.NewPool(), - exitPool: voluntaryexits.NewPool(), - slashingsPool: slashings.NewPool(), + ctx: ctx, + services: registry, + stop: make(chan struct{}), + stateFeed: new(event.Feed), + blockFeed: new(event.Feed), + opFeed: new(event.Feed), + attestationPool: attestations.NewPool(), + exitPool: voluntaryexits.NewPool(), + slashingsPool: slashings.NewPool(), + stateSummaryCache: cache.NewStateSummaryCache(), } if err := beacon.startDB(ctx); err != nil { @@ -233,7 +236,7 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error { clearDB := ctx.Bool(cmd.ClearDB.Name) forceClearDB := ctx.Bool(cmd.ForceClearDB.Name) - d, err := db.NewDB(dbPath) + d, err := db.NewDB(dbPath, b.stateSummaryCache) if err != nil { return err } @@ -252,7 +255,7 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error { if err := d.ClearDB(); err != nil { return err } - d, err = db.NewDB(dbPath) + d, err = db.NewDB(dbPath, b.stateSummaryCache) if err != nil { return err } @@ -264,7 +267,7 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error { } func (b *BeaconNode) startStateGen() { - b.stateGen = stategen.New(b.db) + b.stateGen = stategen.New(b.db, b.stateSummaryCache) } func (b *BeaconNode) registerP2P(ctx *cli.Context) error { @@ -437,6 +440,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { AttPool: b.attestationPool, ExitPool: b.exitPool, SlashingPool: b.slashingsPool, + StateSummaryCache: b.stateSummaryCache, }) return b.services.RegisterService(rs) diff --git a/beacon-chain/state/stategen/BUILD.bazel b/beacon-chain/state/stategen/BUILD.bazel index 1aa72fcfe6..8cfd418311 100644 --- a/beacon-chain/state/stategen/BUILD.bazel +++ b/beacon-chain/state/stategen/BUILD.bazel @@ -47,6 +47,7 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//beacon-chain/cache:go_default_library", "//beacon-chain/core/blocks:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/testing:go_default_library", diff --git a/beacon-chain/state/stategen/cold.go b/beacon-chain/state/stategen/cold.go index bdae6ea03e..60ffa3d60d 100644 --- a/beacon-chain/state/stategen/cold.go +++ b/beacon-chain/state/stategen/cold.go @@ -44,12 +44,9 @@ func (s *State) loadColdStateByRoot(ctx context.Context, blockRoot [32]byte) (*s ctx, span := trace.StartSpan(ctx, "stateGen.loadColdStateByRoot") defer span.End() - summary, err := s.beaconDB.StateSummary(ctx, blockRoot) + summary, err := s.stateSummary(ctx, blockRoot) if err != nil { - return nil, err - } - if summary == nil { - return nil, errUnknownStateSummary + return nil, errors.Wrap(err, "could not get state summary") } // Use the archived point state if the summary slot lies on top of the archived point. @@ -195,13 +192,10 @@ func (s *State) blockRootSlot(ctx context.Context, blockRoot [32]byte) (uint64, ctx, span := trace.StartSpan(ctx, "stateGen.blockRootSlot") defer span.End() - if s.beaconDB.HasStateSummary(ctx, blockRoot) { - summary, err := s.beaconDB.StateSummary(ctx, blockRoot) + if s.StateSummaryExists(ctx, blockRoot) { + summary, err := s.stateSummary(ctx, blockRoot) if err != nil { - return 0, nil - } - if summary == nil { - return 0, errUnknownStateSummary + return 0, err } return summary.Slot, nil } diff --git a/beacon-chain/state/stategen/cold_test.go b/beacon-chain/state/stategen/cold_test.go index ab060c63bb..6fb2508d07 100644 --- a/beacon-chain/state/stategen/cold_test.go +++ b/beacon-chain/state/stategen/cold_test.go @@ -6,6 +6,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/testutil" @@ -16,7 +17,7 @@ func TestSaveColdState_NonArchivedPoint(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) service.slotsPerArchivedPoint = 2 beaconState, _ := testutil.DeterministicGenesisState(t, 32) beaconState.SetSlot(1) @@ -307,7 +308,7 @@ func TestBlockRootSlot_Exists(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) bRoot := [32]byte{'A'} bSlot := uint64(100) if err := service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{ @@ -332,7 +333,7 @@ func TestBlockRootSlot_CanRecoverAndSave(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) bSlot := uint64(100) b := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: bSlot}} bRoot, _ := ssz.HashTreeRoot(b.Block) diff --git a/beacon-chain/state/stategen/getter.go b/beacon-chain/state/stategen/getter.go index 4f53f717b7..3b71430b50 100644 --- a/beacon-chain/state/stategen/getter.go +++ b/beacon-chain/state/stategen/getter.go @@ -5,6 +5,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/state" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/params" "go.opencensus.io/trace" ) @@ -52,5 +53,24 @@ func (s *State) StateBySlot(ctx context.Context, slot uint64) (*state.BeaconStat // StateSummaryExists returns true if the corresponding state of the input block either // exists in the DB or it can be generated by state gen. func (s *State) StateSummaryExists(ctx context.Context, blockRoot [32]byte) bool { - return s.beaconDB.HasStateSummary(ctx, blockRoot) + return s.beaconDB.HasStateSummary(ctx, blockRoot) || s.stateSummaryCache.Has(blockRoot) +} + +// This returns the state summary object of a given block root, it first checks the cache +// then checks the DB. An error is returned if state summary object is nil. +func (s *State) stateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) { + var summary *pb.StateSummary + var err error + if s.stateSummaryCache.Has(blockRoot) { + summary = s.stateSummaryCache.Get(blockRoot) + } else { + summary, err = s.beaconDB.StateSummary(ctx, blockRoot) + if err != nil { + return nil, err + } + } + if summary == nil { + return nil, errUnknownStateSummary + } + return summary, nil } diff --git a/beacon-chain/state/stategen/getter_test.go b/beacon-chain/state/stategen/getter_test.go index 48e287246c..43e30604ab 100644 --- a/beacon-chain/state/stategen/getter_test.go +++ b/beacon-chain/state/stategen/getter_test.go @@ -7,6 +7,7 @@ import ( "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/params" @@ -18,7 +19,7 @@ func TestStateByRoot_ColdState(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) service.splitInfo.slot = 2 service.slotsPerArchivedPoint = 1 @@ -53,7 +54,7 @@ func TestStateByRoot_HotStateDB(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) blk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} @@ -63,15 +64,15 @@ func TestStateByRoot_HotStateDB(t *testing.T) { t.Fatal(err) } targetSlot := uint64(10) - + targetRoot := [32]byte{'a'} if err := service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{ Slot: targetSlot, - Root: blkRoot[:], + Root: targetRoot[:], }); err != nil { t.Fatal(err) } - loadedState, err := service.StateByRoot(ctx, blkRoot) + loadedState, err := service.StateByRoot(ctx, targetRoot) if err != nil { t.Fatal(err) } @@ -85,7 +86,7 @@ func TestStateByRoot_HotStateCached(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) r := [32]byte{'A'} @@ -110,7 +111,7 @@ func TestStateBySlot_ColdState(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) service.slotsPerArchivedPoint = params.BeaconConfig().SlotsPerEpoch * 2 service.splitInfo.slot = service.slotsPerArchivedPoint + 1 @@ -155,7 +156,7 @@ func TestStateBySlot_HotStateDB(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) b := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} @@ -177,3 +178,45 @@ func TestStateBySlot_HotStateDB(t *testing.T) { t.Error("Did not correctly load state") } } + +func TestStateSummary_CanGetFromCacheOrDB(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + service := New(db, cache.NewStateSummaryCache()) + + r := [32]byte{'a'} + summary := &pb.StateSummary{Slot: 100} + _, err := service.stateSummary(ctx, r) + if err != errUnknownStateSummary { + t.Fatal("Did not get wanted error") + } + + service.stateSummaryCache.Put(r, summary) + got, err := service.stateSummary(ctx, r) + if err != nil { + t.Fatal(err) + } + if !proto.Equal(got, summary) { + t.Error("Did not get wanted summary") + } + + r = [32]byte{'b'} + summary = &pb.StateSummary{Root: r[:], Slot: 101} + _, err = service.stateSummary(ctx, r) + if err != errUnknownStateSummary { + t.Fatal("Did not get wanted error") + } + + if err := service.beaconDB.SaveStateSummary(ctx, summary); err != nil { + t.Fatal(err) + } + got, err = service.stateSummary(ctx, r) + if err != nil { + t.Fatal("Did not get wanted error") + } + if !proto.Equal(got, summary) { + t.Error("Did not get wanted summary") + } +} diff --git a/beacon-chain/state/stategen/hot.go b/beacon-chain/state/stategen/hot.go index 6e38ed011a..4760769d60 100644 --- a/beacon-chain/state/stategen/hot.go +++ b/beacon-chain/state/stategen/hot.go @@ -36,12 +36,10 @@ func (s *State) saveHotState(ctx context.Context, blockRoot [32]byte, state *sta } // On an intermediate slots, save the hot state summary. - if err := s.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{ + s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{ Slot: state.Slot(), Root: blockRoot[:], - }); err != nil { - return err - } + }) // Store the copied state in the cache. s.hotStateCache.Put(blockRoot, state) @@ -56,21 +54,23 @@ func (s *State) loadHotStateByRoot(ctx context.Context, blockRoot [32]byte) (*st ctx, span := trace.StartSpan(ctx, "stateGen.loadHotStateByRoot") defer span.End() - // Load the hot state cache. + // Load the hot state from cache. cachedState := s.hotStateCache.Get(blockRoot) if cachedState != nil { return cachedState, nil } - summary, err := s.beaconDB.StateSummary(ctx, blockRoot) - if err != nil { - return nil, err - } - if summary == nil { - return nil, errUnknownStateSummary + // Load the hot state from DB. + if s.beaconDB.HasState(ctx, blockRoot) { + return s.beaconDB.State(ctx, blockRoot) } - startState, err := s.lastSavedState(ctx, helpers.StartSlot(helpers.SlotToEpoch(summary.Slot))) + summary, err := s.stateSummary(ctx, blockRoot) + if err != nil { + return nil, errors.Wrap(err, "could not get state summary") + } + + startState, err := s.lastSavedState(ctx, summary.Slot) if err != nil { return nil, err } diff --git a/beacon-chain/state/stategen/hot_test.go b/beacon-chain/state/stategen/hot_test.go index 5cc6bddd83..3714c9e23d 100644 --- a/beacon-chain/state/stategen/hot_test.go +++ b/beacon-chain/state/stategen/hot_test.go @@ -7,6 +7,7 @@ import ( "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -21,7 +22,7 @@ func TestSaveHotState_AlreadyHas(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch) @@ -48,7 +49,7 @@ func TestSaveHotState_CanSaveOnEpochBoundary(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch) @@ -62,7 +63,7 @@ func TestSaveHotState_CanSaveOnEpochBoundary(t *testing.T) { if !service.beaconDB.HasState(ctx, r) { t.Error("Should have saved the state") } - if !service.beaconDB.HasStateSummary(ctx, r) { + if !service.stateSummaryCache.Has(r) { t.Error("Should have saved the state summary") } testutil.AssertLogsContain(t, hook, "Saved full state on epoch boundary") @@ -73,7 +74,7 @@ func TestSaveHotState_NoSaveNotEpochBoundary(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch - 1) @@ -95,7 +96,7 @@ func TestSaveHotState_NoSaveNotEpochBoundary(t *testing.T) { if service.beaconDB.HasState(ctx, r) { t.Error("Should not have saved the state") } - if !service.beaconDB.HasStateSummary(ctx, r) { + if !service.stateSummaryCache.Has(r) { t.Error("Should have saved the state summary") } testutil.AssertLogsDoNotContain(t, hook, "Saved full state on epoch boundary") @@ -105,7 +106,7 @@ func TestLoadHoteStateByRoot_Cached(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) r := [32]byte{'A'} @@ -126,7 +127,7 @@ func TestLoadHoteStateByRoot_FromDBCanProcess(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) blk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} @@ -136,15 +137,16 @@ func TestLoadHoteStateByRoot_FromDBCanProcess(t *testing.T) { t.Fatal(err) } targetSlot := uint64(10) + targetRoot := [32]byte{'a'} if err := service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{ Slot: targetSlot, - Root: blkRoot[:], + Root: targetRoot[:], }); err != nil { t.Fatal(err) } // This tests where hot state was not cached and needs processing. - loadedState, err := service.loadHotStateByRoot(ctx, blkRoot) + loadedState, err := service.loadHotStateByRoot(ctx, targetRoot) if err != nil { t.Fatal(err) } @@ -158,7 +160,7 @@ func TestLoadHoteStateByRoot_FromDBBoundaryCase(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) blk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} @@ -191,7 +193,7 @@ func TestLoadHoteStateBySlot_CanAdvanceSlotUsingDB(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) beaconState, _ := testutil.DeterministicGenesisState(t, 32) b := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} if err := service.beaconDB.SaveBlock(ctx, b); err != nil { diff --git a/beacon-chain/state/stategen/migrate.go b/beacon-chain/state/stategen/migrate.go index 364d61af36..919c1a5c7a 100644 --- a/beacon-chain/state/stategen/migrate.go +++ b/beacon-chain/state/stategen/migrate.go @@ -6,7 +6,6 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" - "github.com/prysmaticlabs/prysm/beacon-chain/state" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -15,22 +14,28 @@ import ( // MigrateToCold advances the split point in between the cold and hot state sections. // It moves the recent finalized states from the hot section to the cold section and // only preserve the ones that's on archived point. -func (s *State) MigrateToCold(ctx context.Context, finalizedState *state.BeaconState, finalizedRoot [32]byte) error { +func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finalizedRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "stateGen.MigrateToCold") defer span.End() // Verify migration is sensible. The new finalized point must increase the current split slot, and // on an epoch boundary for hot state summary scheme to work. currentSplitSlot := s.splitInfo.slot - if currentSplitSlot > finalizedState.Slot() { + if currentSplitSlot > finalizedSlot { return nil } - if !helpers.IsEpochStart(finalizedState.Slot()) { + if !helpers.IsEpochStart(finalizedSlot) { return nil } + // Migrate all state summary objects from cache to DB. + if err := s.beaconDB.SaveStateSummaries(ctx, s.stateSummaryCache.GetAll()); err != nil { + return err + } + s.stateSummaryCache.Clear() + // Move the states between split slot to finalized slot from hot section to the cold section. - filter := filters.NewFilter().SetStartSlot(currentSplitSlot).SetEndSlot(finalizedState.Slot() - 1) + filter := filters.NewFilter().SetStartSlot(currentSplitSlot).SetEndSlot(finalizedSlot - 1) blockRoots, err := s.beaconDB.BlockRoots(ctx, filter) if err != nil { return err @@ -84,7 +89,7 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedState *state.BeaconS } // Update the split slot and root. - s.splitInfo = &splitSlotAndRoot{slot: finalizedState.Slot(), root: finalizedRoot} + s.splitInfo = &splitSlotAndRoot{slot: finalizedSlot, root: finalizedRoot} log.WithFields(logrus.Fields{ "slot": s.splitInfo.slot, "root": hex.EncodeToString(bytesutil.Trunc(s.splitInfo.root[:])), diff --git a/beacon-chain/state/stategen/migrate_test.go b/beacon-chain/state/stategen/migrate_test.go index 133e23d0c9..a4638a6e81 100644 --- a/beacon-chain/state/stategen/migrate_test.go +++ b/beacon-chain/state/stategen/migrate_test.go @@ -6,6 +6,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/params" @@ -19,11 +20,9 @@ func TestMigrateToCold_NoBlock(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) - beaconState, _ := testutil.DeterministicGenesisState(t, 32) - beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch) - if err := service.MigrateToCold(ctx, beaconState, [32]byte{}); err != nil { + if err := service.MigrateToCold(ctx, params.BeaconConfig().SlotsPerEpoch, [32]byte{}); err != nil { t.Fatal(err) } @@ -36,12 +35,9 @@ func TestMigrateToCold_HigherSplitSlot(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) service.splitInfo.slot = 2 - - beaconState, _ := testutil.DeterministicGenesisState(t, 32) - beaconState.SetSlot(1) - if err := service.MigrateToCold(ctx, beaconState, [32]byte{}); err != nil { + if err := service.MigrateToCold(ctx, 1, [32]byte{}); err != nil { t.Fatal(err) } @@ -54,11 +50,8 @@ func TestMigrateToCold_NotEpochStart(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) - - beaconState, _ := testutil.DeterministicGenesisState(t, 32) - beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch + 1) - if err := service.MigrateToCold(ctx, beaconState, [32]byte{}); err != nil { + service := New(db, cache.NewStateSummaryCache()) + if err := service.MigrateToCold(ctx, params.BeaconConfig().SlotsPerEpoch+1, [32]byte{}); err != nil { t.Fatal(err) } @@ -71,7 +64,7 @@ func TestMigrateToCold_MigrationCompletes(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) service.slotsPerArchivedPoint = 2 beaconState, _ := testutil.DeterministicGenesisState(t, 32) @@ -106,7 +99,7 @@ func TestMigrateToCold_MigrationCompletes(t *testing.T) { t.Fatal(err) } - if err := service.MigrateToCold(ctx, beaconState, [32]byte{}); err != nil { + if err := service.MigrateToCold(ctx, beaconState.Slot(), [32]byte{}); err != nil { t.Fatal(err) } diff --git a/beacon-chain/state/stategen/replay_test.go b/beacon-chain/state/stategen/replay_test.go index aa3a80ffe2..bad2497386 100644 --- a/beacon-chain/state/stategen/replay_test.go +++ b/beacon-chain/state/stategen/replay_test.go @@ -8,6 +8,7 @@ import ( "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/db" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" @@ -23,7 +24,7 @@ func TestComputeStateUpToSlot_GenesisState(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) gBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} gRoot, err := ssz.HashTreeRoot(gBlk.Block) @@ -56,7 +57,7 @@ func TestComputeStateUpToSlot_CanProcessUpTo(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) gBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} gRoot, err := ssz.HashTreeRoot(gBlk.Block) @@ -109,7 +110,7 @@ func TestReplayBlocks_AllSkipSlots(t *testing.T) { beaconState.SetCurrentJustifiedCheckpoint(cp) beaconState.SetCurrentEpochAttestations([]*pb.PendingAttestation{}) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) targetSlot := params.BeaconConfig().SlotsPerEpoch - 1 newState, err := service.ReplayBlocks(context.Background(), beaconState, []*ethpb.SignedBeaconBlock{}, targetSlot) if err != nil { @@ -145,7 +146,7 @@ func TestReplayBlocks_SameSlot(t *testing.T) { beaconState.SetCurrentJustifiedCheckpoint(cp) beaconState.SetCurrentEpochAttestations([]*pb.PendingAttestation{}) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) targetSlot := beaconState.Slot() newState, err := service.ReplayBlocks(context.Background(), beaconState, []*ethpb.SignedBeaconBlock{}, targetSlot) if err != nil { diff --git a/beacon-chain/state/stategen/service.go b/beacon-chain/state/stategen/service.go index c1064fba9f..0727aae412 100644 --- a/beacon-chain/state/stategen/service.go +++ b/beacon-chain/state/stategen/service.go @@ -23,6 +23,7 @@ type State struct { epochBoundaryLock sync.RWMutex hotStateCache *cache.HotStateCache splitInfo *splitSlotAndRoot + stateSummaryCache *cache.StateSummaryCache } // This tracks the split point. The point where slot and the block root of @@ -33,13 +34,14 @@ type splitSlotAndRoot struct { } // New returns a new state management object. -func New(db db.NoHeadAccessDatabase) *State { +func New(db db.NoHeadAccessDatabase, stateSummaryCache *cache.StateSummaryCache) *State { return &State{ beaconDB: db, epochBoundarySlotToRoot: make(map[uint64][32]byte), hotStateCache: cache.NewHotStateCache(), splitInfo: &splitSlotAndRoot{slot: 0, root: params.BeaconConfig().ZeroHash}, slotsPerArchivedPoint: archivedInterval, + stateSummaryCache: stateSummaryCache, } } diff --git a/beacon-chain/state/stategen/service_test.go b/beacon-chain/state/stategen/service_test.go index 6d54fdaeac..796393f2db 100644 --- a/beacon-chain/state/stategen/service_test.go +++ b/beacon-chain/state/stategen/service_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/gogo/protobuf/proto" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" @@ -15,7 +16,7 @@ func TestResume(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) root := [32]byte{'A'} beaconState, _ := testutil.DeterministicGenesisState(t, 32) beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch) diff --git a/beacon-chain/state/stategen/setter_test.go b/beacon-chain/state/stategen/setter_test.go index 7823564c3f..f956dfe950 100644 --- a/beacon-chain/state/stategen/setter_test.go +++ b/beacon-chain/state/stategen/setter_test.go @@ -5,6 +5,7 @@ import ( "testing" //"github.com/gogo/protobuf/proto" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" @@ -17,7 +18,7 @@ func TestSaveState_ColdStateCanBeSaved(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) service.slotsPerArchivedPoint = 1 beaconState, _ := testutil.DeterministicGenesisState(t, 32) @@ -48,7 +49,7 @@ func TestSaveState_HotStateCanBeSaved(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) service.slotsPerArchivedPoint = 1 beaconState, _ := testutil.DeterministicGenesisState(t, 32) // This goes to hot section, verify it can save on epoch boundary. @@ -63,7 +64,7 @@ func TestSaveState_HotStateCanBeSaved(t *testing.T) { if !service.beaconDB.HasState(ctx, r) { t.Error("Should have saved the state") } - if !service.beaconDB.HasStateSummary(ctx, r) { + if !service.stateSummaryCache.Has(r) { t.Error("Should have saved the state summary") } testutil.AssertLogsContain(t, hook, "Saved full state on epoch boundary") @@ -75,7 +76,7 @@ func TestSaveState_HotStateCached(t *testing.T) { db := testDB.SetupDB(t) defer testDB.TeardownDB(t, db) - service := New(db) + service := New(db, cache.NewStateSummaryCache()) service.slotsPerArchivedPoint = 1 beaconState, _ := testutil.DeterministicGenesisState(t, 32) beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 58b13c2b6f..bc54e2363f 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -108,6 +108,7 @@ go_test( shard_count = 4, deps = [ "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/cache:go_default_library", "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index bfb54748c9..5ba160fa4f 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -62,7 +62,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error { attestations := s.blkRootToPendingAtts[bRoot] s.pendingAttsLock.RUnlock() // Has the pending attestation's missing block arrived and the node processed block yet? - hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bRoot) + hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bRoot) || s.stateSummaryCache.Has(bRoot) if s.db.HasBlock(ctx, bRoot) && (s.db.HasState(ctx, bRoot) || hasStateSummary) { numberOfBlocksRecoveredFromAtt.Inc() for _, att := range attestations { diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go index baba3d7559..264e344c30 100644 --- a/beacon-chain/sync/pending_attestations_queue_test.go +++ b/beacon-chain/sync/pending_attestations_queue_test.go @@ -11,6 +11,7 @@ import ( "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-ssz" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" @@ -46,6 +47,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) { db: db, chain: &mock.ChainService{Genesis: roughtime.Now()}, blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + stateSummaryCache: cache.NewStateSummaryCache(), } a := ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{Data: ðpb.AttestationData{Target: ðpb.Checkpoint{}}}} @@ -69,6 +71,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) { chain: &mock.ChainService{Genesis: roughtime.Now()}, blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), attPool: attestations.NewPool(), + stateSummaryCache: cache.NewStateSummaryCache(), } a := ðpb.AggregateAttestationAndProof{ @@ -175,6 +178,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) { }}, blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), attPool: attestations.NewPool(), + stateSummaryCache: cache.NewStateSummaryCache(), } sb = ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index cbeef88cef..5162349ab1 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" @@ -43,6 +44,7 @@ type Config struct { StateNotifier statefeed.Notifier BlockNotifier blockfeed.Notifier AttestationNotifier operation.Notifier + StateSummaryCache *cache.StateSummaryCache } // This defines the interface for interacting with block chain service @@ -74,6 +76,7 @@ func NewRegularSync(cfg *Config) *Service { blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), stateNotifier: cfg.StateNotifier, blockNotifier: cfg.BlockNotifier, + stateSummaryCache: cfg.StateSummaryCache, blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), } @@ -106,6 +109,7 @@ type Service struct { blockNotifier blockfeed.Notifier blocksRateLimiter *leakybucket.Collector attestationNotifier operation.Notifier + stateSummaryCache *cache.StateSummaryCache } // Start the regular sync service. diff --git a/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go b/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go index b856ae8586..579c55342f 100644 --- a/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go +++ b/beacon-chain/sync/subscriber_committee_index_beacon_attestation_test.go @@ -9,6 +9,7 @@ import ( "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-ssz" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" @@ -58,6 +59,7 @@ func TestService_committeeIndexBeaconAttestationSubscriber_ValidMessage(t *testi stateNotifier: (&mock.ChainService{}).StateNotifier(), attestationNotifier: (&mock.ChainService{}).OperationNotifier(), initialSync: &mockSync.Sync{IsSyncing: false}, + stateSummaryCache: cache.NewStateSummaryCache(), } r.registerSubscribers() r.stateNotifier.StateFeed().Send(&feed.Event{ diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index 44aa027691..0325574120 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -128,9 +128,10 @@ func (r *Service) validateAggregatedAtt(ctx context.Context, a *ethpb.AggregateA func (r *Service) validateBlockInAttestation(ctx context.Context, a *ethpb.AggregateAttestationAndProof) bool { // Verify the block being voted and the processed state is in DB. The block should have passed validation if it's in the DB. - hasStateSummary := featureconfig.Get().NewStateMgmt && r.db.HasStateSummary(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) - hasState := r.db.HasState(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) || hasStateSummary - hasBlock := r.db.HasBlock(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) + blockRoot := bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot) + hasStateSummary := featureconfig.Get().NewStateMgmt && r.db.HasStateSummary(ctx, blockRoot) || r.stateSummaryCache.Has(blockRoot) + hasState := r.db.HasState(ctx, blockRoot) || hasStateSummary + hasBlock := r.db.HasBlock(ctx, blockRoot) if !(hasState && hasBlock) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. r.savePendingAtt(a) diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go index e379691a5d..1cd16b4d56 100644 --- a/beacon-chain/sync/validate_aggregate_proof_test.go +++ b/beacon-chain/sync/validate_aggregate_proof_test.go @@ -14,6 +14,7 @@ import ( "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-ssz" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" @@ -139,6 +140,7 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) { initialSync: &mockSync.Sync{IsSyncing: false}, attPool: attestations.NewPool(), blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + stateSummaryCache: cache.NewStateSummaryCache(), } buf := new(bytes.Buffer) @@ -199,7 +201,8 @@ func TestValidateAggregateAndProof_NotWithinSlotRange(t *testing.T) { initialSync: &mockSync.Sync{IsSyncing: false}, chain: &mock.ChainService{Genesis: time.Now(), State: beaconState}, - attPool: attestations.NewPool(), + attPool: attestations.NewPool(), + stateSummaryCache: cache.NewStateSummaryCache(), } buf := new(bytes.Buffer) @@ -377,7 +380,8 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) { FinalizedCheckPoint: ðpb.Checkpoint{ Epoch: 0, }}, - attPool: attestations.NewPool(), + attPool: attestations.NewPool(), + stateSummaryCache: cache.NewStateSummaryCache(), } buf := new(bytes.Buffer) diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation.go b/beacon-chain/sync/validate_committee_index_beacon_attestation.go index d816caa673..9e84c63f0c 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation.go @@ -74,9 +74,10 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p } // Verify the block being voted and the processed state is in DB and. The block should have passed validation if it's in the DB. - hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) - hasState := s.db.HasState(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) || hasStateSummary - hasBlock := s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) + blockRoot := bytesutil.ToBytes32(att.Data.BeaconBlockRoot) + hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, blockRoot) || s.stateSummaryCache.Has(blockRoot) + hasState := s.db.HasState(ctx, blockRoot) || hasStateSummary + hasBlock := s.db.HasBlock(ctx, blockRoot) if !(hasState && hasBlock) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. s.savePendingAtt(ð.AggregateAttestationAndProof{Aggregate: att}) diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go b/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go index 67f8cc8340..89f6f179fd 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-ssz" mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" beaconstate "github.com/prysmaticlabs/prysm/beacon-chain/state" @@ -36,6 +37,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { db: db, chain: chain, blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + stateSummaryCache: cache.NewStateSummaryCache(), } blk := ðpb.SignedBeaconBlock{ diff --git a/tools/blocktree/BUILD.bazel b/tools/blocktree/BUILD.bazel index e67e8e7c1d..3d385bd467 100644 --- a/tools/blocktree/BUILD.bazel +++ b/tools/blocktree/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/tools/blocktree", visibility = ["//visibility:private"], deps = [ + "//beacon-chain/cache:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filters:go_default_library", diff --git a/tools/blocktree/main.go b/tools/blocktree/main.go index 11c00f351c..c55221e49e 100644 --- a/tools/blocktree/main.go +++ b/tools/blocktree/main.go @@ -17,6 +17,7 @@ import ( "github.com/emicklei/dot" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" @@ -43,7 +44,7 @@ func main() { params.UseDemoBeaconConfig() flag.Parse() - db, err := db.NewDB(*datadir) + db, err := db.NewDB(*datadir, cache.NewStateSummaryCache()) if err != nil { panic(err) } diff --git a/tools/extractor/BUILD.bazel b/tools/extractor/BUILD.bazel index 0a838619bb..ecefdbfa21 100644 --- a/tools/extractor/BUILD.bazel +++ b/tools/extractor/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/tools/extractor", visibility = ["//visibility:public"], deps = [ + "//beacon-chain/cache:go_default_library", "//beacon-chain/core/state/interop:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filters:go_default_library", diff --git a/tools/extractor/main.go b/tools/extractor/main.go index df7c814400..6d9f05a9f8 100644 --- a/tools/extractor/main.go +++ b/tools/extractor/main.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" @@ -27,7 +28,7 @@ func init() { func main() { flag.Parse() fmt.Println("Starting process...") - d, err := db.NewDB(*datadir) + d, err := db.NewDB(*datadir, cache.NewStateSummaryCache()) if err != nil { panic(err) } diff --git a/tools/interop/export-genesis/BUILD.bazel b/tools/interop/export-genesis/BUILD.bazel index 50c55fd93d..1c02a20862 100644 --- a/tools/interop/export-genesis/BUILD.bazel +++ b/tools/interop/export-genesis/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/tools/interop/export-genesis", visibility = ["//visibility:private"], deps = [ + "//beacon-chain/cache:go_default_library", "//beacon-chain/db:go_default_library", "@com_github_prysmaticlabs_go_ssz//:go_default_library", ], diff --git a/tools/interop/export-genesis/main.go b/tools/interop/export-genesis/main.go index fc2e3bdd62..d8fd9b581f 100644 --- a/tools/interop/export-genesis/main.go +++ b/tools/interop/export-genesis/main.go @@ -7,6 +7,7 @@ import ( "os" "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/db" ) @@ -21,7 +22,7 @@ func main() { fmt.Printf("Reading db at %s and writing ssz output to %s.\n", os.Args[1], os.Args[2]) - d, err := db.NewDB(os.Args[1]) + d, err := db.NewDB(os.Args[1], cache.NewStateSummaryCache()) if err != nil { panic(err) }