Part 1 of blockchain pkg clean up (#6537)

* Fix span names

* Clean up on block func

* Add updateFinalized helper

* Deprecate ReceiveBlockNoPubsub for ReceiveBlock

* Proposer to broadcast block

* Update migrate interface

* Replace ReceiveBlock for all

* Go fmt
This commit is contained in:
terence tsao
2020-07-09 16:50:48 -07:00
committed by GitHub
parent f7088e037c
commit d54cefbe42
19 changed files with 147 additions and 186 deletions

View File

@@ -141,7 +141,7 @@ func (s *Service) HeadBlock(ctx context.Context) (*ethpb.SignedBeaconBlock, erro
// If the head is nil from service struct,
// it will attempt to get the head state from DB.
func (s *Service) HeadState(ctx context.Context) (*state.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "blockchain.HeadState")
ctx, span := trace.StartSpan(ctx, "blockChain.HeadState")
defer span.End()
ok := s.hasHeadState()

View File

@@ -29,7 +29,7 @@ type head struct {
// Determined the head from the fork choice service and saves its new data
// (head root, head block, and head state) to the local service cache.
func (s *Service) updateHead(ctx context.Context, balances []uint64) error {
ctx, span := trace.StartSpan(ctx, "blockchain.updateHead")
ctx, span := trace.StartSpan(ctx, "blockChain.updateHead")
defer span.End()
// To get the proper head update, a node first checks its best justified
@@ -67,7 +67,7 @@ func (s *Service) updateHead(ctx context.Context, balances []uint64) error {
// This saves head info to the local service cache, it also saves the
// new head root to the DB.
func (s *Service) saveHead(ctx context.Context, headRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "blockchain.saveHead")
ctx, span := trace.StartSpan(ctx, "blockChain.saveHead")
defer span.End()
// Do nothing if head hasn't changed.
@@ -211,7 +211,7 @@ func (s *Service) headBlock() *ethpb.SignedBeaconBlock {
// This returns the head state.
// It does a full copy on head state for immutability.
func (s *Service) headState(ctx context.Context) *stateTrie.BeaconState {
ctx, span := trace.StartSpan(ctx, "blockchain.headState")
ctx, span := trace.StartSpan(ctx, "blockChain.headState")
defer span.End()
s.headLock.RLock()
@@ -239,7 +239,7 @@ func (s *Service) hasHeadState() bool {
// This updates recent canonical block mapping. It uses input head root and retrieves
// all the canonical block roots that are ancestor of the input head block root.
func (s *Service) updateRecentCanonicalBlocks(ctx context.Context, headRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "blockchain.updateRecentCanonicalBlocks")
ctx, span := trace.StartSpan(ctx, "blockChain.updateRecentCanonicalBlocks")
defer span.End()
s.recentCanonicalBlocksLock.Lock()

View File

@@ -68,7 +68,7 @@ var ErrTargetRootNotInDB = errors.New("target root does not exist in db")
// store.latest_messages[i] = LatestMessage(epoch=target.epoch, root=attestation.data.beacon_block_root)
// TODO(#6072): This code path is highly untested. Requires comprehensive tests and simpler refactoring.
func (s *Service) onAttestation(ctx context.Context, a *ethpb.Attestation) ([]uint64, error) {
ctx, span := trace.StartSpan(ctx, "blockchain.onAttestation")
ctx, span := trace.StartSpan(ctx, "blockChain.onAttestation")
defer span.End()
if a == nil {

View File

@@ -2,7 +2,6 @@ package blockchain
import (
"context"
"encoding/hex"
"fmt"
"github.com/pkg/errors"
@@ -15,7 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -55,83 +53,66 @@ var initialSyncBlockCacheSize = 2 * params.BeaconConfig().SlotsPerEpoch
// # Update finalized checkpoint
// if state.finalized_checkpoint.epoch > store.finalized_checkpoint.epoch:
// store.finalized_checkpoint = state.finalized_checkpoint
func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock, blockRoot [32]byte) (*stateTrie.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "blockchain.onBlock")
func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "blockChain.onBlock")
defer span.End()
if signed == nil || signed.Block == nil {
return nil, errors.New("nil block")
return errors.New("nil block")
}
b := signed.Block
// Retrieve incoming block's pre state.
preState, err := s.getBlockPreState(ctx, b)
if err != nil {
return nil, err
return err
}
log.WithFields(logrus.Fields{
"slot": b.Slot,
"root": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]),
}).Debug("Executing state transition on block")
postState, err := state.ExecuteStateTransition(ctx, preState, signed)
if err != nil {
return nil, errors.Wrap(err, "could not execute state transition")
return errors.Wrap(err, "could not execute state transition")
}
if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
return errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return nil, errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}
if err := s.stateGen.SaveState(ctx, blockRoot, postState); err != nil {
return nil, errors.Wrap(err, "could not save state")
return errors.Wrap(err, "could not save state")
}
// Update justified check point.
if postState.CurrentJustifiedCheckpoint().Epoch > s.justifiedCheckpt.Epoch {
if err := s.updateJustified(ctx, postState); err != nil {
return nil, err
return err
}
}
// Update finalized check point. Prune the block cache and helper caches on every new finalized epoch.
// Update finalized check point.
if postState.FinalizedCheckpointEpoch() > s.finalizedCheckpt.Epoch {
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
return nil, err
return err
}
s.clearInitSyncBlocks()
if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint()); err != nil {
return nil, errors.Wrap(err, "could not save finalized checkpoint")
if err := s.updateFinalized(ctx, postState.FinalizedCheckpoint()); err != nil {
return err
}
fRoot := bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root)
// Prune proto array fork choice nodes, all nodes before finalized check point will
// be pruned.
if err := s.forkChoiceStore.Prune(ctx, fRoot); err != nil {
return nil, errors.Wrap(err, "could not prune proto array fork choice nodes")
return errors.Wrap(err, "could not prune proto array fork choice nodes")
}
s.prevFinalizedCheckpt = s.finalizedCheckpt
s.finalizedCheckpt = postState.FinalizedCheckpoint()
if err := s.finalizedImpliesNewJustified(ctx, postState); err != nil {
return nil, errors.Wrap(err, "could not save new justified")
return errors.Wrap(err, "could not save new justified")
}
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return nil, errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
return nil, errors.Wrap(err, "could not migrate to cold")
if err := s.stateGen.MigrateToCold(ctx, fRoot); err != nil {
return errors.Wrap(err, "could not migrate to cold")
}
// Update deposit cache.
@@ -143,30 +124,20 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock,
logEpochData(postState)
reportEpochMetrics(postState)
// Update committees cache at epoch boundary slot.
// Update caches at epoch boundary slot.
if err := helpers.UpdateCommitteeCache(postState, helpers.CurrentEpoch(postState)); err != nil {
return nil, err
return err
}
if err := helpers.UpdateProposerIndicesInCache(postState, helpers.CurrentEpoch(postState)); err != nil {
return nil, err
return err
}
s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState))
}
// Delete the processed block attestations from attestation pool.
if err := s.deletePoolAtts(b.Body.Attestations); err != nil {
return nil, err
}
// Delete the processed block attester slashings from slashings pool.
for i := 0; i < len(b.Body.AttesterSlashings); i++ {
s.slashingPool.MarkIncludedAttesterSlashing(b.Body.AttesterSlashings[i])
}
defer reportAttestationInclusion(b)
return postState, nil
return nil
}
// onBlockInitialSyncStateTransition is called when an initial sync block is received.
@@ -175,7 +146,7 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock,
// The block's signing root should be computed before calling this method to avoid redundant
// computation in this method and methods it calls into.
func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "blockchain.onBlock")
ctx, span := trace.StartSpan(ctx, "blockChain.onBlock")
defer span.End()
if signed == nil || signed.Block == nil {
@@ -219,7 +190,7 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
func (s *Service) onBlockBatch(ctx context.Context, blks []*ethpb.SignedBeaconBlock,
blockRoots [][32]byte) (*stateTrie.BeaconState, []*ethpb.Checkpoint, []*ethpb.Checkpoint, error) {
ctx, span := trace.StartSpan(ctx, "blockchain.onBlock")
ctx, span := trace.StartSpan(ctx, "blockChain.onBlock")
defer span.End()
if len(blks) == 0 || len(blockRoots) == 0 {
@@ -301,19 +272,12 @@ func (s *Service) handlePostStateInSync(ctx context.Context, signed *ethpb.Signe
}
s.clearInitSyncBlocks()
if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint()); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
if err := s.updateFinalized(ctx, postState.FinalizedCheckpoint()); err != nil {
return err
}
s.prevFinalizedCheckpt = s.finalizedCheckpt
s.finalizedCheckpt = postState.FinalizedCheckpoint()
fRoot := bytesutil.ToBytes32(postState.FinalizedCheckpoint().Root)
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
if err := s.stateGen.MigrateToCold(ctx, fRoot); err != nil {
return errors.Wrap(err, "could not migrate to cold")
}
}
@@ -348,19 +312,12 @@ func (s *Service) handleBlockAfterBatchVerify(ctx context.Context, signed *ethpb
}
s.clearInitSyncBlocks()
if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, fCheckpoint); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
if err := s.updateFinalized(ctx, fCheckpoint); err != nil {
return err
}
s.prevFinalizedCheckpt = s.finalizedCheckpt
s.finalizedCheckpt = fCheckpoint
fRoot := bytesutil.ToBytes32(fCheckpoint.Root)
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
if err := s.stateGen.MigrateToCold(ctx, fRoot); err != nil {
return errors.Wrap(err, "could not migrate to cold")
}
}

View File

@@ -32,7 +32,7 @@ func (s *Service) CurrentSlot() uint64 {
// to retrieve the state in DB. It verifies the pre state's validity and the incoming block
// is in the correct time window.
func (s *Service) getBlockPreState(ctx context.Context, b *ethpb.BeaconBlock) (*stateTrie.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.getBlockPreState")
ctx, span := trace.StartSpan(ctx, "forkChoice.getBlockPreState")
defer span.End()
// Verify incoming block has a valid pre state.
@@ -90,7 +90,7 @@ func (s *Service) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) e
// verifyBlkDescendant validates input block root is a descendant of the
// current finalized block root.
func (s *Service) verifyBlkDescendant(ctx context.Context, root [32]byte, slot uint64) error {
ctx, span := trace.StartSpan(ctx, "forkchoice.verifyBlkDescendant")
ctx, span := trace.StartSpan(ctx, "forkChoice.verifyBlkDescendant")
defer span.End()
finalizedBlkSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root))
@@ -199,6 +199,12 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
}
func (s *Service) updateFinalized(ctx context.Context, cp *ethpb.Checkpoint) error {
s.prevFinalizedCheckpt = s.finalizedCheckpt
s.finalizedCheckpt = cp
return s.beaconDB.SaveFinalizedCheckpoint(ctx, cp)
}
// ancestor returns the block root of an ancestry block from the input block root.
//
// Spec pseudocode definition:
@@ -212,7 +218,7 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
// # root is older than queried slot, thus a skip slot. Return most recent root prior to slot
// return root
func (s *Service) ancestor(ctx context.Context, root []byte, slot uint64) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.ancestor")
ctx, span := trace.StartSpan(ctx, "forkChoice.ancestor")
defer span.End()
// Stop recursive ancestry lookup if context is cancelled.

View File

@@ -122,7 +122,7 @@ func TestStore_OnBlock(t *testing.T) {
if err != nil {
t.Error(err)
}
_, err = service.onBlock(ctx, &ethpb.SignedBeaconBlock{Block: tt.blk}, root)
err = service.onBlock(ctx, &ethpb.SignedBeaconBlock{Block: tt.blk}, root)
if err == nil || !strings.Contains(err.Error(), tt.wantErrString) {
t.Errorf("Store.OnBlock() error = %v, wantErr = %v", err, tt.wantErrString)
}

View File

@@ -3,14 +3,12 @@ package blockchain
import (
"bytes"
"context"
"encoding/hex"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
@@ -19,79 +17,28 @@ import (
// BlockReceiver interface defines the methods of chain service receive and processing new blocks.
type BlockReceiver interface {
ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockInitialSync(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedBeaconBlock, blkRoots [][32]byte) error
HasInitSyncBlock(root [32]byte) bool
}
// ReceiveBlock is a function that defines the operations that are performed on
// blocks that is received from rpc service. The operations consists of:
// 1. Gossip block to other peers
// 2. Validate block, apply state transition and update check points
// 3. Apply fork choice to the processed block
// 4. Save latest head info
func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlock")
defer span.End()
// Broadcast the new block to the network.
if err := s.p2p.Broadcast(ctx, block); err != nil {
return errors.Wrap(err, "could not broadcast block")
}
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(blockRoot[:]),
}).Debug("Broadcasting block")
if err := captureSentTimeMetric(uint64(s.genesisTime.Unix()), block.Block.Slot); err != nil {
// If a node fails to capture metric, this shouldn't cause the block processing to fail.
log.Warnf("Could not capture block sent time metric: %v", err)
}
if err := s.ReceiveBlockNoPubsub(ctx, block, blockRoot); err != nil {
return err
}
return nil
}
// ReceiveBlockNoPubsub is a function that defines the the operations (minus pubsub)
// ReceiveBlock is a function that defines the the operations (minus pubsub)
// that are performed on blocks that is received from regular sync service. The operations consists of:
// 1. Validate block, apply state transition and update check points
// 2. Apply fork choice to the processed block
// 3. Save latest head info
func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockNoPubsub")
func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlock")
defer span.End()
blockCopy := stateTrie.CopySignedBeaconBlock(block)
// Apply state transition on the new block.
_, err := s.onBlock(ctx, blockCopy, blockRoot)
if err != nil {
if err := s.onBlock(ctx, blockCopy, blockRoot); err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}
// Add attestations from the block to the pool for fork choice.
if err := s.attPool.SaveBlockAttestations(blockCopy.Block.Body.Attestations); err != nil {
log.Errorf("Could not save attestation for fork choice: %v", err)
return nil
}
for _, exit := range block.Block.Body.VoluntaryExits {
s.exitPool.MarkIncluded(exit)
}
if featureconfig.Get().DisableForkChoice && block.Block.Slot > s.headSlot() {
if err := s.saveHead(ctx, blockRoot); err != nil {
return errors.Wrap(err, "could not save head")
}
} else {
if err := s.updateHead(ctx, s.getJustifiedBalances()); err != nil {
return errors.Wrap(err, "could not update head")
}
}
// Send notification of the processed block to the state feed.
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
@@ -102,6 +49,16 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
},
})
// Handle post block operations such as attestations and exits.
if err := s.handlePostBlockOperations(blockCopy.Block); err != nil {
return err
}
// Update and save head block after fork choice.
if err := s.updateHead(ctx, s.getJustifiedBalances()); err != nil {
return errors.Wrap(err, "could not update head")
}
// Reports on block and fork choice metrics.
reportSlotMetrics(blockCopy.Block.Slot, s.headSlot(), s.CurrentSlot(), s.finalizedCheckpt)
@@ -117,11 +74,11 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
// ReceiveBlockInitialSync processes the input block for the purpose of initial syncing.
// This method should only be used on blocks during initial syncing phase.
func (s *Service) ReceiveBlockInitialSync(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockNoVerify")
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockNoVerify")
defer span.End()
blockCopy := stateTrie.CopySignedBeaconBlock(block)
// Apply state transition on the incoming newly received blockCopy without verifying its BLS contents.
// Apply state transition on the new block.
if err := s.onBlockInitialSyncStateTransition(ctx, blockCopy, blockRoot); err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
@@ -132,7 +89,6 @@ func (s *Service) ReceiveBlockInitialSync(ctx context.Context, block *ethpb.Sign
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(blockRoot[:], cachedHeadRoot) {
if err := s.saveHeadNoDB(ctx, blockCopy, blockRoot); err != nil {
err := errors.Wrap(err, "could not save head")
@@ -147,7 +103,7 @@ func (s *Service) ReceiveBlockInitialSync(ctx context.Context, block *ethpb.Sign
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: blockRoot,
Verified: false,
Verified: true,
},
})
@@ -168,7 +124,7 @@ func (s *Service) ReceiveBlockInitialSync(ctx context.Context, block *ethpb.Sign
// the state, performing batch verification of all collected signatures and then performing the appropriate
// actions for a block post-transition.
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedBeaconBlock, blkRoots [][32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockBatch")
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockBatch")
defer span.End()
// Apply state transition on the incoming newly received blockCopy without verifying its BLS contents.
@@ -191,19 +147,12 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedB
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: blkRoots[i],
Verified: false,
Verified: true,
},
})
// Reports on blockCopy and fork choice metrics.
reportSlotMetrics(blockCopy.Block.Slot, s.headSlot(), s.CurrentSlot(), s.finalizedCheckpt)
// Log state transition data.
log.WithFields(logrus.Fields{
"slot": blockCopy.Block.Slot,
"attestations": len(blockCopy.Block.Body.Attestations),
"deposits": len(blockCopy.Block.Body.Deposits),
}).Debug("Finished applying state transition")
}
lastBlk := blocks[len(blocks)-1]
lastRoot := blkRoots[len(blkRoots)-1]
@@ -232,3 +181,26 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedB
func (s *Service) HasInitSyncBlock(root [32]byte) bool {
return s.hasInitSyncBlock(root)
}
func (s *Service) handlePostBlockOperations(b *ethpb.BeaconBlock) error {
// Delete the processed block attestations from attestation pool.
if err := s.deletePoolAtts(b.Body.Attestations); err != nil {
return err
}
// Add block attestations to the fork choice pool to compute head.
if err := s.attPool.SaveBlockAttestations(b.Body.Attestations); err != nil {
log.Errorf("Could not save block attestations for fork choice: %v", err)
return nil
}
// Mark block exits as seen so we don't include same ones in future blocks.
for _, e := range b.Body.VoluntaryExits {
s.exitPool.MarkIncluded(e)
}
// Mark attester slashings as seen so we don't include same ones in future blocks.
for _, as := range b.Body.AttesterSlashings {
s.slashingPool.MarkIncludedAttesterSlashing(as)
}
return nil
}

View File

@@ -17,7 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/testutil"
)
func TestService_ReceiveBlockNoPubsub(t *testing.T) {
func TestService_ReceiveBlock(t *testing.T) {
ctx := context.Background()
genesis, keys := testutil.DeterministicGenesisState(t, 64)
@@ -146,8 +146,8 @@ func TestService_ReceiveBlockNoPubsub(t *testing.T) {
if err != nil {
t.Error(err)
}
if err := s.ReceiveBlockNoPubsub(ctx, tt.args.block, root); (err != nil) != tt.wantErr {
t.Errorf("ReceiveBlockNoPubsub() error = %v, wantErr %v", err, tt.wantErr)
if err := s.ReceiveBlock(ctx, tt.args.block, root); (err != nil) != tt.wantErr {
t.Errorf("ReceiveBlock() error = %v, wantErr %v", err, tt.wantErr)
} else {
tt.check(t, s)
}

View File

@@ -139,11 +139,6 @@ func (mon *MockOperationNotifier) OperationFeed() *event.Feed {
return mon.feed
}
// ReceiveBlock mocks ReceiveBlock method in chain service.
func (ms *ChainService) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
return nil
}
// ReceiveBlockInitialSync mocks ReceiveBlockInitialSync method in chain service.
func (ms *ChainService) ReceiveBlockInitialSync(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
if ms.State == nil {
@@ -200,8 +195,8 @@ func (ms *ChainService) ReceiveBlockBatch(ctx context.Context, blks []*ethpb.Sig
return nil
}
// ReceiveBlockNoPubsub mocks ReceiveBlockNoPubsub method in chain service.
func (ms *ChainService) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
// ReceiveBlock mocks ReceiveBlock method in chain service.
func (ms *ChainService) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
if ms.State == nil {
ms.State = &stateTrie.BeaconState{}
}

View File

@@ -2,6 +2,7 @@ package validator
import (
"context"
"encoding/hex"
"fmt"
"math/big"
"time"
@@ -26,6 +27,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/trieutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -134,6 +136,14 @@ func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.SignedBeaconBlock
})
}()
// Broadcast the new block to the network.
if err := vs.P2P.Broadcast(ctx, blk); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast block: %v", err)
}
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(root[:]),
}).Debug("Broadcasting block")
if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil {
return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err)
}

View File

@@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits"
mockp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing"
beaconstate "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
@@ -293,7 +294,10 @@ func TestProposeBlock_OK(t *testing.T) {
numDeposits := uint64(64)
beaconState, _ := testutil.DeterministicGenesisState(t, numDeposits)
bsRoot, err := beaconState.HashTreeRoot(ctx)
if err != nil {
t.Fatal(err)
}
genesisRoot, err := stateutil.BlockRoot(genesis.Block)
if err != nil {
t.Fatal(err)
@@ -302,7 +306,7 @@ func TestProposeBlock_OK(t *testing.T) {
t.Fatalf("Could not save genesis state: %v", err)
}
c := &mock.ChainService{}
c := &mock.ChainService{Root: bsRoot[:], State: beaconState}
proposerServer := &Server{
BeaconDB: db,
ChainStartFetcher: &mockPOW.POWChain{},
@@ -311,10 +315,11 @@ func TestProposeBlock_OK(t *testing.T) {
BlockReceiver: c,
HeadFetcher: c,
BlockNotifier: c.BlockNotifier(),
P2P: mockp2p.NewTestP2P(t),
}
req := testutil.NewBeaconBlock()
req.Block.Slot = 5
req.Block.ParentRoot = bytesutil.PadTo([]byte("parent-hash"), 32)
req.Block.ParentRoot = bsRoot[:]
if err := db.SaveBlock(ctx, req); err != nil {
t.Fatal(err)
}

View File

@@ -15,7 +15,7 @@ import (
// MigrateToCold advances the finalized info in between the cold and hot state sections.
// It moves the recent finalized states from the hot section to the cold section and
// only preserve the ones that's on archived point.
func (s *State) MigrateToCold(ctx context.Context, fSlot uint64, fRoot [32]byte) error {
func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "stateGen.MigrateToCold")
defer span.End()
@@ -23,6 +23,11 @@ func (s *State) MigrateToCold(ctx context.Context, fSlot uint64, fRoot [32]byte)
oldFSlot := s.finalizedInfo.slot
s.finalizedInfo.lock.RUnlock()
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return err
}
fSlot := fBlock.Block.Slot
if oldFSlot > fSlot {
return nil
}

View File

@@ -16,20 +16,26 @@ import (
func TestMigrateToCold_CanSaveFinalizedInfo(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
service := New(db, cache.NewStateSummaryCache())
db, c := testDB.SetupDB(t)
service := New(db, c)
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
r := [32]byte{'a'}
if err := service.epochBoundaryStateCache.put(r, beaconState); err != nil {
b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1}}
br, err := stateutil.BlockRoot(b.Block)
if err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveBlock(ctx, b); err != nil {
t.Fatal(err)
}
if err := service.epochBoundaryStateCache.put(br, beaconState); err != nil {
t.Fatal(err)
}
if err := service.MigrateToCold(ctx, 1, r); err != nil {
if err := service.MigrateToCold(ctx, br); err != nil {
t.Fatal(err)
}
wanted := &finalizedInfo{state: beaconState, root: r, slot: 1}
wanted := &finalizedInfo{state: beaconState, root: br, slot: 1}
if !reflect.DeepEqual(wanted, service.finalizedInfo) {
t.Error("Incorrect finalized info")
}
@@ -47,12 +53,18 @@ func TestMigrateToCold_HappyPath(t *testing.T) {
if err := beaconState.SetSlot(stateSlot); err != nil {
t.Fatal(err)
}
fRoot := [32]byte{'a'}
b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 2}}
fRoot, err := stateutil.BlockRoot(b.Block)
if err != nil {
t.Fatal(err)
}
if err := service.beaconDB.SaveBlock(ctx, b); err != nil {
t.Fatal(err)
}
if err := service.epochBoundaryStateCache.put(fRoot, beaconState); err != nil {
t.Fatal(err)
}
fSlot := uint64(2)
if err := service.MigrateToCold(ctx, fSlot, fRoot); err != nil {
if err := service.MigrateToCold(ctx, fRoot); err != nil {
t.Fatal(err)
}
@@ -90,7 +102,7 @@ func TestMigrateToCold_RegeneratePath(t *testing.T) {
if err := beaconState.SetSlot(stateSlot); err != nil {
t.Fatal(err)
}
blk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
blk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 2}}
fRoot, err := stateutil.BlockRoot(blk.Block)
if err != nil {
t.Fatal(err)
@@ -113,8 +125,7 @@ func TestMigrateToCold_RegeneratePath(t *testing.T) {
state: beaconState,
}
fSlot := uint64(2)
if err := service.MigrateToCold(ctx, fSlot, fRoot); err != nil {
if err := service.MigrateToCold(ctx, fRoot); err != nil {
t.Fatal(err)
}

View File

@@ -279,7 +279,7 @@ func TestBlocksQueueLoop(t *testing.T) {
if err != nil {
return err
}
if err := mc.ReceiveBlockNoPubsub(ctx, block, root); err != nil {
if err := mc.ReceiveBlock(ctx, block, root); err != nil {
return err
}

View File

@@ -126,7 +126,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
return nil
}
for _, blk := range resp {
err := s.processBlock(ctx, genesis, blk, s.chain.ReceiveBlockNoPubsub)
err := s.processBlock(ctx, genesis, blk, s.chain.ReceiveBlock)
if err != nil {
log.WithError(err).Error("Failed to process block, exiting init sync")
return nil

View File

@@ -333,7 +333,7 @@ func TestService_processBlock(t *testing.T) {
// Process block normally.
err = s.processBlock(ctx, genesis, blk1, func(
ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error {
if err := s.chain.ReceiveBlockNoPubsub(ctx, block, blockRoot); err != nil {
if err := s.chain.ReceiveBlock(ctx, block, blockRoot); err != nil {
t.Error(err)
}
return nil
@@ -355,7 +355,7 @@ func TestService_processBlock(t *testing.T) {
// Continue normal processing, should proceed w/o errors.
err = s.processBlock(ctx, genesis, blk2, func(
ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error {
if err := s.chain.ReceiveBlockNoPubsub(ctx, block, blockRoot); err != nil {
if err := s.chain.ReceiveBlock(ctx, block, blockRoot); err != nil {
t.Error(err)
}
return nil

View File

@@ -115,7 +115,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
return err
}
if err := s.chain.ReceiveBlockNoPubsub(ctx, b, blkRoot); err != nil {
if err := s.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
log.Errorf("Could not process block from slot %d: %v", b.Block.Slot, err)
traceutil.AnnotateError(span, err)
}

View File

@@ -30,7 +30,7 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
return err
}
if err := s.chain.ReceiveBlockNoPubsub(ctx, signed, root); err != nil {
if err := s.chain.ReceiveBlock(ctx, signed, root); err != nil {
interop.WriteBlockToDisk(signed, true /*failed*/)
return err
}

View File

@@ -75,7 +75,7 @@ func TestService_beaconBlockSubscriber(t *testing.T) {
args: args{
msg: &ethpb.SignedBeaconBlock{
Block: &ethpb.BeaconBlock{
// An empty block will return an err in mocked chainService.ReceiveBlockNoPubsub.
// An empty block will return an err in mocked chainService.ReceiveBlock.
Body: &ethpb.BeaconBlockBody{Attestations: pooledAttestations},
},
},