mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Batch save blocks for initial sync. 80% faster BPS (#5215)
* Starting a quick PoC * Rate limit to one epoch worth of blocks in memory * Proof of concept working * Quick comment out * Save previous finalized checkpoint * Merge branch 'master' of github.com:prysmaticlabs/prysm into batch-save * Test * Merge branch 'prev-finalized-getter' into batch-save * Minor fixes * Use a map * More run time fixes * Remove panic * Feature flag * Removed unused methods * Fixed tests * E2e test * Merge branch 'master' into batch-save * comment * Merge branch 'master' into batch-save * Compatible with current initial sync * Merge branch 'batch-save' of github.com:prysmaticlabs/prysm into batch-save * Merge refs/heads/master into batch-save * Merge refs/heads/master into batch-save * Merge refs/heads/master into batch-save * Merge branch 'master' of github.com:prysmaticlabs/prysm into batch-save * Feedback * Merge branch 'batch-save' of github.com:prysmaticlabs/prysm into batch-save * Merge refs/heads/master into batch-save
This commit is contained in:
@@ -5,9 +5,11 @@ import (
|
||||
"sort"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
)
|
||||
|
||||
@@ -170,10 +172,21 @@ func (s *Service) generateState(ctx context.Context, startRoot [32]byte, endRoot
|
||||
if preState == nil {
|
||||
return nil, errors.New("finalized state does not exist in db")
|
||||
}
|
||||
endBlock, err := s.beaconDB.Block(ctx, endRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
var endBlock *ethpb.SignedBeaconBlock
|
||||
if featureconfig.Get().InitSyncBatchSaveBlocks && s.hasInitSyncBlock(endRoot) {
|
||||
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.clearInitSyncBlocks()
|
||||
endBlock = s.getInitSyncBlock(endRoot)
|
||||
} else {
|
||||
endBlock, err = s.beaconDB.Block(ctx, endRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if endBlock == nil {
|
||||
return nil, errors.New("provided block root does not have block saved in the db")
|
||||
}
|
||||
@@ -189,3 +202,48 @@ func (s *Service) generateState(ctx context.Context, startRoot [32]byte, endRoot
|
||||
}
|
||||
return postState, nil
|
||||
}
|
||||
|
||||
// This saves a beacon block to the initial sync blocks cache.
|
||||
func (s *Service) saveInitSyncBlock(r [32]byte, b *ethpb.SignedBeaconBlock) {
|
||||
s.initSyncBlocksLock.Lock()
|
||||
defer s.initSyncBlocksLock.Unlock()
|
||||
s.initSyncBlocks[r] = b
|
||||
}
|
||||
|
||||
// This checks if a beacon block exists in the initial sync blocks cache using the root
|
||||
// of the block.
|
||||
func (s *Service) hasInitSyncBlock(r [32]byte) bool {
|
||||
s.initSyncBlocksLock.RLock()
|
||||
defer s.initSyncBlocksLock.RUnlock()
|
||||
_, ok := s.initSyncBlocks[r]
|
||||
return ok
|
||||
}
|
||||
|
||||
// This retrieves a beacon block from the initial sync blocks cache using the root of
|
||||
// the block.
|
||||
func (s *Service) getInitSyncBlock(r [32]byte) *ethpb.SignedBeaconBlock {
|
||||
s.initSyncBlocksLock.RLock()
|
||||
defer s.initSyncBlocksLock.RUnlock()
|
||||
b := s.initSyncBlocks[r]
|
||||
return b
|
||||
}
|
||||
|
||||
// This retrieves all the beacon blocks from the initial sync blocks cache, the returned
|
||||
// blocks are unordered.
|
||||
func (s *Service) getInitSyncBlocks() []*ethpb.SignedBeaconBlock {
|
||||
s.initSyncBlocksLock.RLock()
|
||||
defer s.initSyncBlocksLock.RUnlock()
|
||||
|
||||
blks := make([]*ethpb.SignedBeaconBlock, 0, len(s.initSyncBlocks))
|
||||
for _, b := range s.initSyncBlocks {
|
||||
blks = append(blks, b)
|
||||
}
|
||||
return blks
|
||||
}
|
||||
|
||||
// This clears out the initial sync blocks cache.
|
||||
func (s *Service) clearInitSyncBlocks() {
|
||||
s.initSyncBlocksLock.Lock()
|
||||
defer s.initSyncBlocksLock.Unlock()
|
||||
s.initSyncBlocks = make(map[[32]byte]*ethpb.SignedBeaconBlock)
|
||||
}
|
||||
|
||||
@@ -15,10 +15,14 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/shared/attestationutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// This defines size of the upper bound for initial sync block cache.
|
||||
var initialSyncBlockCacheSize = 2 * params.BeaconConfig().SlotsPerEpoch
|
||||
|
||||
// onBlock is called when a gossip block is received. It runs regular state transition on the block.
|
||||
//
|
||||
// Spec pseudocode definition:
|
||||
@@ -210,13 +214,17 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
|
||||
return errors.Wrap(err, "could not execute state transition")
|
||||
}
|
||||
|
||||
if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
|
||||
return errors.Wrapf(err, "could not save block from slot %d", b.Slot)
|
||||
}
|
||||
root, err := stateutil.BlockRoot(b)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
|
||||
}
|
||||
if featureconfig.Get().InitSyncBatchSaveBlocks {
|
||||
s.saveInitSyncBlock(root, signed)
|
||||
} else {
|
||||
if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
|
||||
return errors.Wrapf(err, "could not save block from slot %d", b.Slot)
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.insertBlockToForkChoiceStore(ctx, b, root, postState); err != nil {
|
||||
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
|
||||
@@ -247,6 +255,14 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
|
||||
}
|
||||
}
|
||||
|
||||
// Rate limit how many blocks (2 epochs worth of blocks) a node keeps in the memory.
|
||||
if len(s.getInitSyncBlocks()) > int(initialSyncBlockCacheSize) {
|
||||
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
|
||||
return err
|
||||
}
|
||||
s.clearInitSyncBlocks()
|
||||
}
|
||||
|
||||
// Update finalized check point. Prune the block cache and helper caches on every new finalized epoch.
|
||||
if postState.FinalizedCheckpointEpoch() > s.finalizedCheckpt.Epoch {
|
||||
if !featureconfig.Get().NewStateMgmt {
|
||||
@@ -264,6 +280,13 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
|
||||
}
|
||||
}
|
||||
|
||||
if featureconfig.Get().InitSyncBatchSaveBlocks {
|
||||
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
|
||||
return err
|
||||
}
|
||||
s.clearInitSyncBlocks()
|
||||
}
|
||||
|
||||
if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint()); err != nil {
|
||||
return errors.Wrap(err, "could not save finalized checkpoint")
|
||||
}
|
||||
|
||||
@@ -229,21 +229,36 @@ func (s *Service) shouldUpdateCurrentJustified(ctx context.Context, newJustified
|
||||
if helpers.SlotsSinceEpochStarts(s.CurrentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
|
||||
return true, nil
|
||||
}
|
||||
newJustifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(newJustifiedCheckpt.Root))
|
||||
if err != nil {
|
||||
return false, err
|
||||
var newJustifiedBlockSigned *ethpb.SignedBeaconBlock
|
||||
justifiedRoot := bytesutil.ToBytes32(newJustifiedCheckpt.Root)
|
||||
var err error
|
||||
if featureconfig.Get().InitSyncBatchSaveBlocks && s.hasInitSyncBlock(justifiedRoot) {
|
||||
newJustifiedBlockSigned = s.getInitSyncBlock(justifiedRoot)
|
||||
} else {
|
||||
newJustifiedBlockSigned, err = s.beaconDB.Block(ctx, justifiedRoot)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
if newJustifiedBlockSigned == nil || newJustifiedBlockSigned.Block == nil {
|
||||
return false, errors.New("nil new justified block")
|
||||
}
|
||||
|
||||
newJustifiedBlock := newJustifiedBlockSigned.Block
|
||||
if newJustifiedBlock.Slot <= helpers.StartSlot(s.justifiedCheckpt.Epoch) {
|
||||
return false, nil
|
||||
}
|
||||
justifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root))
|
||||
if err != nil {
|
||||
return false, err
|
||||
var justifiedBlockSigned *ethpb.SignedBeaconBlock
|
||||
cachedJustifiedRoot := bytesutil.ToBytes32(s.justifiedCheckpt.Root)
|
||||
if featureconfig.Get().InitSyncBatchSaveBlocks && s.hasInitSyncBlock(cachedJustifiedRoot) {
|
||||
justifiedBlockSigned = s.getInitSyncBlock(cachedJustifiedRoot)
|
||||
} else {
|
||||
justifiedBlockSigned, err = s.beaconDB.Block(ctx, cachedJustifiedRoot)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
if justifiedBlockSigned == nil || justifiedBlockSigned.Block == nil {
|
||||
return false, errors.New("nil justified block")
|
||||
}
|
||||
@@ -267,6 +282,7 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if canUpdate {
|
||||
s.prevJustifiedCheckpt = s.justifiedCheckpt
|
||||
s.justifiedCheckpt = cpt
|
||||
@@ -278,6 +294,7 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
|
||||
justifiedState := s.initSyncState[justifiedRoot]
|
||||
// If justified state is nil, resume back to normal syncing process and save
|
||||
// justified check point.
|
||||
var err error
|
||||
if justifiedState == nil {
|
||||
if s.beaconDB.HasState(ctx, justifiedRoot) {
|
||||
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
|
||||
@@ -376,6 +393,11 @@ func (s *Service) ancestor(ctx context.Context, root []byte, slot uint64) ([]byt
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get ancestor block")
|
||||
}
|
||||
|
||||
if featureconfig.Get().InitSyncBatchSaveBlocks && s.hasInitSyncBlock(bytesutil.ToBytes32(root)) {
|
||||
signed = s.getInitSyncBlock(bytesutil.ToBytes32(root))
|
||||
}
|
||||
|
||||
if signed == nil || signed.Block == nil {
|
||||
return nil, errors.New("nil block")
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ type BlockReceiver interface {
|
||||
ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock) error
|
||||
ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock) error
|
||||
ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock) error
|
||||
HasInitSyncBlock(root [32]byte) bool
|
||||
}
|
||||
|
||||
// ReceiveBlock is a function that defines the operations that are preformed on
|
||||
@@ -236,3 +237,8 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasInitSyncBlock returns true if the block of the input root exists in initial sync blocks cache.
|
||||
func (s *Service) HasInitSyncBlock(root [32]byte) bool {
|
||||
return s.hasInitSyncBlock(root)
|
||||
}
|
||||
|
||||
@@ -75,6 +75,8 @@ type Service struct {
|
||||
checkpointStateLock sync.Mutex
|
||||
stateGen *stategen.State
|
||||
opsService *attestations.Service
|
||||
initSyncBlocks map[[32]byte]*ethpb.SignedBeaconBlock
|
||||
initSyncBlocksLock sync.RWMutex
|
||||
}
|
||||
|
||||
// Config options for the service.
|
||||
@@ -117,6 +119,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
|
||||
checkpointState: cache.NewCheckpointStateCache(),
|
||||
opsService: cfg.OpsService,
|
||||
stateGen: cfg.StateGen,
|
||||
initSyncBlocks: make(map[[32]byte]*ethpb.SignedBeaconBlock),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -234,3 +234,8 @@ func (ms *ChainService) IsValidAttestation(ctx context.Context, att *ethpb.Attes
|
||||
|
||||
// ClearCachedStates does nothing.
|
||||
func (ms *ChainService) ClearCachedStates() {}
|
||||
|
||||
// HasInitSyncBlock mocks the same method in the chain service.
|
||||
func (ms *ChainService) HasInitSyncBlock(root [32]byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
@@ -227,16 +226,16 @@ func (k *Store) SaveBlocks(ctx context.Context, blocks []*ethpb.SignedBeaconBloc
|
||||
defer span.End()
|
||||
|
||||
return k.db.Update(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(blocksBucket)
|
||||
for _, block := range blocks {
|
||||
if err := k.setBlockSlotBitField(ctx, tx, block.Block.Slot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blockRoot, err := ssz.HashTreeRoot(block.Block)
|
||||
blockRoot, err := stateutil.BlockRoot(block.Block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bkt := tx.Bucket(blocksBucket)
|
||||
|
||||
if existingBlock := bkt.Get(blockRoot[:]); existingBlock != nil {
|
||||
continue
|
||||
}
|
||||
@@ -249,6 +248,7 @@ func (k *Store) SaveBlocks(ctx context.Context, blocks []*ethpb.SignedBeaconBloc
|
||||
return errors.Wrap(err, "could not update DB indices")
|
||||
}
|
||||
k.blockCache.Set(string(blockRoot[:]), block, int64(len(enc)))
|
||||
|
||||
if err := bkt.Put(blockRoot[:], enc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -109,6 +109,7 @@ func (k *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C
|
||||
if err := bucket.Put(finalizedCheckpointKey, enc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return k.updateFinalizedBlockRoots(ctx, tx, checkpoint)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ func (k *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, chec
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
blockRoots, err := k.BlockRoots(ctx, filters.NewFilter().
|
||||
SetStartEpoch(previousFinalizedCheckpoint.Epoch).
|
||||
SetEndEpoch(checkpoint.Epoch+1),
|
||||
|
||||
@@ -181,7 +181,17 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
startBlock := s.chain.HeadSlot() + 1
|
||||
var startBlock uint64
|
||||
if featureconfig.Get().InitSyncBatchSaveBlocks {
|
||||
lastFinalizedEpoch := s.chain.FinalizedCheckpt().Epoch
|
||||
lastFinalizedState, err := s.db.HighestSlotStatesBelow(ctx, helpers.StartSlot(lastFinalizedEpoch))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
startBlock = lastFinalizedState[0].Slot() + 1
|
||||
} else {
|
||||
startBlock = s.chain.HeadSlot() + 1
|
||||
}
|
||||
skippedBlocks := blockBatchSize * uint64(lastEmptyRequests*len(peers))
|
||||
if startBlock+skippedBlocks > helpers.StartSlot(finalizedEpoch+1) {
|
||||
log.WithField("finalizedEpoch", finalizedEpoch).Debug("Requested block range is greater than the finalized epoch")
|
||||
@@ -209,10 +219,12 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
|
||||
for _, blk := range blocks {
|
||||
s.logSyncStatus(genesis, blk.Block, peers, counter)
|
||||
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) {
|
||||
log.Debugf("Beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot)
|
||||
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
|
||||
if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) {
|
||||
log.WithField("parentRoot", parentRoot).Debug("Beacon node doesn't have a block in DB or cache")
|
||||
continue
|
||||
}
|
||||
|
||||
s.blockNotifier.BlockFeed().Send(&feed.Event{
|
||||
Type: blockfeed.ReceivedBlock,
|
||||
Data: &blockfeed.ReceivedBlockData{SignedBlock: blk},
|
||||
|
||||
@@ -198,7 +198,8 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, counter
|
||||
}
|
||||
|
||||
func (s *Service) processBlock(ctx context.Context, blk *eth.SignedBeaconBlock) error {
|
||||
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) {
|
||||
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
|
||||
if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) {
|
||||
return fmt.Errorf("beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot)
|
||||
}
|
||||
s.blockNotifier.BlockFeed().Send(&feed.Event{
|
||||
|
||||
@@ -53,6 +53,7 @@ type Flags struct {
|
||||
EnableInitSyncQueue bool // EnableInitSyncQueue enables the new initial sync implementation.
|
||||
EnableFieldTrie bool // EnableFieldTrie enables the state from using field specific tries when computing the root.
|
||||
EnableBlockHTR bool // EnableBlockHTR enables custom hashing of our beacon blocks.
|
||||
InitSyncBatchSaveBlocks bool // InitSyncBatchSaveBlocks enables batch save blocks mode during initial syncing.
|
||||
// DisableForkChoice disables using LMD-GHOST fork choice to update
|
||||
// the head of the chain based on attestations and instead accepts any valid received block
|
||||
// as the chain head. UNSAFE, use with caution.
|
||||
@@ -190,6 +191,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
|
||||
log.Warn("Enabling custom block hashing")
|
||||
cfg.EnableBlockHTR = true
|
||||
}
|
||||
if ctx.Bool(initSyncBatchSaveBlocks.Name) {
|
||||
log.Warn("Enabling init sync batch save blocks mode")
|
||||
cfg.InitSyncBatchSaveBlocks = true
|
||||
}
|
||||
Init(cfg)
|
||||
}
|
||||
|
||||
|
||||
@@ -141,6 +141,11 @@ var (
|
||||
Name: "enable-custom-block-htr",
|
||||
Usage: "Enables the usage of a custom hashing method for our block",
|
||||
}
|
||||
initSyncBatchSaveBlocks = &cli.BoolFlag{
|
||||
Name: "init-sync-batch-save-blocks",
|
||||
Usage: "Instead of saving one block per slot to the DB during initial syncing, this enables batch saving" +
|
||||
" of epochs worth of blocks to the DB",
|
||||
}
|
||||
)
|
||||
|
||||
// Deprecated flags list.
|
||||
@@ -323,6 +328,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
|
||||
enableInitSyncQueue,
|
||||
enableFieldTrie,
|
||||
enableCustomBlockHTR,
|
||||
initSyncBatchSaveBlocks,
|
||||
}...)
|
||||
|
||||
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
|
||||
@@ -336,6 +342,7 @@ var E2EBeaconChainFlags = []string{
|
||||
"--check-head-state",
|
||||
"--enable-initial-sync-queue",
|
||||
"--enable-state-field-trie",
|
||||
"--init-sync-batch-save-blocks",
|
||||
// TODO(5123): This flag currently fails E2E. Commenting until it's resolved.
|
||||
//"--enable-dynamic-committee-subnets",
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user