Batch Verify Blocks (#6469)

* add everything so far
* checkpoint progress
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* fix
* checkpoint
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* checkpoint again
* checkpoint again
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* commenting
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* bls cleanup
* revert this back
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* revert core changes
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* add flag
* add test
* add one more test
* clean up
* comment
* lint
* terence's review
* Merge refs/heads/master into fastBLS
* Merge refs/heads/master into fastBLS
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* Merge refs/heads/master into fastBLS
* remove additional method
* Merge branch 'fastBLS' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* fix
* Merge refs/heads/master into fastBLS
* copy
* Merge branch 'fastBLS' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
This commit is contained in:
Nishant Das
2020-07-07 12:16:12 +08:00
committed by GitHub
parent 8ddfde41e3
commit 64fa474434
13 changed files with 532 additions and 26 deletions

View File

@@ -43,6 +43,7 @@ go_library(
"//beacon-chain/state/stateutil:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",

View File

@@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -84,7 +85,7 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock,
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return nil, errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}
@@ -202,17 +203,74 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
if err != nil {
return errors.Wrap(err, "could not execute state transition")
}
return s.handlePostStateInSync(ctx, signed, blockRoot, postState)
}
func (s *Service) onBlockBatch(ctx context.Context, blks []*ethpb.SignedBeaconBlock,
blockRoots [][32]byte) (*stateTrie.BeaconState, []*ethpb.Checkpoint, []*ethpb.Checkpoint, error) {
ctx, span := trace.StartSpan(ctx, "blockchain.onBlock")
defer span.End()
if len(blks) == 0 || len(blockRoots) == 0 {
return nil, nil, nil, errors.New("no blocks provided")
}
if blks[0] == nil || blks[0].Block == nil {
return nil, nil, nil, errors.New("nil block")
}
b := blks[0].Block
// Retrieve incoming block's pre state.
preState, err := s.verifyBlkPreState(ctx, b)
if err != nil {
return nil, nil, nil, err
}
// Perform a copy to preserve copy in cache.
preState = preState.Copy()
jCheckpoints := make([]*ethpb.Checkpoint, len(blks))
fCheckpoints := make([]*ethpb.Checkpoint, len(blks))
sigSet := &bls.SignatureSet{
Signatures: []bls.Signature{},
PublicKeys: []bls.PublicKey{},
Messages: [][32]byte{},
}
set := new(bls.SignatureSet)
for i, b := range blks {
set, preState, err = state.ExecuteStateTransitionNoVerifyAnySig(ctx, preState, b)
if err != nil {
return nil, nil, nil, err
}
jCheckpoints[i] = preState.CurrentJustifiedCheckpoint()
fCheckpoints[i] = preState.FinalizedCheckpoint()
sigSet.Join(set)
}
verify, err := bls.VerifyMultipleSignatures(sigSet.Signatures, sigSet.Messages, sigSet.PublicKeys)
if err != nil {
return nil, nil, nil, err
}
if !verify {
return nil, nil, nil, errors.New("batch block signature verification failed")
}
return preState, fCheckpoints, jCheckpoints, nil
}
// handles the state post transition and saves the appropriate checkpoints and forkchoice
// data.
func (s *Service) handlePostStateInSync(ctx context.Context, signed *ethpb.SignedBeaconBlock,
blockRoot [32]byte, postState *stateTrie.BeaconState) error {
b := signed.Block
s.saveInitSyncBlock(blockRoot, signed)
if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}
if err := s.stateGen.SaveState(ctx, blockRoot, postState); err != nil {
return errors.Wrap(err, "could not save state")
}
if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}
// Rate limit how many blocks (2 epochs worth of blocks) a node keeps in the memory.
if uint64(len(s.getInitSyncBlocks())) > initialSyncBlockCacheSize {
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
@@ -245,7 +303,57 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
}
}
// Epoch boundary bookkeeping such as logging epoch summaries.
return s.handleEpochBoundary(postState)
}
// handles a block after the block's batch has been verified, where we can save blocks
// their state summaries and split them off to relative hot/cold storage.
func (s *Service) handleBlockAfterBatchVerify(ctx context.Context, signed *ethpb.SignedBeaconBlock,
blockRoot [32]byte, fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error {
b := signed.Block
s.saveInitSyncBlock(blockRoot, signed)
if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, fCheckpoint, jCheckpoint); err != nil {
return err
}
s.stateGen.SaveStateSummary(ctx, signed, blockRoot)
// Rate limit how many blocks (2 epochs worth of blocks) a node keeps in the memory.
if uint64(len(s.getInitSyncBlocks())) > initialSyncBlockCacheSize {
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
return err
}
s.clearInitSyncBlocks()
}
// Update finalized check point. Prune the block cache and helper caches on every new finalized epoch.
if fCheckpoint.Epoch > s.finalizedCheckpt.Epoch {
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
return err
}
s.clearInitSyncBlocks()
if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, fCheckpoint); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
}
s.prevFinalizedCheckpt = s.finalizedCheckpt
s.finalizedCheckpt = fCheckpoint
fRoot := bytesutil.ToBytes32(fCheckpoint.Root)
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
return errors.Wrap(err, "could not migrate to cold")
}
}
return nil
}
// Epoch boundary bookkeeping such as logging epoch summaries.
func (s *Service) handleEpochBoundary(postState *stateTrie.BeaconState) error {
if postState.Slot() >= s.nextEpochBoundarySlot {
reportEpochMetrics(postState)
s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState))
@@ -258,25 +366,18 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
return err
}
}
return nil
}
// This feeds in the block and block's attestations to fork choice store. It's allows fork choice store
// to gain information on the most current chain.
func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock, root [32]byte, state *stateTrie.BeaconState) error {
if err := s.fillInForkChoiceMissingBlocks(ctx, blk, state); err != nil {
func (s *Service) insertBlockAndAttestationsToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock, root [32]byte,
state *stateTrie.BeaconState) error {
fCheckpoint := state.FinalizedCheckpoint()
jCheckpoint := state.CurrentJustifiedCheckpoint()
if err := s.insertBlockToForkChoiceStore(ctx, blk, root, fCheckpoint, jCheckpoint); err != nil {
return err
}
// Feed in block to fork choice store.
if err := s.forkChoiceStore.ProcessBlock(ctx,
blk.Slot, root, bytesutil.ToBytes32(blk.ParentRoot), bytesutil.ToBytes32(blk.Body.Graffiti),
state.CurrentJustifiedCheckpoint().Epoch,
state.FinalizedCheckpointEpoch()); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}
// Feed in block's attestations to fork choice store.
for _, a := range blk.Body.Attestations {
committee, err := helpers.BeaconCommitteeFromState(state, a.Data.Slot, a.Data.CommitteeIndex)
@@ -286,6 +387,20 @@ func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.B
indices := attestationutil.AttestingIndices(a.AggregationBits, committee)
s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch)
}
return nil
}
func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock,
root [32]byte, fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error {
if err := s.fillInForkChoiceMissingBlocks(ctx, blk, fCheckpoint, jCheckpoint); err != nil {
return err
}
// Feed in block to fork choice store.
if err := s.forkChoiceStore.ProcessBlock(ctx,
blk.Slot, root, bytesutil.ToBytes32(blk.ParentRoot), bytesutil.ToBytes32(blk.Body.Graffiti),
jCheckpoint.Epoch,
fCheckpoint.Epoch); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}
return nil
}

View File

@@ -295,7 +295,8 @@ func (s *Service) finalizedImpliesNewJustified(ctx context.Context, state *state
// This retrieves missing blocks from DB (ie. the blocks that couldn't be received over sync) and inserts them to fork choice store.
// This is useful for block tree visualizer and additional vote accounting.
func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb.BeaconBlock, state *stateTrie.BeaconState) error {
func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb.BeaconBlock,
fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error {
pendingNodes := make([]*ethpb.BeaconBlock, 0)
parentRoot := bytesutil.ToBytes32(blk.ParentRoot)
@@ -326,8 +327,8 @@ func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb.
if err := s.forkChoiceStore.ProcessBlock(ctx,
b.Slot, r, bytesutil.ToBytes32(b.ParentRoot), bytesutil.ToBytes32(b.Body.Graffiti),
state.CurrentJustifiedCheckpoint().Epoch,
state.FinalizedCheckpointEpoch()); err != nil {
jCheckpoint.Epoch,
fCheckpoint.Epoch); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}
}

View File

@@ -10,6 +10,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray"
@@ -129,6 +130,65 @@ func TestStore_OnBlock(t *testing.T) {
}
}
func TestStore_OnBlockBatch(t *testing.T) {
ctx := context.Background()
db, sc := testDB.SetupDB(t)
cfg := &Config{
BeaconDB: db,
StateGen: stategen.New(db, sc),
}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}
genesisStateRoot := [32]byte{}
genesis := blocks.NewGenesisBlock(genesisStateRoot[:])
if err := db.SaveBlock(ctx, genesis); err != nil {
t.Error(err)
}
st, keys := testutil.DeterministicGenesisState(t, 64)
bState := st.Copy()
blks := []*ethpb.SignedBeaconBlock{}
blkRoots := [][32]byte{}
var firstState *stateTrie.BeaconState
for i := 1; i < 10; i++ {
b, err := testutil.GenerateFullBlock(bState, keys, testutil.DefaultBlockGenConfig(), uint64(i))
if err != nil {
t.Fatal(err)
}
bState, err = state.ExecuteStateTransition(ctx, bState, b)
if err != nil {
t.Fatal(err)
}
if i == 1 {
firstState = bState.Copy()
}
root, err := stateutil.BlockRoot(b.Block)
if err != nil {
t.Fatal(err)
}
blks = append(blks, b)
blkRoots = append(blkRoots, root)
}
err = db.SaveBlock(context.Background(), blks[0])
if err != nil {
t.Fatal(err)
}
err = service.stateGen.SaveState(ctx, blkRoots[0], firstState)
if err != nil {
t.Fatal(err)
}
_, _, _, err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:])
if err != nil {
t.Fatal(err)
}
}
func TestRemoveStateSinceLastFinalized_EmptyStartSlot(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
@@ -391,7 +451,8 @@ func TestFillForkChoiceMissingBlocks_CanSave(t *testing.T) {
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
block := &ethpb.BeaconBlock{Slot: 9, ParentRoot: roots[8], Body: &ethpb.BeaconBlockBody{Graffiti: []byte{}}}
if err := service.fillInForkChoiceMissingBlocks(context.Background(), block, beaconState); err != nil {
if err := service.fillInForkChoiceMissingBlocks(context.Background(), block,
beaconState.FinalizedCheckpoint(), beaconState.CurrentJustifiedCheckpoint()); err != nil {
t.Fatal(err)
}
@@ -462,7 +523,7 @@ func TestFillForkChoiceMissingBlocks_FilterFinalized(t *testing.T) {
}
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
if err := service.fillInForkChoiceMissingBlocks(context.Background(), b65.Block, beaconState); err != nil {
if err := service.fillInForkChoiceMissingBlocks(context.Background(), b65.Block, beaconState.FinalizedCheckpoint(), beaconState.CurrentJustifiedCheckpoint()); err != nil {
t.Fatal(err)
}

View File

@@ -21,6 +21,7 @@ type BlockReceiver interface {
ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockInitialSync(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedBeaconBlock, blkRoots [][32]byte) error
HasInitSyncBlock(root [32]byte) bool
}
@@ -163,6 +164,70 @@ func (s *Service) ReceiveBlockInitialSync(ctx context.Context, block *ethpb.Sign
return nil
}
// ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear ,transitioning
// the state, performing batch verification of all collected signatures and then performing the appropriate
// actions for a block post-transition.
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedBeaconBlock, blkRoots [][32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockBatch")
defer span.End()
// Apply state transition on the incoming newly received blockCopy without verifying its BLS contents.
postState, fCheckpoints, jCheckpoints, err := s.onBlockBatch(ctx, blocks, blkRoots)
if err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}
for i, b := range blocks {
blockCopy := stateTrie.CopySignedBeaconBlock(b)
if err = s.handleBlockAfterBatchVerify(ctx, blockCopy, blkRoots[i], fCheckpoints[i], jCheckpoints[i]); err != nil {
traceutil.AnnotateError(span, err)
return err
}
// Send notification of the processed block to the state feed.
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: blkRoots[i],
Verified: false,
},
})
// Reports on blockCopy and fork choice metrics.
reportSlotMetrics(blockCopy.Block.Slot, s.headSlot(), s.CurrentSlot(), s.finalizedCheckpt)
// Log state transition data.
log.WithFields(logrus.Fields{
"slot": blockCopy.Block.Slot,
"attestations": len(blockCopy.Block.Body.Attestations),
"deposits": len(blockCopy.Block.Body.Deposits),
}).Debug("Finished applying state transition")
}
lastBlk := blocks[len(blocks)-1]
lastRoot := blkRoots[len(blkRoots)-1]
if err := s.stateGen.SaveState(ctx, lastRoot, postState); err != nil {
return errors.Wrap(err, "could not save state")
}
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(lastRoot[:], cachedHeadRoot) {
if err := s.saveHeadNoDB(ctx, lastBlk, lastRoot); err != nil {
err := errors.Wrap(err, "could not save head")
traceutil.AnnotateError(span, err)
return err
}
}
return s.handleEpochBoundary(postState)
}
// HasInitSyncBlock returns true if the block of the input root exists in initial sync blocks cache.
func (s *Service) HasInitSyncBlock(root [32]byte) bool {
return s.hasInitSyncBlock(root)

View File

@@ -410,7 +410,7 @@ func TestHasBlock_ForkChoiceAndDB(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := s.insertBlockToForkChoiceStore(ctx, block.Block, r, state); err != nil {
if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, block.Block, r, state); err != nil {
t.Fatal(err)
}
@@ -464,7 +464,7 @@ func BenchmarkHasBlockForkChoiceStore(b *testing.B) {
if err != nil {
b.Fatal(err)
}
if err := s.insertBlockToForkChoiceStore(ctx, block.Block, r, state); err != nil {
if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, block.Block, r, state); err != nil {
b.Fatal(err)
}

View File

@@ -171,6 +171,35 @@ func (ms *ChainService) ReceiveBlockInitialSync(ctx context.Context, block *ethp
return nil
}
// ReceiveBlockBatch processes blocks in batches from initial-sync.
func (ms *ChainService) ReceiveBlockBatch(ctx context.Context, blks []*ethpb.SignedBeaconBlock, roots [][32]byte) error {
if ms.State == nil {
ms.State = &stateTrie.BeaconState{}
}
for _, block := range blks {
if !bytes.Equal(ms.Root, block.Block.ParentRoot) {
return errors.Errorf("wanted %#x but got %#x", ms.Root, block.Block.ParentRoot)
}
if err := ms.State.SetSlot(block.Block.Slot); err != nil {
return err
}
ms.BlocksReceived = append(ms.BlocksReceived, block)
signingRoot, err := stateutil.BlockRoot(block.Block)
if err != nil {
return err
}
if ms.DB != nil {
if err := ms.DB.SaveBlock(ctx, block); err != nil {
return err
}
logrus.Infof("Saved block with root: %#x at slot %d", signingRoot, block.Block.Slot)
}
ms.Root = signingRoot[:]
ms.Block = block
}
return nil
}
// ReceiveBlockNoPubsub mocks ReceiveBlockNoPubsub method in chain service.
func (ms *ChainService) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
if ms.State == nil {

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -21,6 +22,16 @@ func (s *State) HasState(ctx context.Context, blockRoot [32]byte) bool {
return s.beaconDB.HasState(ctx, blockRoot)
}
// SaveStateSummary saves the relevant state summary for a block and its corresponding state slot in the
// state summary cache.
func (s *State) SaveStateSummary(ctx context.Context, blk *ethpb.SignedBeaconBlock, blockRoot [32]byte) {
// Save State summary
s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{
Slot: blk.Block.Slot,
Root: blockRoot[:],
})
}
// This saves a post finalized beacon state in the hot section of the DB. On the epoch boundary,
// it saves a full state. On an intermediate slot, it saves a back pointer to the
// nearest epoch boundary state.

View File

@@ -28,6 +28,7 @@ go_library(
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/mathutil:go_default_library",
"//shared/params:go_default_library",
"//shared/rand:go_default_library",

View File

@@ -1,8 +1,10 @@
package initialsync
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"time"
@@ -13,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/sirupsen/logrus"
)
@@ -27,6 +30,8 @@ const (
// blockReceiverFn defines block receiving function.
type blockReceiverFn func(ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error
type batchBlockReceiverFn func(ctx context.Context, blks []*eth.SignedBeaconBlock, roots [][32]byte) error
// Round Robin sync looks at the latest peer statuses and syncs with the highest
// finalized peer.
//
@@ -55,10 +60,19 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
if err := queue.start(); err != nil {
return err
}
blockReceiver := s.chain.ReceiveBlockInitialSync
batchReceiver := s.chain.ReceiveBlockBatch
// Step 1 - Sync to end of finalized epoch.
for fetchedBlocks := range queue.fetchedBlocks {
// Use Batch Block Verify to process and verify batches directly.
if featureconfig.Get().BatchBlockVerify {
if err := s.processBatchedBlocks(ctx, genesis, fetchedBlocks, batchReceiver); err != nil {
log.WithError(err).Info("Batch is not processed")
}
continue
}
for _, blk := range fetchedBlocks {
if err := s.processBlock(ctx, genesis, blk, blockReceiver); err != nil {
log.WithError(err).Info("Block is not processed")
@@ -158,6 +172,25 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, blkRoot
)
}
// logBatchSyncStatus and increments the block processing counter.
func (s *Service) logBatchSyncStatus(genesis time.Time, blks []*eth.SignedBeaconBlock, blkRoot [32]byte) {
s.counter.Incr(int64(len(blks)))
rate := float64(s.counter.Rate()) / counterSeconds
if rate == 0 {
rate = 1
}
firstBlk := blks[0]
timeRemaining := time.Duration(float64(helpers.SlotsSince(genesis)-firstBlk.Block.Slot)/rate) * time.Second
log.WithFields(logrus.Fields{
"peers": len(s.p2p.Peers().Connected()),
"blocksPerSecond": fmt.Sprintf("%.1f", rate),
}).Infof(
"Processing block batch of size %d starting from %s %d/%d - estimated time remaining %s",
len(blks), fmt.Sprintf("0x%s...", hex.EncodeToString(blkRoot[:])[:8]),
firstBlk.Block.Slot, helpers.SlotsSince(genesis), timeRemaining,
)
}
// processBlock performs basic checks on incoming block, and triggers receiver function.
func (s *Service) processBlock(
ctx context.Context,
@@ -183,3 +216,47 @@ func (s *Service) processBlock(
s.lastProcessedSlot = blk.Block.Slot
return nil
}
func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
blks []*eth.SignedBeaconBlock, bFunc batchBlockReceiverFn) error {
if len(blks) == 0 {
return errors.New("0 blocks provided into method")
}
firstBlock := blks[0]
for s.lastProcessedSlot >= firstBlock.Block.Slot {
if len(blks) == 1 {
return errors.New("no good blocks in batch")
}
blks = blks[1:]
firstBlock = blks[0]
}
blkRoot, err := stateutil.BlockRoot(firstBlock.Block)
if err != nil {
return err
}
s.logBatchSyncStatus(genesis, blks, blkRoot)
parentRoot := bytesutil.ToBytes32(firstBlock.Block.ParentRoot)
if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) {
return fmt.Errorf("beacon node doesn't have a block in db with root %#x", firstBlock.Block.ParentRoot)
}
blockRoots := make([][32]byte, len(blks))
blockRoots[0] = blkRoot
for i := 1; i < len(blks); i++ {
b := blks[i]
if !bytes.Equal(b.Block.ParentRoot, blockRoots[i-1][:]) {
return fmt.Errorf("expected linear block list with parent root of %#x but received %#x",
blockRoots[i-1][:], b.Block.ParentRoot)
}
blkRoot, err := stateutil.BlockRoot(b.Block)
if err != nil {
return err
}
blockRoots[i] = blkRoot
}
if err := bFunc(ctx, blks, blockRoots); err != nil {
return err
}
lastBlk := blks[len(blks)-1]
s.lastProcessedSlot = lastBlk.Block.Slot
return nil
}

View File

@@ -3,6 +3,7 @@ package initialsync
import (
"context"
"fmt"
"strings"
"testing"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@@ -368,3 +369,136 @@ func TestService_processBlock(t *testing.T) {
}
})
}
func TestService_processBlockBatch(t *testing.T) {
beaconDB, _ := dbtest.SetupDB(t)
genesisBlk := &eth.BeaconBlock{
Slot: 0,
}
genesisBlkRoot, err := stateutil.BlockRoot(genesisBlk)
if err != nil {
t.Fatal(err)
}
err = beaconDB.SaveBlock(context.Background(), &eth.SignedBeaconBlock{Block: genesisBlk})
if err != nil {
t.Fatal(err)
}
st, err := stateTrie.InitializeFromProto(&p2ppb.BeaconState{})
if err != nil {
t.Fatal(err)
}
s := NewInitialSync(&Config{
P2P: p2pt.NewTestP2P(t),
DB: beaconDB,
Chain: &mock.ChainService{
State: st,
Root: genesisBlkRoot[:],
DB: beaconDB,
},
})
ctx := context.Background()
genesis := makeGenesisTime(32)
t.Run("process non-linear batch", func(t *testing.T) {
batch := []*eth.SignedBeaconBlock{}
currBlockRoot := genesisBlkRoot
for i := 1; i < 10; i++ {
parentRoot := currBlockRoot
blk1 := &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
Slot: uint64(i),
ParentRoot: parentRoot[:],
},
}
blk1Root, err := stateutil.BlockRoot(blk1.Block)
if err != nil {
t.Fatal(err)
}
err = beaconDB.SaveBlock(context.Background(), blk1)
if err != nil {
t.Fatal(err)
}
batch = append(batch, blk1)
currBlockRoot = blk1Root
}
batch2 := []*eth.SignedBeaconBlock{}
for i := 10; i < 20; i++ {
parentRoot := currBlockRoot
blk1 := &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
Slot: uint64(i),
ParentRoot: parentRoot[:],
},
}
blk1Root, err := stateutil.BlockRoot(blk1.Block)
if err != nil {
t.Fatal(err)
}
err = beaconDB.SaveBlock(context.Background(), blk1)
if err != nil {
t.Fatal(err)
}
batch2 = append(batch2, blk1)
currBlockRoot = blk1Root
}
// Process block normally.
err = s.processBatchedBlocks(ctx, genesis, batch, func(
ctx context.Context, blks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error {
if err := s.chain.ReceiveBlockBatch(ctx, blks, blockRoots); err != nil {
t.Error(err)
}
return nil
})
if err != nil {
t.Error(err)
}
// Duplicate processing should trigger error.
err = s.processBatchedBlocks(ctx, genesis, batch, func(
ctx context.Context, blocks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error {
return nil
})
expectedErr := fmt.Errorf("no good blocks in batch")
if err == nil || err.Error() != expectedErr.Error() {
t.Errorf("Expected error not thrown, want: %v, got: %v", expectedErr, err)
}
badBatch2 := []*eth.SignedBeaconBlock{}
for i, b := range batch2 {
// create a non-linear batch
if i%3 == 0 && i != 0 {
continue
}
badBatch2 = append(badBatch2, b)
}
// Bad batch should fail because it is non linear
err = s.processBatchedBlocks(ctx, genesis, badBatch2, func(
ctx context.Context, blks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error {
return nil
})
expectedSubErr := "expected linear block list"
if err == nil || !strings.Contains(err.Error(), expectedSubErr) {
t.Errorf("Expected error not thrown, wanted error to include: %v, got: %v", expectedSubErr, err)
}
// Continue normal processing, should proceed w/o errors.
err = s.processBatchedBlocks(ctx, genesis, batch2, func(
ctx context.Context, blks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error {
if err := s.chain.ReceiveBlockBatch(ctx, blks, blockRoots); err != nil {
t.Error(err)
}
return nil
})
if err != nil {
t.Error(err)
}
if s.chain.HeadSlot() != 19 {
t.Errorf("Unexpected head slot, want: %d, got: %d", 2, s.chain.HeadSlot())
}
})
}

View File

@@ -55,6 +55,7 @@ type Flags struct {
WaitForSynced bool // WaitForSynced uses WaitForSynced in validator startup to ensure it can communicate with the beacon node as soon as possible.
SkipRegenHistoricalStates bool // SkipRegenHistoricalState skips regenerating historical states from genesis to last finalized. This enables a quick switch over to using new-state-mgmt.
ReduceAttesterStateCopy bool // ReduceAttesterStateCopy reduces head state copies for attester rpc.
BatchBlockVerify bool // BatchBlockVerify performs batched verification of block batches that we receive when syncing.
// DisableForkChoice disables using LMD-GHOST fork choice to update
// the head of the chain based on attestations and instead accepts any valid received block
// as the chain head. UNSAFE, use with caution.
@@ -226,6 +227,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
log.Warn("Forcing max_cover strategy on attestation aggregation")
cfg.AttestationAggregationStrategy = "max_cover"
}
if ctx.Bool(batchBlockVerify.Name) {
log.Warn("Performing batch block verification when syncing.")
cfg.BatchBlockVerify = true
}
Init(cfg)
}

View File

@@ -148,6 +148,10 @@ var (
Name: "altona",
Usage: "This defines the flag through which we can run on the Altona Multiclient Testnet",
}
batchBlockVerify = &cli.BoolFlag{
Name: "batch-block-verify",
Usage: "When enabled we will perform full signature verification of blocks in batches instead of singularly.",
}
)
// devModeFlags holds list of flags that are set when development mode is on.
@@ -155,6 +159,7 @@ var devModeFlags = []cli.Flag{
initSyncVerifyEverythingFlag,
forceMaxCoverAttestationAggregation,
newBeaconStateLocks,
batchBlockVerify,
}
// Deprecated flags list.
@@ -589,6 +594,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{
newBeaconStateLocks,
forceMaxCoverAttestationAggregation,
altonaTestnet,
batchBlockVerify,
}...)
// E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.