Micro optimizations on new-state-mgmt service for initial syncing (#5241)

* Starting a quick PoC

* Rate limit to one epoch worth of blocks in memory

* Proof of concept working

* Quick comment out

* Save previous finalized checkpoint

* Test

* Minor fixes

* More run time fixes

* Remove panic

* Feature flag

* Removed unused methods

* Fixed tests

* E2e test

* comment

* Compatible with current initial sync

* Starting

* New cache

* Cache getters and setters

* It should be part of state gen

* Need to use cache for DB

* Don't have to use finalized state

* Rm unused file

* some changes to memory mgmt when using mempool

* More run time fixes

* Can sync to head

* Feedback

* Revert "some changes to memory mgmt when using mempool"

This reverts commit f5b3e7ff47.

* Fixed sync tests

* Fixed existing tests

* Test for state summary getter

* Gaz

* Fix kafka passthrough

* Fixed inputs

* Gaz

* Fixed build

* Fixed visibility

* Trying without the ignore

* Didn't work..

* Fix kafka

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
This commit is contained in:
terence tsao
2020-03-30 15:10:45 -07:00
committed by GitHub
parent c5f186d56f
commit 7f7866ff2a
47 changed files with 343 additions and 138 deletions

View File

@@ -6,6 +6,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
@@ -210,7 +211,7 @@ func TestPruneNonBoundary_CanPrune(t *testing.T) {
func TestGenerateState_CorrectlyGenerated(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
cfg := &Config{BeaconDB: db, StateGen: stategen.New(db)}
cfg := &Config{BeaconDB: db, StateGen: stategen.New(db, cache.NewStateSummaryCache())}
service, err := NewService(context.Background(), cfg)
if err != nil {
t.Fatal(err)

View File

@@ -140,12 +140,13 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock)
}
if featureconfig.Get().NewStateMgmt {
finalizedState, err := s.stateGen.StateByRoot(ctx, fRoot)
fRoot := bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root)
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, finalizedState, fRoot); err != nil {
return nil, err
if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
return nil, errors.Wrap(err, "could not migrate to cold")
}
}
}
@@ -300,13 +301,12 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
if featureconfig.Get().NewStateMgmt {
fRoot := bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root)
finalizedState, err := s.stateGen.StateByRoot(ctx, fRoot)
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return errors.Wrap(err, "could not get state by root for migration")
return errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, finalizedState, fRoot); err != nil {
return errors.Wrap(err, "could not migrate with new finalized root")
if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
return errors.Wrap(err, "could not migrate to cold")
}
}
}

View File

@@ -11,11 +11,16 @@ go_library(
"eth1_data.go",
"hot_state_cache.go",
"skip_slot_cache.go",
"state_summary.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/cache",
visibility = ["//beacon-chain:__subpackages__"],
visibility = [
"//beacon-chain:__subpackages__",
"//tools:__subpackages__",
],
deps = [
"//beacon-chain/state:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",

65
beacon-chain/cache/state_summary.go vendored Normal file
View File

@@ -0,0 +1,65 @@
package cache
import (
"sync"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
// StateSummaryCache caches state summary object.
type StateSummaryCache struct {
initSyncStateSummaries map[[32]byte]*pb.StateSummary
initSyncStateSummariesLock sync.RWMutex
}
// NewStateSummaryCache creates a new state summary cache.
func NewStateSummaryCache() *StateSummaryCache {
return &StateSummaryCache{
initSyncStateSummaries: make(map[[32]byte]*pb.StateSummary),
}
}
// Put saves a state summary to the initial sync state summaries cache.
func (s *StateSummaryCache) Put(r [32]byte, b *pb.StateSummary) {
s.initSyncStateSummariesLock.Lock()
defer s.initSyncStateSummariesLock.Unlock()
s.initSyncStateSummaries[r] = b
}
// Has checks if a state summary exists in the initial sync state summaries cache using the root
// of the block.
func (s *StateSummaryCache) Has(r [32]byte) bool {
s.initSyncStateSummariesLock.RLock()
defer s.initSyncStateSummariesLock.RUnlock()
_, ok := s.initSyncStateSummaries[r]
return ok
}
// Get retrieves a state summary from the initial sync state summaries cache using the root of
// the block.
func (s *StateSummaryCache) Get(r [32]byte) *pb.StateSummary {
s.initSyncStateSummariesLock.RLock()
defer s.initSyncStateSummariesLock.RUnlock()
b := s.initSyncStateSummaries[r]
return b
}
// GetAll retrieves all the beacon state summaries from the initial sync state summaries cache, the returned
// state summaries are unordered.
func (s *StateSummaryCache) GetAll() []*pb.StateSummary {
s.initSyncStateSummariesLock.RLock()
defer s.initSyncStateSummariesLock.RUnlock()
blks := make([]*pb.StateSummary, 0, len(s.initSyncStateSummaries))
for _, b := range s.initSyncStateSummaries {
blks = append(blks, b)
}
return blks
}
// Clear clears out the initial sync state summaries cache.
func (s *StateSummaryCache) Clear() {
s.initSyncStateSummariesLock.Lock()
defer s.initSyncStateSummariesLock.Unlock()
s.initSyncStateSummaries = make(map[[32]byte]*pb.StateSummary)
}

View File

@@ -26,6 +26,7 @@ go_library(
"//tools:__subpackages__",
],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",

View File

@@ -1,8 +1,11 @@
package db
import "github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
)
// NewDB initializes a new DB.
func NewDB(dirPath string) (Database, error) {
return kv.NewKVStore(dirPath)
func NewDB(dirPath string, stateSummaryCache *cache.StateSummaryCache) (Database, error) {
return kv.NewKVStore(dirPath, stateSummaryCache)
}

View File

@@ -1,13 +1,14 @@
package db
import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kafka"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
)
// NewDB initializes a new DB with kafka wrapper.
func NewDB(dirPath string) (Database, error) {
db, err := kv.NewKVStore(dirPath)
func NewDB(dirPath string, stateSummaryCache *cache.StateSummaryCache) (Database, error) {
db, err := kv.NewKVStore(dirPath, stateSummaryCache)
if err != nil {
return nil, err
}

View File

@@ -90,6 +90,7 @@ type NoHeadAccessDatabase interface {
DeleteState(ctx context.Context, blockRoot [32]byte) error
DeleteStates(ctx context.Context, blockRoots [][32]byte) error
SaveStateSummary(ctx context.Context, summary *ethereum_beacon_p2p_v1.StateSummary) error
SaveStateSummaries(ctx context.Context, summaries []*ethereum_beacon_p2p_v1.StateSummary) error
// Slashing operations.
SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error
SaveAttesterSlashing(ctx context.Context, slashing *eth.AttesterSlashing) error

View File

@@ -238,6 +238,11 @@ func (e Exporter) SaveStateSummary(ctx context.Context, summary *pb.StateSummary
return e.db.SaveStateSummary(ctx, summary)
}
// SaveStateSummaries -- passthrough.
func (e Exporter) SaveStateSummaries(ctx context.Context, summaries []*pb.StateSummary) error {
return e.db.SaveStateSummaries(ctx, summaries)
}
// SaveStates -- passthrough.
func (e Exporter) SaveStates(ctx context.Context, states []*state.BeaconState, blockRoots [][32]byte) error {
return e.db.SaveStates(ctx, states, blockRoots)

View File

@@ -26,6 +26,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kv",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/iface:go_default_library",
@@ -74,6 +75,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/state:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",

View File

@@ -263,7 +263,7 @@ func (k *Store) SaveHeadBlockRoot(ctx context.Context, blockRoot [32]byte) error
defer span.End()
return k.db.Update(func(tx *bolt.Tx) error {
if featureconfig.Get().NewStateMgmt {
if tx.Bucket(stateSummaryBucket).Get(blockRoot[:]) == nil {
if tx.Bucket(stateSummaryBucket).Get(blockRoot[:]) == nil && !k.stateSummaryCache.Has(blockRoot) {
return errors.New("no state summary found with head block root")
}
} else {

View File

@@ -5,6 +5,7 @@ import (
"errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/traceutil"
bolt "go.etcd.io/bbolt"
@@ -65,7 +66,7 @@ func (k *Store) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.C
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
if featureconfig.Get().NewStateMgmt {
if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil {
if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil && !k.stateSummaryCache.Has(bytesutil.ToBytes32(checkpoint.Root)) {
return errors.New("missing state summary for finalized root")
}
} else {
@@ -93,7 +94,7 @@ func (k *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
if featureconfig.Get().NewStateMgmt {
if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil {
if tx.Bucket(stateSummaryBucket).Get(checkpoint.Root) == nil && !k.stateSummaryCache.Has(bytesutil.ToBytes32(checkpoint.Root)) {
return errors.New("missing state summary for finalized root")
}
} else {

View File

@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prombolt "github.com/prysmaticlabs/prombbolt"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db/iface"
bolt "go.etcd.io/bbolt"
)
@@ -38,12 +39,13 @@ type Store struct {
validatorIndexCache *ristretto.Cache
stateSlotBitLock sync.Mutex
blockSlotBitLock sync.Mutex
stateSummaryCache *cache.StateSummaryCache
}
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(dirPath string) (*Store, error) {
func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*Store, error) {
if err := os.MkdirAll(dirPath, 0700); err != nil {
return nil, err
}
@@ -79,6 +81,7 @@ func NewKVStore(dirPath string) (*Store, error) {
databasePath: dirPath,
blockCache: blockCache,
validatorIndexCache: validatorCache,
stateSummaryCache: stateSummaryCache,
}
if err := kv.db.Update(func(tx *bolt.Tx) error {

View File

@@ -8,6 +8,7 @@ import (
"path"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/shared/testutil"
)
@@ -21,7 +22,7 @@ func setupDB(t testing.TB) *Store {
if err := os.RemoveAll(path); err != nil {
t.Fatalf("Failed to remove directory: %v", err)
}
db, err := NewKVStore(path)
db, err := NewKVStore(path, cache.NewStateSummaryCache())
if err != nil {
t.Fatalf("Failed to instantiate DB: %v", err)
}

View File

@@ -23,6 +23,26 @@ func (k *Store) SaveStateSummary(ctx context.Context, summary *pb.StateSummary)
})
}
// SaveStateSummaries saves state summary objects to the DB.
func (k *Store) SaveStateSummaries(ctx context.Context, summaries []*pb.StateSummary) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStateSummaries")
defer span.End()
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(stateSummaryBucket)
for _, summary := range summaries {
enc, err := encode(summary)
if err != nil {
return err
}
if err := bucket.Put(summary.Root, enc); err != nil {
return err
}
}
return nil
})
}
// StateSummary returns the state summary object from the db using input block root.
func (k *Store) StateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.StateSummary")

View File

@@ -7,6 +7,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/testing",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//shared/testutil:go_default_library",

View File

@@ -8,6 +8,7 @@ import (
"path"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -23,7 +24,7 @@ func SetupDB(t testing.TB) db.Database {
if err := os.RemoveAll(p); err != nil {
t.Fatalf("failed to remove directory: %v", err)
}
s, err := kv.NewKVStore(p)
s, err := kv.NewKVStore(p, cache.NewStateSummaryCache())
if err != nil {
t.Fatal(err)
}

View File

@@ -8,6 +8,7 @@ go_library(
deps = [
"//beacon-chain/archiver:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/cache/depositcache:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/flags:go_default_library",

View File

@@ -18,6 +18,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/archiver"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
@@ -58,20 +59,21 @@ const testSkipPowFlag = "test-skip-pow"
// full PoS node. It handles the lifecycle of the entire system and registers
// services to a service registry.
type BeaconNode struct {
ctx *cli.Context
services *shared.ServiceRegistry
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications.
db db.Database
attestationPool attestations.Pool
exitPool *voluntaryexits.Pool
slashingsPool *slashings.Pool
depositCache *depositcache.DepositCache
stateFeed *event.Feed
blockFeed *event.Feed
opFeed *event.Feed
forkChoiceStore forkchoice.ForkChoicer
stateGen *stategen.State
ctx *cli.Context
services *shared.ServiceRegistry
lock sync.RWMutex
stop chan struct{} // Channel to wait for termination notifications.
db db.Database
stateSummaryCache *cache.StateSummaryCache
attestationPool attestations.Pool
exitPool *voluntaryexits.Pool
slashingsPool *slashings.Pool
depositCache *depositcache.DepositCache
stateFeed *event.Feed
blockFeed *event.Feed
opFeed *event.Feed
forkChoiceStore forkchoice.ForkChoicer
stateGen *stategen.State
}
// NewBeaconNode creates a new node instance, sets up configuration options, and registers
@@ -92,15 +94,16 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
registry := shared.NewServiceRegistry()
beacon := &BeaconNode{
ctx: ctx,
services: registry,
stop: make(chan struct{}),
stateFeed: new(event.Feed),
blockFeed: new(event.Feed),
opFeed: new(event.Feed),
attestationPool: attestations.NewPool(),
exitPool: voluntaryexits.NewPool(),
slashingsPool: slashings.NewPool(),
ctx: ctx,
services: registry,
stop: make(chan struct{}),
stateFeed: new(event.Feed),
blockFeed: new(event.Feed),
opFeed: new(event.Feed),
attestationPool: attestations.NewPool(),
exitPool: voluntaryexits.NewPool(),
slashingsPool: slashings.NewPool(),
stateSummaryCache: cache.NewStateSummaryCache(),
}
if err := beacon.startDB(ctx); err != nil {
@@ -233,7 +236,7 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error {
clearDB := ctx.Bool(cmd.ClearDB.Name)
forceClearDB := ctx.Bool(cmd.ForceClearDB.Name)
d, err := db.NewDB(dbPath)
d, err := db.NewDB(dbPath, b.stateSummaryCache)
if err != nil {
return err
}
@@ -252,7 +255,7 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error {
if err := d.ClearDB(); err != nil {
return err
}
d, err = db.NewDB(dbPath)
d, err = db.NewDB(dbPath, b.stateSummaryCache)
if err != nil {
return err
}
@@ -264,7 +267,7 @@ func (b *BeaconNode) startDB(ctx *cli.Context) error {
}
func (b *BeaconNode) startStateGen() {
b.stateGen = stategen.New(b.db)
b.stateGen = stategen.New(b.db, b.stateSummaryCache)
}
func (b *BeaconNode) registerP2P(ctx *cli.Context) error {
@@ -437,6 +440,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
AttPool: b.attestationPool,
ExitPool: b.exitPool,
SlashingPool: b.slashingsPool,
StateSummaryCache: b.stateSummaryCache,
})
return b.services.RegisterService(rs)

View File

@@ -47,6 +47,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/testing:go_default_library",

View File

@@ -44,12 +44,9 @@ func (s *State) loadColdStateByRoot(ctx context.Context, blockRoot [32]byte) (*s
ctx, span := trace.StartSpan(ctx, "stateGen.loadColdStateByRoot")
defer span.End()
summary, err := s.beaconDB.StateSummary(ctx, blockRoot)
summary, err := s.stateSummary(ctx, blockRoot)
if err != nil {
return nil, err
}
if summary == nil {
return nil, errUnknownStateSummary
return nil, errors.Wrap(err, "could not get state summary")
}
// Use the archived point state if the summary slot lies on top of the archived point.
@@ -195,13 +192,10 @@ func (s *State) blockRootSlot(ctx context.Context, blockRoot [32]byte) (uint64,
ctx, span := trace.StartSpan(ctx, "stateGen.blockRootSlot")
defer span.End()
if s.beaconDB.HasStateSummary(ctx, blockRoot) {
summary, err := s.beaconDB.StateSummary(ctx, blockRoot)
if s.StateSummaryExists(ctx, blockRoot) {
summary, err := s.stateSummary(ctx, blockRoot)
if err != nil {
return 0, nil
}
if summary == nil {
return 0, errUnknownStateSummary
return 0, err
}
return summary.Slot, nil
}

View File

@@ -6,6 +6,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -16,7 +17,7 @@ func TestSaveColdState_NonArchivedPoint(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 2
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(1)
@@ -307,7 +308,7 @@ func TestBlockRootSlot_Exists(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
bRoot := [32]byte{'A'}
bSlot := uint64(100)
if err := service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{
@@ -332,7 +333,7 @@ func TestBlockRootSlot_CanRecoverAndSave(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
bSlot := uint64(100)
b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: bSlot}}
bRoot, _ := ssz.HashTreeRoot(b.Block)

View File

@@ -5,6 +5,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
@@ -52,5 +53,24 @@ func (s *State) StateBySlot(ctx context.Context, slot uint64) (*state.BeaconStat
// StateSummaryExists returns true if the corresponding state of the input block either
// exists in the DB or it can be generated by state gen.
func (s *State) StateSummaryExists(ctx context.Context, blockRoot [32]byte) bool {
return s.beaconDB.HasStateSummary(ctx, blockRoot)
return s.beaconDB.HasStateSummary(ctx, blockRoot) || s.stateSummaryCache.Has(blockRoot)
}
// This returns the state summary object of a given block root, it first checks the cache
// then checks the DB. An error is returned if state summary object is nil.
func (s *State) stateSummary(ctx context.Context, blockRoot [32]byte) (*pb.StateSummary, error) {
var summary *pb.StateSummary
var err error
if s.stateSummaryCache.Has(blockRoot) {
summary = s.stateSummaryCache.Get(blockRoot)
} else {
summary, err = s.beaconDB.StateSummary(ctx, blockRoot)
if err != nil {
return nil, err
}
}
if summary == nil {
return nil, errUnknownStateSummary
}
return summary, nil
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -18,7 +19,7 @@ func TestStateByRoot_ColdState(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
service.splitInfo.slot = 2
service.slotsPerArchivedPoint = 1
@@ -53,7 +54,7 @@ func TestStateByRoot_HotStateDB(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
blk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
@@ -63,15 +64,15 @@ func TestStateByRoot_HotStateDB(t *testing.T) {
t.Fatal(err)
}
targetSlot := uint64(10)
targetRoot := [32]byte{'a'}
if err := service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{
Slot: targetSlot,
Root: blkRoot[:],
Root: targetRoot[:],
}); err != nil {
t.Fatal(err)
}
loadedState, err := service.StateByRoot(ctx, blkRoot)
loadedState, err := service.StateByRoot(ctx, targetRoot)
if err != nil {
t.Fatal(err)
}
@@ -85,7 +86,7 @@ func TestStateByRoot_HotStateCached(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
r := [32]byte{'A'}
@@ -110,7 +111,7 @@ func TestStateBySlot_ColdState(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = params.BeaconConfig().SlotsPerEpoch * 2
service.splitInfo.slot = service.slotsPerArchivedPoint + 1
@@ -155,7 +156,7 @@ func TestStateBySlot_HotStateDB(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
@@ -177,3 +178,45 @@ func TestStateBySlot_HotStateDB(t *testing.T) {
t.Error("Did not correctly load state")
}
}
func TestStateSummary_CanGetFromCacheOrDB(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db, cache.NewStateSummaryCache())
r := [32]byte{'a'}
summary := &pb.StateSummary{Slot: 100}
_, err := service.stateSummary(ctx, r)
if err != errUnknownStateSummary {
t.Fatal("Did not get wanted error")
}
service.stateSummaryCache.Put(r, summary)
got, err := service.stateSummary(ctx, r)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(got, summary) {
t.Error("Did not get wanted summary")
}
r = [32]byte{'b'}
summary = &pb.StateSummary{Root: r[:], Slot: 101}
_, err = service.stateSummary(ctx, r)
if err != errUnknownStateSummary {
t.Fatal("Did not get wanted error")
}
if err := service.beaconDB.SaveStateSummary(ctx, summary); err != nil {
t.Fatal(err)
}
got, err = service.stateSummary(ctx, r)
if err != nil {
t.Fatal("Did not get wanted error")
}
if !proto.Equal(got, summary) {
t.Error("Did not get wanted summary")
}
}

View File

@@ -36,12 +36,10 @@ func (s *State) saveHotState(ctx context.Context, blockRoot [32]byte, state *sta
}
// On an intermediate slots, save the hot state summary.
if err := s.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{
s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{
Slot: state.Slot(),
Root: blockRoot[:],
}); err != nil {
return err
}
})
// Store the copied state in the cache.
s.hotStateCache.Put(blockRoot, state)
@@ -56,21 +54,23 @@ func (s *State) loadHotStateByRoot(ctx context.Context, blockRoot [32]byte) (*st
ctx, span := trace.StartSpan(ctx, "stateGen.loadHotStateByRoot")
defer span.End()
// Load the hot state cache.
// Load the hot state from cache.
cachedState := s.hotStateCache.Get(blockRoot)
if cachedState != nil {
return cachedState, nil
}
summary, err := s.beaconDB.StateSummary(ctx, blockRoot)
if err != nil {
return nil, err
}
if summary == nil {
return nil, errUnknownStateSummary
// Load the hot state from DB.
if s.beaconDB.HasState(ctx, blockRoot) {
return s.beaconDB.State(ctx, blockRoot)
}
startState, err := s.lastSavedState(ctx, helpers.StartSlot(helpers.SlotToEpoch(summary.Slot)))
summary, err := s.stateSummary(ctx, blockRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get state summary")
}
startState, err := s.lastSavedState(ctx, summary.Slot)
if err != nil {
return nil, err
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -21,7 +22,7 @@ func TestSaveHotState_AlreadyHas(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch)
@@ -48,7 +49,7 @@ func TestSaveHotState_CanSaveOnEpochBoundary(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch)
@@ -62,7 +63,7 @@ func TestSaveHotState_CanSaveOnEpochBoundary(t *testing.T) {
if !service.beaconDB.HasState(ctx, r) {
t.Error("Should have saved the state")
}
if !service.beaconDB.HasStateSummary(ctx, r) {
if !service.stateSummaryCache.Has(r) {
t.Error("Should have saved the state summary")
}
testutil.AssertLogsContain(t, hook, "Saved full state on epoch boundary")
@@ -73,7 +74,7 @@ func TestSaveHotState_NoSaveNotEpochBoundary(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch - 1)
@@ -95,7 +96,7 @@ func TestSaveHotState_NoSaveNotEpochBoundary(t *testing.T) {
if service.beaconDB.HasState(ctx, r) {
t.Error("Should not have saved the state")
}
if !service.beaconDB.HasStateSummary(ctx, r) {
if !service.stateSummaryCache.Has(r) {
t.Error("Should have saved the state summary")
}
testutil.AssertLogsDoNotContain(t, hook, "Saved full state on epoch boundary")
@@ -105,7 +106,7 @@ func TestLoadHoteStateByRoot_Cached(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
r := [32]byte{'A'}
@@ -126,7 +127,7 @@ func TestLoadHoteStateByRoot_FromDBCanProcess(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
blk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
@@ -136,15 +137,16 @@ func TestLoadHoteStateByRoot_FromDBCanProcess(t *testing.T) {
t.Fatal(err)
}
targetSlot := uint64(10)
targetRoot := [32]byte{'a'}
if err := service.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{
Slot: targetSlot,
Root: blkRoot[:],
Root: targetRoot[:],
}); err != nil {
t.Fatal(err)
}
// This tests where hot state was not cached and needs processing.
loadedState, err := service.loadHotStateByRoot(ctx, blkRoot)
loadedState, err := service.loadHotStateByRoot(ctx, targetRoot)
if err != nil {
t.Fatal(err)
}
@@ -158,7 +160,7 @@ func TestLoadHoteStateByRoot_FromDBBoundaryCase(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
blk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
@@ -191,7 +193,7 @@ func TestLoadHoteStateBySlot_CanAdvanceSlotUsingDB(t *testing.T) {
ctx := context.Background()
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
if err := service.beaconDB.SaveBlock(ctx, b); err != nil {

View File

@@ -6,7 +6,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
@@ -15,22 +14,28 @@ import (
// MigrateToCold advances the split point in between the cold and hot state sections.
// It moves the recent finalized states from the hot section to the cold section and
// only preserve the ones that's on archived point.
func (s *State) MigrateToCold(ctx context.Context, finalizedState *state.BeaconState, finalizedRoot [32]byte) error {
func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finalizedRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "stateGen.MigrateToCold")
defer span.End()
// Verify migration is sensible. The new finalized point must increase the current split slot, and
// on an epoch boundary for hot state summary scheme to work.
currentSplitSlot := s.splitInfo.slot
if currentSplitSlot > finalizedState.Slot() {
if currentSplitSlot > finalizedSlot {
return nil
}
if !helpers.IsEpochStart(finalizedState.Slot()) {
if !helpers.IsEpochStart(finalizedSlot) {
return nil
}
// Migrate all state summary objects from cache to DB.
if err := s.beaconDB.SaveStateSummaries(ctx, s.stateSummaryCache.GetAll()); err != nil {
return err
}
s.stateSummaryCache.Clear()
// Move the states between split slot to finalized slot from hot section to the cold section.
filter := filters.NewFilter().SetStartSlot(currentSplitSlot).SetEndSlot(finalizedState.Slot() - 1)
filter := filters.NewFilter().SetStartSlot(currentSplitSlot).SetEndSlot(finalizedSlot - 1)
blockRoots, err := s.beaconDB.BlockRoots(ctx, filter)
if err != nil {
return err
@@ -84,7 +89,7 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedState *state.BeaconS
}
// Update the split slot and root.
s.splitInfo = &splitSlotAndRoot{slot: finalizedState.Slot(), root: finalizedRoot}
s.splitInfo = &splitSlotAndRoot{slot: finalizedSlot, root: finalizedRoot}
log.WithFields(logrus.Fields{
"slot": s.splitInfo.slot,
"root": hex.EncodeToString(bytesutil.Trunc(s.splitInfo.root[:])),

View File

@@ -6,6 +6,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -19,11 +20,9 @@ func TestMigrateToCold_NoBlock(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch)
if err := service.MigrateToCold(ctx, beaconState, [32]byte{}); err != nil {
if err := service.MigrateToCold(ctx, params.BeaconConfig().SlotsPerEpoch, [32]byte{}); err != nil {
t.Fatal(err)
}
@@ -36,12 +35,9 @@ func TestMigrateToCold_HigherSplitSlot(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
service.splitInfo.slot = 2
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(1)
if err := service.MigrateToCold(ctx, beaconState, [32]byte{}); err != nil {
if err := service.MigrateToCold(ctx, 1, [32]byte{}); err != nil {
t.Fatal(err)
}
@@ -54,11 +50,8 @@ func TestMigrateToCold_NotEpochStart(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch + 1)
if err := service.MigrateToCold(ctx, beaconState, [32]byte{}); err != nil {
service := New(db, cache.NewStateSummaryCache())
if err := service.MigrateToCold(ctx, params.BeaconConfig().SlotsPerEpoch+1, [32]byte{}); err != nil {
t.Fatal(err)
}
@@ -71,7 +64,7 @@ func TestMigrateToCold_MigrationCompletes(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 2
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
@@ -106,7 +99,7 @@ func TestMigrateToCold_MigrationCompletes(t *testing.T) {
t.Fatal(err)
}
if err := service.MigrateToCold(ctx, beaconState, [32]byte{}); err != nil {
if err := service.MigrateToCold(ctx, beaconState.Slot(), [32]byte{}); err != nil {
t.Fatal(err)
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
@@ -23,7 +24,7 @@ func TestComputeStateUpToSlot_GenesisState(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
gBlk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
gRoot, err := ssz.HashTreeRoot(gBlk.Block)
@@ -56,7 +57,7 @@ func TestComputeStateUpToSlot_CanProcessUpTo(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
gBlk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
gRoot, err := ssz.HashTreeRoot(gBlk.Block)
@@ -109,7 +110,7 @@ func TestReplayBlocks_AllSkipSlots(t *testing.T) {
beaconState.SetCurrentJustifiedCheckpoint(cp)
beaconState.SetCurrentEpochAttestations([]*pb.PendingAttestation{})
service := New(db)
service := New(db, cache.NewStateSummaryCache())
targetSlot := params.BeaconConfig().SlotsPerEpoch - 1
newState, err := service.ReplayBlocks(context.Background(), beaconState, []*ethpb.SignedBeaconBlock{}, targetSlot)
if err != nil {
@@ -145,7 +146,7 @@ func TestReplayBlocks_SameSlot(t *testing.T) {
beaconState.SetCurrentJustifiedCheckpoint(cp)
beaconState.SetCurrentEpochAttestations([]*pb.PendingAttestation{})
service := New(db)
service := New(db, cache.NewStateSummaryCache())
targetSlot := beaconState.Slot()
newState, err := service.ReplayBlocks(context.Background(), beaconState, []*ethpb.SignedBeaconBlock{}, targetSlot)
if err != nil {

View File

@@ -23,6 +23,7 @@ type State struct {
epochBoundaryLock sync.RWMutex
hotStateCache *cache.HotStateCache
splitInfo *splitSlotAndRoot
stateSummaryCache *cache.StateSummaryCache
}
// This tracks the split point. The point where slot and the block root of
@@ -33,13 +34,14 @@ type splitSlotAndRoot struct {
}
// New returns a new state management object.
func New(db db.NoHeadAccessDatabase) *State {
func New(db db.NoHeadAccessDatabase, stateSummaryCache *cache.StateSummaryCache) *State {
return &State{
beaconDB: db,
epochBoundarySlotToRoot: make(map[uint64][32]byte),
hotStateCache: cache.NewHotStateCache(),
splitInfo: &splitSlotAndRoot{slot: 0, root: params.BeaconConfig().ZeroHash},
slotsPerArchivedPoint: archivedInterval,
stateSummaryCache: stateSummaryCache,
}
}

View File

@@ -5,6 +5,7 @@ import (
"testing"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -15,7 +16,7 @@ func TestResume(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
root := [32]byte{'A'}
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch)

View File

@@ -5,6 +5,7 @@ import (
"testing"
//"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
@@ -17,7 +18,7 @@ func TestSaveState_ColdStateCanBeSaved(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 1
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
@@ -48,7 +49,7 @@ func TestSaveState_HotStateCanBeSaved(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 1
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
// This goes to hot section, verify it can save on epoch boundary.
@@ -63,7 +64,7 @@ func TestSaveState_HotStateCanBeSaved(t *testing.T) {
if !service.beaconDB.HasState(ctx, r) {
t.Error("Should have saved the state")
}
if !service.beaconDB.HasStateSummary(ctx, r) {
if !service.stateSummaryCache.Has(r) {
t.Error("Should have saved the state summary")
}
testutil.AssertLogsContain(t, hook, "Saved full state on epoch boundary")
@@ -75,7 +76,7 @@ func TestSaveState_HotStateCached(t *testing.T) {
db := testDB.SetupDB(t)
defer testDB.TeardownDB(t, db)
service := New(db)
service := New(db, cache.NewStateSummaryCache())
service.slotsPerArchivedPoint = 1
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
beaconState.SetSlot(params.BeaconConfig().SlotsPerEpoch)

View File

@@ -108,6 +108,7 @@ go_test(
shard_count = 4,
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",

View File

@@ -62,7 +62,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
attestations := s.blkRootToPendingAtts[bRoot]
s.pendingAttsLock.RUnlock()
// Has the pending attestation's missing block arrived and the node processed block yet?
hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bRoot)
hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bRoot) || s.stateSummaryCache.Has(bRoot)
if s.db.HasBlock(ctx, bRoot) && (s.db.HasState(ctx, bRoot) || hasStateSummary) {
numberOfBlocksRecoveredFromAtt.Inc()
for _, att := range attestations {

View File

@@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
@@ -46,6 +47,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
db: db,
chain: &mock.ChainService{Genesis: roughtime.Now()},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
stateSummaryCache: cache.NewStateSummaryCache(),
}
a := &ethpb.AggregateAttestationAndProof{Aggregate: &ethpb.Attestation{Data: &ethpb.AttestationData{Target: &ethpb.Checkpoint{}}}}
@@ -69,6 +71,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
chain: &mock.ChainService{Genesis: roughtime.Now()},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
attPool: attestations.NewPool(),
stateSummaryCache: cache.NewStateSummaryCache(),
}
a := &ethpb.AggregateAttestationAndProof{
@@ -175,6 +178,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
}},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
attPool: attestations.NewPool(),
stateSummaryCache: cache.NewStateSummaryCache(),
}
sb = &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}

View File

@@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
@@ -43,6 +44,7 @@ type Config struct {
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier
AttestationNotifier operation.Notifier
StateSummaryCache *cache.StateSummaryCache
}
// This defines the interface for interacting with block chain service
@@ -74,6 +76,7 @@ func NewRegularSync(cfg *Config) *Service {
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
stateSummaryCache: cfg.StateSummaryCache,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}
@@ -106,6 +109,7 @@ type Service struct {
blockNotifier blockfeed.Notifier
blocksRateLimiter *leakybucket.Collector
attestationNotifier operation.Notifier
stateSummaryCache *cache.StateSummaryCache
}
// Start the regular sync service.

View File

@@ -9,6 +9,7 @@ import (
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
@@ -58,6 +59,7 @@ func TestService_committeeIndexBeaconAttestationSubscriber_ValidMessage(t *testi
stateNotifier: (&mock.ChainService{}).StateNotifier(),
attestationNotifier: (&mock.ChainService{}).OperationNotifier(),
initialSync: &mockSync.Sync{IsSyncing: false},
stateSummaryCache: cache.NewStateSummaryCache(),
}
r.registerSubscribers()
r.stateNotifier.StateFeed().Send(&feed.Event{

View File

@@ -128,9 +128,10 @@ func (r *Service) validateAggregatedAtt(ctx context.Context, a *ethpb.AggregateA
func (r *Service) validateBlockInAttestation(ctx context.Context, a *ethpb.AggregateAttestationAndProof) bool {
// Verify the block being voted and the processed state is in DB. The block should have passed validation if it's in the DB.
hasStateSummary := featureconfig.Get().NewStateMgmt && r.db.HasStateSummary(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot))
hasState := r.db.HasState(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) || hasStateSummary
hasBlock := r.db.HasBlock(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot))
blockRoot := bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)
hasStateSummary := featureconfig.Get().NewStateMgmt && r.db.HasStateSummary(ctx, blockRoot) || r.stateSummaryCache.Has(blockRoot)
hasState := r.db.HasState(ctx, blockRoot) || hasStateSummary
hasBlock := r.db.HasBlock(ctx, blockRoot)
if !(hasState && hasBlock) {
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
r.savePendingAtt(a)

View File

@@ -14,6 +14,7 @@ import (
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
@@ -139,6 +140,7 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) {
initialSync: &mockSync.Sync{IsSyncing: false},
attPool: attestations.NewPool(),
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
stateSummaryCache: cache.NewStateSummaryCache(),
}
buf := new(bytes.Buffer)
@@ -199,7 +201,8 @@ func TestValidateAggregateAndProof_NotWithinSlotRange(t *testing.T) {
initialSync: &mockSync.Sync{IsSyncing: false},
chain: &mock.ChainService{Genesis: time.Now(),
State: beaconState},
attPool: attestations.NewPool(),
attPool: attestations.NewPool(),
stateSummaryCache: cache.NewStateSummaryCache(),
}
buf := new(bytes.Buffer)
@@ -377,7 +380,8 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
}},
attPool: attestations.NewPool(),
attPool: attestations.NewPool(),
stateSummaryCache: cache.NewStateSummaryCache(),
}
buf := new(bytes.Buffer)

View File

@@ -74,9 +74,10 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
}
// Verify the block being voted and the processed state is in DB and. The block should have passed validation if it's in the DB.
hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot))
hasState := s.db.HasState(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) || hasStateSummary
hasBlock := s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot))
blockRoot := bytesutil.ToBytes32(att.Data.BeaconBlockRoot)
hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, blockRoot) || s.stateSummaryCache.Has(blockRoot)
hasState := s.db.HasState(ctx, blockRoot) || hasStateSummary
hasBlock := s.db.HasBlock(ctx, blockRoot)
if !(hasState && hasBlock) {
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
s.savePendingAtt(&eth.AggregateAttestationAndProof{Aggregate: att})

View File

@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/go-ssz"
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
beaconstate "github.com/prysmaticlabs/prysm/beacon-chain/state"
@@ -36,6 +37,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
db: db,
chain: chain,
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
stateSummaryCache: cache.NewStateSummaryCache(),
}
blk := &ethpb.SignedBeaconBlock{

View File

@@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/tools/blocktree",
visibility = ["//visibility:private"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filters:go_default_library",

View File

@@ -17,6 +17,7 @@ import (
"github.com/emicklei/dot"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
@@ -43,7 +44,7 @@ func main() {
params.UseDemoBeaconConfig()
flag.Parse()
db, err := db.NewDB(*datadir)
db, err := db.NewDB(*datadir, cache.NewStateSummaryCache())
if err != nil {
panic(err)
}

View File

@@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/tools/extractor",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/state/interop:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filters:go_default_library",

View File

@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
@@ -27,7 +28,7 @@ func init() {
func main() {
flag.Parse()
fmt.Println("Starting process...")
d, err := db.NewDB(*datadir)
d, err := db.NewDB(*datadir, cache.NewStateSummaryCache())
if err != nil {
panic(err)
}

View File

@@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/tools/interop/export-genesis",
visibility = ["//visibility:private"],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/db:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
],

View File

@@ -7,6 +7,7 @@ import (
"os"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
)
@@ -21,7 +22,7 @@ func main() {
fmt.Printf("Reading db at %s and writing ssz output to %s.\n", os.Args[1], os.Args[2])
d, err := db.NewDB(os.Args[1])
d, err := db.NewDB(os.Args[1], cache.NewStateSummaryCache())
if err != nil {
panic(err)
}