Refactor ChainService for Receiving Blocks Synchronously (#1984)

* refactor chain service

* restructure service and lint

* all calls to receive block in chain service are now blocking

* begin fixing tests

* refactored blockchain tests

* builds correctly

* blockchain tests pass again

* lint

* sync and rpc tests pass again

* done

* add in open tracing

* span in fork choice

* Update beacon-chain/blockchain/block_processing.go

Co-Authored-By: rauljordan <raul@prysmaticlabs.com>
This commit is contained in:
Raul Jordan
2019-03-13 17:17:32 -04:00
committed by GitHub
parent 544c040fcb
commit 5371218b9b
18 changed files with 920 additions and 1002 deletions

View File

@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"block_processing.go",
"fork_choice.go",
"service.go",
],
@@ -20,18 +21,18 @@ go_library(
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/messagehandler:go_default_library",
"//shared/params:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"block_processing_test.go",
"fork_choice_test.go",
"service_test.go",
],

View File

@@ -0,0 +1,197 @@
package blockchain
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
// ReceiveBlock is a function that defines the operations that are preformed on
// any block that is received from p2p layer or rpc. It checks the block to see
// if it passes the pre-processing conditions, if it does then the per slot
// state transition function is carried out on the block.
// spec:
// def process_block(block):
// if not block_pre_processing_conditions(block):
// return nil, error
//
// # process skipped slots
//
// while (state.slot < block.slot - 1):
// state = slot_state_transition(state, block=None)
//
// # process slot with block
// state = slot_state_transition(state, block)
//
// # check state root
// if block.state_root == hash(state):
// return state, error
// else:
// return nil, error # or throw or whatever
//
func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlock")
defer span.End()
beaconState, err := c.beaconDB.State(ctx)
if err != nil {
return nil, fmt.Errorf("could not retrieve beacon state: %v", err)
}
blockRoot, err := hashutil.HashBeaconBlock(block)
if err != nil {
return nil, fmt.Errorf("could not tree hash incoming block: %v", err)
}
if block.Slot == params.BeaconConfig().GenesisSlot {
return nil, fmt.Errorf("cannot process a genesis block: received block with slot %d",
block.Slot-params.BeaconConfig().GenesisSlot)
}
// Save blocks with higher slot numbers in cache.
if err := c.isBlockReadyForProcessing(block, beaconState); err != nil {
return nil, fmt.Errorf("block with root %#x is not ready for processing: %v", blockRoot, err)
}
// Retrieve the last processed beacon block's hash root.
headRoot, err := c.ChainHeadRoot()
if err != nil {
return nil, fmt.Errorf("could not retrieve chain head root: %v", err)
}
log.WithField("slotNumber", block.Slot-params.BeaconConfig().GenesisSlot).Info(
"Executing state transition")
// Check for skipped slots.
numSkippedSlots := 0
for beaconState.Slot < block.Slot-1 {
beaconState, err = c.runStateTransition(headRoot, nil, beaconState)
if err != nil {
return nil, fmt.Errorf("could not execute state transition without block %v", err)
}
numSkippedSlots++
}
if numSkippedSlots > 0 {
log.Warnf("Processed %d skipped slots", numSkippedSlots)
}
beaconState, err = c.runStateTransition(headRoot, block, beaconState)
if err != nil {
return nil, fmt.Errorf("could not execute state transition with block %v", err)
}
// if there exists a block for the slot being processed.
if err := c.beaconDB.SaveBlock(block); err != nil {
return nil, fmt.Errorf("failed to save block: %v", err)
}
// Forward processed block to operation pool to remove individual operation from DB.
if c.opsPoolService.IncomingProcessedBlockFeed().Send(block) == 0 {
log.Error("Sent processed block to no subscribers")
}
// Remove pending deposits from the deposit queue.
for _, dep := range block.Body.Deposits {
c.beaconDB.RemovePendingDeposit(ctx, dep)
}
log.WithField("hash", fmt.Sprintf("%#x", blockRoot)).Debug("Processed beacon block")
return beaconState, nil
}
func (c *ChainService) isBlockReadyForProcessing(block *pb.BeaconBlock, beaconState *pb.BeaconState) error {
var powBlockFetcher func(ctx context.Context, hash common.Hash) (*gethTypes.Block, error)
if c.enablePOWChain {
powBlockFetcher = c.web3Service.Client().BlockByHash
}
if err := b.IsValidBlock(c.ctx, beaconState, block, c.enablePOWChain,
c.beaconDB.HasBlock, powBlockFetcher, c.genesisTime); err != nil {
return fmt.Errorf("block does not fulfill pre-processing conditions %v", err)
}
return nil
}
func (c *ChainService) runStateTransition(
headRoot [32]byte, block *pb.BeaconBlock, beaconState *pb.BeaconState,
) (*pb.BeaconState, error) {
beaconState, err := state.ExecuteStateTransition(
c.ctx,
beaconState,
block,
headRoot,
true, /* sig verify */
)
if err != nil {
return nil, fmt.Errorf("could not execute state transition %v", err)
}
log.WithField(
"slotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Slot transition successfully processed")
if block != nil {
log.WithField(
"slotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Block transition successfully processed")
}
if helpers.IsEpochEnd(beaconState.Slot) {
// Save activated validators of this epoch to public key -> index DB.
if err := c.saveValidatorIdx(beaconState); err != nil {
return nil, fmt.Errorf("could not save validator index: %v", err)
}
// Delete exited validators of this epoch to public key -> index DB.
if err := c.deleteValidatorIdx(beaconState); err != nil {
return nil, fmt.Errorf("could not delete validator index: %v", err)
}
log.WithField(
"SlotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Epoch transition successfully processed")
}
return beaconState, nil
}
func (c *ChainService) saveFinalizedState(beaconState *pb.BeaconState) error {
// check if the finalized epoch has changed, if it
// has we save the finalized state.
if c.finalizedEpoch != beaconState.FinalizedEpoch {
c.finalizedEpoch = beaconState.FinalizedEpoch
return c.beaconDB.SaveFinalizedState(beaconState)
}
return nil
}
// saveValidatorIdx saves the validators public key to index mapping in DB, these
// validators were activated from current epoch. After it saves, current epoch key
// is deleted from ActivatedValidators mapping.
func (c *ChainService) saveValidatorIdx(state *pb.BeaconState) error {
for _, idx := range validators.ActivatedValidators[helpers.CurrentEpoch(state)] {
pubKey := state.ValidatorRegistry[idx].Pubkey
if err := c.beaconDB.SaveValidatorIndex(pubKey, int(idx)); err != nil {
return fmt.Errorf("could not save validator index: %v", err)
}
}
delete(validators.ActivatedValidators, helpers.CurrentEpoch(state))
return nil
}
// deleteValidatorIdx deletes the validators public key to index mapping in DB, the
// validators were exited from current epoch. After it deletes, current epoch key
// is deleted from ExitedValidators mapping.
func (c *ChainService) deleteValidatorIdx(state *pb.BeaconState) error {
for _, idx := range validators.ExitedValidators[helpers.CurrentEpoch(state)] {
pubKey := state.ValidatorRegistry[idx].Pubkey
if err := c.beaconDB.DeleteValidatorIndex(pubKey); err != nil {
return fmt.Errorf("could not delete validator index: %v", err)
}
}
delete(validators.ExitedValidators, helpers.CurrentEpoch(state))
return nil
}

View File

@@ -0,0 +1,360 @@
package blockchain
import (
"context"
"encoding/binary"
"math/big"
"testing"
"time"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
v "github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/trieutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestReceiveBlock_FaultyPOWChain(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, true, db, true, nil)
unixTime := uint64(time.Now().Unix())
deposits, _ := setupInitialDeposits(t, 100)
if err := db.InitializeState(unixTime, deposits, &pb.Eth1Data{}); err != nil {
t.Fatalf("Could not initialize beacon state to disk: %v", err)
}
if err := SetSlotInState(chainService, 1); err != nil {
t.Fatal(err)
}
parentBlock := &pb.BeaconBlock{
Slot: 1,
}
parentRoot, err := hashutil.HashBeaconBlock(parentBlock)
if err != nil {
t.Fatalf("Unable to tree hash block %v", err)
}
if err := chainService.beaconDB.SaveBlock(parentBlock); err != nil {
t.Fatalf("Unable to save block %v", err)
}
block := &pb.BeaconBlock{
Slot: 2,
ParentRootHash32: parentRoot[:],
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
}
if err := chainService.beaconDB.SaveBlock(block); err != nil {
t.Fatal(err)
}
if _, err := chainService.ReceiveBlock(context.Background(), block); err == nil {
t.Errorf("Expected receive block to fail, received nil: %v", err)
}
}
func TestReceiveBlock_ProcessCorrectly(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
deposits, privKeys := setupInitialDeposits(t, 100)
eth1Data := &pb.Eth1Data{
DepositRootHash32: []byte{},
BlockHash32: []byte{},
}
beaconState, err := state.GenesisBeaconState(deposits, 0, eth1Data)
if err != nil {
t.Fatalf("Can't generate genesis state: %v", err)
}
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
parentHash, genesisBlock := setupGenesisBlock(t, chainService, beaconState)
if err := chainService.beaconDB.UpdateChainHead(genesisBlock, beaconState); err != nil {
t.Fatal(err)
}
currentSlot := params.BeaconConfig().GenesisSlot
beaconState.Slot++
randaoReveal := createRandaoReveal(t, beaconState, privKeys)
block := &pb.BeaconBlock{
Slot: currentSlot + 1,
StateRootHash32: stateRoot[:],
ParentRootHash32: parentHash[:],
RandaoReveal: randaoReveal,
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
Body: &pb.BeaconBlockBody{
Attestations: nil,
},
}
if err := chainService.beaconDB.SaveBlock(block); err != nil {
t.Fatal(err)
}
if _, err := chainService.ReceiveBlock(context.Background(), block); err != nil {
t.Errorf("Block failed processing: %v", err)
}
testutil.AssertLogsContain(t, hook, "Processed beacon block")
}
func TestReceiveBlock_RemovesPendingDeposits(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
deposits, privKeys := setupInitialDeposits(t, 100)
eth1Data := &pb.Eth1Data{
DepositRootHash32: []byte{},
BlockHash32: []byte{},
}
beaconState, err := state.GenesisBeaconState(deposits, 0, eth1Data)
if err != nil {
t.Fatalf("Can't generate genesis state: %v", err)
}
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
parentHash, genesisBlock := setupGenesisBlock(t, chainService, beaconState)
beaconState.Slot++
if err := chainService.beaconDB.UpdateChainHead(genesisBlock, beaconState); err != nil {
t.Fatal(err)
}
currentSlot := params.BeaconConfig().GenesisSlot
randaoReveal := createRandaoReveal(t, beaconState, privKeys)
pendingDeposits := []*pb.Deposit{
createPreChainStartDeposit(t, []byte{'F'}),
}
pendingDepositsData := make([][]byte, len(pendingDeposits))
for i, pd := range pendingDeposits {
pendingDepositsData[i] = pd.DepositData
}
depositTrie, err := trieutil.GenerateTrieFromItems(pendingDepositsData, int(params.BeaconConfig().DepositContractTreeDepth))
if err != nil {
t.Fatalf("Could not generate deposit trie: %v", err)
}
for i := range pendingDeposits {
pendingDeposits[i].MerkleTreeIndex = 0
proof, err := depositTrie.MerkleProof(int(pendingDeposits[i].MerkleTreeIndex))
if err != nil {
t.Fatalf("Could not generate proof: %v", err)
}
pendingDeposits[i].MerkleBranchHash32S = proof
}
depositRoot := depositTrie.Root()
beaconState.LatestEth1Data.DepositRootHash32 = depositRoot[:]
block := &pb.BeaconBlock{
Slot: currentSlot + 1,
StateRootHash32: stateRoot[:],
ParentRootHash32: parentHash[:],
RandaoReveal: randaoReveal,
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
Body: &pb.BeaconBlockBody{
Deposits: pendingDeposits,
},
}
for _, dep := range pendingDeposits {
db.InsertPendingDeposit(chainService.ctx, dep, big.NewInt(0))
}
if len(db.PendingDeposits(chainService.ctx, nil)) != len(pendingDeposits) || len(pendingDeposits) == 0 {
t.Fatalf("Expected %d pending deposits", len(pendingDeposits))
}
beaconState.Slot--
if err := chainService.beaconDB.SaveState(beaconState); err != nil {
t.Fatal(err)
}
computedState, err := chainService.ReceiveBlock(context.Background(), block)
if err != nil {
t.Fatal(err)
}
if err := chainService.ApplyForkChoiceRule(context.Background(), block, computedState); err != nil {
t.Fatal(err)
}
if len(db.PendingDeposits(chainService.ctx, nil)) != 0 {
t.Fatalf("Expected 0 pending deposits, but there are %+v", db.PendingDeposits(chainService.ctx, nil))
}
testutil.AssertLogsContain(t, hook, "Executing state transition")
}
func TestIsBlockReadyForProcessing_ValidBlock(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
ctx := context.Background()
chainService := setupBeaconChain(t, false, db, true, nil)
unixTime := uint64(time.Now().Unix())
deposits, privKeys := setupInitialDeposits(t, 100)
if err := db.InitializeState(unixTime, deposits, &pb.Eth1Data{}); err != nil {
t.Fatalf("Could not initialize beacon state to disk: %v", err)
}
beaconState, err := db.State(ctx)
if err != nil {
t.Fatalf("Can't get genesis state: %v", err)
}
block := &pb.BeaconBlock{
ParentRootHash32: []byte{'a'},
}
if err := chainService.isBlockReadyForProcessing(block, beaconState); err == nil {
t.Fatal("block processing succeeded despite block having no parent saved")
}
beaconState.Slot = params.BeaconConfig().GenesisSlot + 10
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
genesis := b.NewGenesisBlock([]byte{})
if err := chainService.beaconDB.SaveBlock(genesis); err != nil {
t.Fatalf("cannot save block: %v", err)
}
parentRoot, err := hashutil.HashBeaconBlock(genesis)
if err != nil {
t.Fatalf("unable to get root of canonical head: %v", err)
}
beaconState.LatestEth1Data = &pb.Eth1Data{
DepositRootHash32: []byte{2},
BlockHash32: []byte{3},
}
beaconState.Slot = params.BeaconConfig().GenesisSlot
currentSlot := params.BeaconConfig().GenesisSlot + 1
attestationSlot := params.BeaconConfig().GenesisSlot
randaoReveal := createRandaoReveal(t, beaconState, privKeys)
block2 := &pb.BeaconBlock{
Slot: currentSlot,
StateRootHash32: stateRoot[:],
ParentRootHash32: parentRoot[:],
RandaoReveal: randaoReveal,
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
Body: &pb.BeaconBlockBody{
Attestations: []*pb.Attestation{{
AggregationBitfield: []byte{128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Data: &pb.AttestationData{
Slot: attestationSlot,
JustifiedBlockRootHash32: parentRoot[:],
},
}},
},
}
chainService.enablePOWChain = true
if err := chainService.isBlockReadyForProcessing(block2, beaconState); err != nil {
t.Fatalf("block processing failed despite being a valid block: %v", err)
}
}
func TestDeleteValidatorIdx_DeleteWorks(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
epoch := uint64(2)
v.ActivatedValidators[epoch] = []uint64{0, 1, 2}
v.ExitedValidators[epoch] = []uint64{0, 2}
var validators []*pb.Validator
for i := 0; i < 3; i++ {
pubKeyBuf := make([]byte, params.BeaconConfig().BLSPubkeyLength)
binary.PutUvarint(pubKeyBuf, uint64(i))
validators = append(validators, &pb.Validator{
Pubkey: pubKeyBuf,
})
}
state := &pb.BeaconState{
ValidatorRegistry: validators,
Slot: epoch * params.BeaconConfig().SlotsPerEpoch,
}
chainService := setupBeaconChain(t, false, db, true, nil)
if err := chainService.saveValidatorIdx(state); err != nil {
t.Fatalf("Could not save validator idx: %v", err)
}
if err := chainService.deleteValidatorIdx(state); err != nil {
t.Fatalf("Could not delete validator idx: %v", err)
}
wantedIdx := uint64(1)
idx, err := chainService.beaconDB.ValidatorIndex(validators[wantedIdx].Pubkey)
if err != nil {
t.Fatalf("Could not get validator index: %v", err)
}
if wantedIdx != idx {
t.Errorf("Wanted: %d, got: %d", wantedIdx, idx)
}
wantedIdx = uint64(2)
if chainService.beaconDB.HasValidator(validators[wantedIdx].Pubkey) {
t.Errorf("Validator index %d should have been deleted", wantedIdx)
}
if _, ok := v.ExitedValidators[epoch]; ok {
t.Errorf("Activated validators mapping for epoch %d still there", epoch)
}
}
func TestSaveValidatorIdx_SaveRetrieveWorks(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
epoch := uint64(1)
v.ActivatedValidators[epoch] = []uint64{0, 1, 2}
var validators []*pb.Validator
for i := 0; i < 3; i++ {
pubKeyBuf := make([]byte, params.BeaconConfig().BLSPubkeyLength)
binary.PutUvarint(pubKeyBuf, uint64(i))
validators = append(validators, &pb.Validator{
Pubkey: pubKeyBuf,
})
}
state := &pb.BeaconState{
ValidatorRegistry: validators,
Slot: epoch * params.BeaconConfig().SlotsPerEpoch,
}
chainService := setupBeaconChain(t, false, db, true, nil)
if err := chainService.saveValidatorIdx(state); err != nil {
t.Fatalf("Could not save validator idx: %v", err)
}
wantedIdx := uint64(2)
idx, err := chainService.beaconDB.ValidatorIndex(validators[wantedIdx].Pubkey)
if err != nil {
t.Fatalf("Could not get validator index: %v", err)
}
if wantedIdx != idx {
t.Errorf("Wanted: %d, got: %d", wantedIdx, idx)
}
if _, ok := v.ActivatedValidators[epoch]; ok {
t.Errorf("Activated validators mapping for epoch %d still there", epoch)
}
}

View File

@@ -1,6 +1,7 @@
package blockchain
import (
"context"
"fmt"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
@@ -8,8 +9,158 @@ import (
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"go.opencensus.io/trace"
)
// ApplyForkChoiceRule determines the current beacon chain head using LMD GHOST as a block-vote
// weighted function to select a canonical head in Ethereum Serenity.
func (c *ChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ApplyForkChoiceRule")
defer span.End()
h, err := hashutil.HashBeaconBlock(block)
if err != nil {
return fmt.Errorf("could not tree hash incoming block: %v", err)
}
// TODO(#1307): Use LMD GHOST as the fork-choice rule for Ethereum Serenity.
// TODO(#674): Handle chain reorgs.
if err := c.beaconDB.UpdateChainHead(block, computedState); err != nil {
return fmt.Errorf("failed to update chain: %v", err)
}
log.WithField("blockRoot", fmt.Sprintf("0x%x", h)).Info("Chain head block and state updated")
// We fire events that notify listeners of a new block in
// the case of a state transition. This is useful for the beacon node's gRPC
// server to stream these events to beacon clients.
// When the transition is a cycle transition, we stream the state containing the new validator
// assignments to clients.
if err := c.saveFinalizedState(computedState); err != nil {
log.Errorf("Could not save new finalized state: %v", err)
}
if c.canonicalBlockFeed.Send(&pb.BeaconBlockAnnounce{
Hash: h[:],
SlotNumber: block.Slot,
}) == 0 {
log.Error("Sent canonical block to no subscribers")
}
return nil
}
// lmdGhost applies the Latest Message Driven, Greediest Heaviest Observed Sub-Tree
// fork-choice rule defined in the Ethereum Serenity specification for the beacon chain.
//
// Spec pseudocode definition:
// def lmd_ghost(store: Store, start_state: BeaconState, start_block: BeaconBlock) -> BeaconBlock:
// """
// Execute the LMD-GHOST algorithm to find the head ``BeaconBlock``.
// """
// validators = start_state.validator_registry
// active_validator_indices = get_active_validator_indices(validators, slot_to_epoch(start_state.slot))
// attestation_targets = [
// (validator_index, get_latest_attestation_target(store, validator_index))
// for validator_index in active_validator_indices
// ]
//
// def get_vote_count(block: BeaconBlock) -> int:
// return sum(
// get_effective_balance(start_state.validator_balances[validator_index]) // FORK_CHOICE_BALANCE_INCREMENT
// for validator_index, target in attestation_targets
// if get_ancestor(store, target, block.slot) == block
// )
//
// head = start_block
// while 1:
// children = get_children(store, head)
// if len(children) == 0:
// return head
// head = max(children, key=get_vote_count)
func (c *ChainService) lmdGhost(
block *pb.BeaconBlock,
state *pb.BeaconState,
voteTargets map[uint64]*pb.BeaconBlock,
) (*pb.BeaconBlock, error) {
head := block
for {
children, err := c.blockChildren(head, state)
if err != nil {
return nil, fmt.Errorf("could not fetch block children: %v", err)
}
if len(children) == 0 {
return head, nil
}
maxChild := children[0]
maxChildVotes, err := VoteCount(maxChild, state, voteTargets, c.beaconDB)
if err != nil {
return nil, fmt.Errorf("unable to determine vote count for block: %v", err)
}
for i := 0; i < len(children); i++ {
candidateChildVotes, err := VoteCount(children[i], state, voteTargets, c.beaconDB)
if err != nil {
return nil, fmt.Errorf("unable to determine vote count for block: %v", err)
}
if candidateChildVotes > maxChildVotes {
maxChild = children[i]
}
}
head = maxChild
}
}
// blockChildren returns the child blocks of the given block.
// ex:
// /- C - E
// A - B - D - F
// \- G
// Input: B. Output: [C, D, G]
//
// Spec pseudocode definition:
// get_children(store: Store, block: BeaconBlock) -> List[BeaconBlock]
// returns the child blocks of the given block.
func (c *ChainService) blockChildren(block *pb.BeaconBlock, state *pb.BeaconState) ([]*pb.BeaconBlock, error) {
var children []*pb.BeaconBlock
currentRoot, err := hashutil.HashBeaconBlock(block)
if err != nil {
return nil, fmt.Errorf("could not tree hash incoming block: %v", err)
}
startSlot := block.Slot + 1
currentSlot := state.Slot
for i := startSlot; i <= currentSlot; i++ {
block, err := c.beaconDB.BlockBySlot(i)
if err != nil {
return nil, fmt.Errorf("could not get block by slot: %v", err)
}
// Continue if there's a skip block.
if block == nil {
continue
}
parentRoot := bytesutil.ToBytes32(block.ParentRootHash32)
if currentRoot == parentRoot {
children = append(children, block)
}
}
return children, nil
}
// attestationTargets retrieves the list of attestation targets since last finalized epoch,
// each attestation target consists of validator index and its attestation target (i.e. the block
// which the validator attested to)
func (c *ChainService) attestationTargets(state *pb.BeaconState) ([]*attestationTarget, error) {
indices := helpers.ActiveValidatorIndices(state.ValidatorRegistry, state.FinalizedEpoch)
attestationTargets := make([]*attestationTarget, len(indices))
for i, index := range indices {
block, err := c.attsService.LatestAttestationTarget(c.ctx, index)
if err != nil {
return nil, fmt.Errorf("could not retrieve attestation target: %v", err)
}
attestationTargets[i] = &attestationTarget{
validatorIndex: index,
block: block,
}
}
return attestationTargets, nil
}
// VoteCount determines the number of votes on a beacon block by counting the number
// of target blocks that have such beacon block as a common ancestor.
//

View File

@@ -18,6 +18,8 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)
// Generates an initial genesis block and state using a custom number of initial
@@ -65,30 +67,87 @@ func generateTestGenesisStateAndBlock(
return beaconState, genesisBlock, stateRoot, genesisRoot
}
func setupConflictingBlocks(
t *testing.T,
beaconDB *db.BeaconDB,
genesisHash [32]byte,
stateRoot [32]byte,
) (candidate1 *pb.BeaconBlock, candidate2 *pb.BeaconBlock) {
candidate1 = &pb.BeaconBlock{
Slot: 5,
ParentRootHash32: genesisHash[:],
StateRootHash32: stateRoot[:],
func TestApplyForkChoice_SetsCanonicalHead(t *testing.T) {
beaconState, err := state.GenesisBeaconState(nil, 0, nil)
if err != nil {
t.Fatalf("Cannot create genesis beacon state: %v", err)
}
candidate2 = &pb.BeaconBlock{
Slot: 5,
ParentRootHash32: genesisHash[:],
StateRootHash32: []byte("some-other-state"),
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
// We store these potential heads in the DB.
if err := beaconDB.SaveBlock(candidate1); err != nil {
t.Fatal(err)
genesis := b.NewGenesisBlock(stateRoot[:])
genesisRoot, err := hashutil.HashProto(genesis)
if err != nil {
t.Fatalf("Could not get genesis block root: %v", err)
}
if err := beaconDB.SaveBlock(candidate2); err != nil {
t.Fatal(err)
// Table driven tests for various fork choice scenarios.
tests := []struct {
blockSlot uint64
state *pb.BeaconState
logAssert string
}{
// Higher slot but same state should trigger chain update.
{
blockSlot: 64,
state: beaconState,
logAssert: "Chain head block and state updated",
},
// Higher slot, different state, but higher last finalized slot.
{
blockSlot: 64,
state: &pb.BeaconState{FinalizedEpoch: 2},
logAssert: "Chain head block and state updated",
},
// Higher slot, different state, same last finalized slot,
// but last justified slot.
{
blockSlot: 64,
state: &pb.BeaconState{
FinalizedEpoch: 0,
JustifiedEpoch: 2,
},
logAssert: "Chain head block and state updated",
},
}
for _, tt := range tests {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
unixTime := uint64(time.Now().Unix())
deposits, _ := setupInitialDeposits(t, 100)
if err := db.InitializeState(unixTime, deposits, &pb.Eth1Data{}); err != nil {
t.Fatalf("Could not initialize beacon state to disk: %v", err)
}
stateRoot, err := hashutil.HashProto(tt.state)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
block := &pb.BeaconBlock{
Slot: tt.blockSlot,
StateRootHash32: stateRoot[:],
ParentRootHash32: genesisRoot[:],
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
}
if err := chainService.beaconDB.SaveBlock(block); err != nil {
t.Fatal(err)
}
if err := chainService.ApplyForkChoiceRule(context.Background(), block, tt.state); err != nil {
t.Errorf("Expected head to update, received %v", err)
}
if err := chainService.beaconDB.SaveBlock(block); err != nil {
t.Fatal(err)
}
chainService.cancel()
testutil.AssertLogsContain(t, hook, tt.logAssert)
}
return candidate1, candidate2
}
func TestVoteCount_ParentDoesNotExistNoVoteCount(t *testing.T) {

View File

@@ -8,22 +8,13 @@ import (
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/attestation"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
handler "github.com/prysmaticlabs/prysm/shared/messagehandler"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)
@@ -42,11 +33,9 @@ type ChainService struct {
web3Service *powchain.Web3Service
attsService *attestation.Service
opsPoolService operationService
incomingBlockFeed *event.Feed
incomingBlockChan chan *pb.BeaconBlock
chainStartChan chan time.Time
canonicalBlockChan chan *pb.BeaconBlock
canonicalBlockFeed *event.Feed
canonicalStateFeed *event.Feed
genesisTime time.Time
enablePOWChain bool
finalizedEpoch uint64
@@ -55,14 +44,13 @@ type ChainService struct {
// Config options for the service.
type Config struct {
BeaconBlockBuf int
IncomingBlockBuf int
Web3Service *powchain.Web3Service
AttsService *attestation.Service
BeaconDB *db.BeaconDB
OpsPoolService operationService
DevMode bool
EnablePOWChain bool
BeaconBlockBuf int
Web3Service *powchain.Web3Service
AttsService *attestation.Service
BeaconDB *db.BeaconDB
OpsPoolService operationService
DevMode bool
EnablePOWChain bool
}
// attestationTarget consists of validator index and block, it's
@@ -83,11 +71,9 @@ func NewChainService(ctx context.Context, cfg *Config) (*ChainService, error) {
web3Service: cfg.Web3Service,
opsPoolService: cfg.OpsPoolService,
attsService: cfg.AttsService,
incomingBlockChan: make(chan *pb.BeaconBlock, cfg.IncomingBlockBuf),
chainStartChan: make(chan time.Time),
incomingBlockFeed: new(event.Feed),
canonicalBlockFeed: new(event.Feed),
canonicalStateFeed: new(event.Feed),
canonicalBlockChan: make(chan *pb.BeaconBlock, cfg.BeaconBlockBuf),
chainStartChan: make(chan time.Time),
stateInitializedFeed: new(event.Feed),
enablePOWChain: cfg.EnablePOWChain,
}, nil
@@ -104,7 +90,6 @@ func (c *ChainService) Start() {
log.Info("Beacon chain data already exists, starting service")
c.genesisTime = time.Unix(int64(beaconState.GenesisTime), 0)
c.finalizedEpoch = beaconState.FinalizedEpoch
go c.blockProcessing()
} else {
log.Info("Waiting for ChainStart log from the Validator Deposit Contract to start the beacon chain...")
if c.web3Service == nil {
@@ -114,31 +99,36 @@ func (c *ChainService) Start() {
subChainStart := c.web3Service.ChainStartFeed().Subscribe(c.chainStartChan)
go func() {
genesisTime := <-c.chainStartChan
initialDepositsData := c.web3Service.ChainStartDeposits()
initialDeposits := make([]*pb.Deposit, len(initialDepositsData))
for i := range initialDepositsData {
initialDeposits[i] = &pb.Deposit{DepositData: initialDepositsData[i]}
}
depositRoot := c.web3Service.DepositRoot()
latestBlockHash := c.web3Service.LatestBlockHash()
eth1Data := &pb.Eth1Data{
DepositRootHash32: depositRoot[:],
BlockHash32: latestBlockHash[:],
}
beaconState, err := c.initializeBeaconChain(genesisTime, initialDeposits, eth1Data)
if err != nil {
log.Fatalf("Could not initialize beacon chain: %v", err)
}
c.finalizedEpoch = beaconState.FinalizedEpoch
c.stateInitializedFeed.Send(genesisTime)
c.canonicalStateFeed.Send(beaconState)
go c.blockProcessing()
subChainStart.Unsubscribe()
c.processChainStartTime(genesisTime, subChainStart)
return
}()
}
}
// processChainStartTime initializes a series of deposits from the ChainStart deposits in the eth1
// deposit contract, initializes the beacon chain's state, and kicks off the beacon chain.
func (c *ChainService) processChainStartTime(genesisTime time.Time, chainStartSub event.Subscription) {
initialDepositsData := c.web3Service.ChainStartDeposits()
initialDeposits := make([]*pb.Deposit, len(initialDepositsData))
for i := range initialDepositsData {
initialDeposits[i] = &pb.Deposit{DepositData: initialDepositsData[i]}
}
depositRoot := c.web3Service.DepositRoot()
latestBlockHash := c.web3Service.LatestBlockHash()
eth1Data := &pb.Eth1Data{
DepositRootHash32: depositRoot[:],
BlockHash32: latestBlockHash[:],
}
beaconState, err := c.initializeBeaconChain(genesisTime, initialDeposits, eth1Data)
if err != nil {
log.Fatalf("Could not initialize beacon chain: %v", err)
}
c.finalizedEpoch = beaconState.FinalizedEpoch
c.stateInitializedFeed.Send(genesisTime)
chainStartSub.Unsubscribe()
}
// initializes the state and genesis block of the beacon chain to persistent storage
// based on a genesis timestamp value obtained from the ChainStart event emitted
// by the ETH1.0 Deposit Contract and the POWChain service of the node.
@@ -182,24 +172,12 @@ func (c *ChainService) Status() error {
return nil
}
// IncomingBlockFeed returns a feed that any service can send incoming p2p blocks into.
// The chain service will subscribe to this feed in order to process incoming blocks.
func (c *ChainService) IncomingBlockFeed() *event.Feed {
return c.incomingBlockFeed
}
// CanonicalBlockFeed returns a channel that is written to
// whenever a new block is determined to be canonical in the chain.
func (c *ChainService) CanonicalBlockFeed() *event.Feed {
return c.canonicalBlockFeed
}
// CanonicalStateFeed returns a feed that is written to
// whenever a new state is determined to be canonical in the chain.
func (c *ChainService) CanonicalStateFeed() *event.Feed {
return c.canonicalStateFeed
}
// StateInitializedFeed returns a feed that is written to
// when the beacon state is first initialized.
func (c *ChainService) StateInitializedFeed() *event.Feed {
@@ -231,369 +209,3 @@ func (c *ChainService) doesPoWBlockExist(hash [32]byte) bool {
return powBlock != nil
}
// blockProcessing subscribes to incoming blocks, processes them if possible, and then applies
// the fork-choice rule to update the beacon chain's head.
func (c *ChainService) blockProcessing() {
subBlock := c.incomingBlockFeed.Subscribe(c.incomingBlockChan)
defer subBlock.Unsubscribe()
for {
select {
case <-c.ctx.Done():
log.Debug("Chain service context closed, exiting goroutine")
return
// Listen for a newly received incoming block from the feed. Blocks
// can be received either from the sync service, the RPC service,
// or via p2p.
case block := <-c.incomingBlockChan:
handler.SafelyHandleMessage(c.ctx, c.processBlock, block)
}
}
}
func (c *ChainService) processBlock(message proto.Message) {
block := message.(*pb.BeaconBlock)
beaconState, err := c.beaconDB.State(c.ctx)
if err != nil {
log.Errorf("Unable to retrieve beacon state %v", err)
return
}
if block.Slot > beaconState.Slot {
computedState, err := c.ReceiveBlock(block, beaconState)
if err != nil {
log.Errorf("Could not process received block: %v", err)
return
}
if err := c.ApplyForkChoiceRule(block, computedState); err != nil {
log.Errorf("Could not update chain head: %v", err)
return
}
}
}
// ApplyForkChoiceRule determines the current beacon chain head using LMD GHOST as a block-vote
// weighted function to select a canonical head in Ethereum Serenity.
func (c *ChainService) ApplyForkChoiceRule(block *pb.BeaconBlock, computedState *pb.BeaconState) error {
h, err := hashutil.HashBeaconBlock(block)
if err != nil {
return fmt.Errorf("could not tree hash incoming block: %v", err)
}
// TODO(#1307): Use LMD GHOST as the fork-choice rule for Ethereum Serenity.
// TODO(#674): Handle chain reorgs.
if err := c.beaconDB.UpdateChainHead(block, computedState); err != nil {
return fmt.Errorf("failed to update chain: %v", err)
}
log.WithField("blockRoot", fmt.Sprintf("0x%x", h)).Info("Chain head block and state updated")
// We fire events that notify listeners of a new block in
// the case of a state transition. This is useful for the beacon node's gRPC
// server to stream these events to beacon clients.
// When the transition is a cycle transition, we stream the state containing the new validator
// assignments to clients.
if helpers.IsEpochStart(block.Slot) {
if c.canonicalStateFeed.Send(computedState) == 0 {
log.Error("Sent canonical state to no subscribers")
}
}
if err := c.saveFinalizedState(computedState); err != nil {
log.Errorf("Could not save new finalized state: %v", err)
}
if c.canonicalBlockFeed.Send(&pb.BeaconBlockAnnounce{
Hash: h[:],
SlotNumber: block.Slot,
}) == 0 {
log.Error("Sent canonical block to no subscribers")
}
return nil
}
// ReceiveBlock is a function that defines the operations that are preformed on
// any block that is received from p2p layer or rpc. It checks the block to see
// if it passes the pre-processing conditions, if it does then the per slot
// state transition function is carried out on the block.
// spec:
// def process_block(block):
// if not block_pre_processing_conditions(block):
// return nil, error
//
// # process skipped slots
//
// while (state.slot < block.slot - 1):
// state = slot_state_transition(state, block=None)
//
// # process slot with block
// state = slot_state_transition(state, block)
//
// # check state root
// if block.state_root == hash(state):
// return state, error
// else:
// return nil, error # or throw or whatever
//
func (c *ChainService) ReceiveBlock(block *pb.BeaconBlock, beaconState *pb.BeaconState) (*pb.BeaconState, error) {
blockRoot, err := hashutil.HashBeaconBlock(block)
if err != nil {
return nil, fmt.Errorf("could not tree hash incoming block: %v", err)
}
if block.Slot == params.BeaconConfig().GenesisSlot {
return nil, fmt.Errorf("cannot process a genesis block: received block with slot %d",
block.Slot-params.BeaconConfig().GenesisSlot)
}
// Save blocks with higher slot numbers in cache.
if err := c.isBlockReadyForProcessing(block, beaconState); err != nil {
return nil, fmt.Errorf("block with root %#x is not ready for processing: %v", blockRoot, err)
}
// Retrieve the last processed beacon block's hash root.
headRoot, err := c.ChainHeadRoot()
if err != nil {
return nil, fmt.Errorf("could not retrieve chain head root: %v", err)
}
log.WithField("slotNumber", block.Slot-params.BeaconConfig().GenesisSlot).Info(
"Executing state transition")
// Check for skipped slots.
numSkippedSlots := 0
for beaconState.Slot < block.Slot-1 {
beaconState, err = c.runStateTransition(headRoot, nil, beaconState)
if err != nil {
return nil, fmt.Errorf("could not execute state transition without block %v", err)
}
numSkippedSlots++
}
if numSkippedSlots > 0 {
log.Warnf("Processed %d skipped slots", numSkippedSlots)
}
beaconState, err = c.runStateTransition(headRoot, block, beaconState)
if err != nil {
return nil, fmt.Errorf("could not execute state transition with block %v", err)
}
// if there exists a block for the slot being processed.
if err := c.beaconDB.SaveBlock(block); err != nil {
return nil, fmt.Errorf("failed to save block: %v", err)
}
// Forward processed block to operation pool to remove individual operation from DB.
if c.opsPoolService.IncomingProcessedBlockFeed().Send(block) == 0 {
log.Error("Sent processed block to no subscribers")
}
// Remove pending deposits from the deposit queue.
for _, dep := range block.Body.Deposits {
c.beaconDB.RemovePendingDeposit(c.ctx, dep)
}
log.WithField("hash", fmt.Sprintf("%#x", blockRoot)).Debug("Processed beacon block")
return beaconState, nil
}
func (c *ChainService) runStateTransition(headRoot [32]byte, block *pb.BeaconBlock,
beaconState *pb.BeaconState) (*pb.BeaconState, error) {
beaconState, err := state.ExecuteStateTransition(
c.ctx,
beaconState,
block,
headRoot,
true, /* sig verify */
)
if err != nil {
return nil, fmt.Errorf("could not execute state transition %v", err)
}
log.WithField(
"slotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Slot transition successfully processed")
if block != nil {
log.WithField(
"slotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Block transition successfully processed")
}
if helpers.IsEpochEnd(beaconState.Slot) {
// Save activated validators of this epoch to public key -> index DB.
if err := c.saveValidatorIdx(beaconState); err != nil {
return nil, fmt.Errorf("could not save validator index: %v", err)
}
// Delete exited validators of this epoch to public key -> index DB.
if err := c.deleteValidatorIdx(beaconState); err != nil {
return nil, fmt.Errorf("could not delete validator index: %v", err)
}
log.WithField(
"SlotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Epoch transition successfully processed")
}
return beaconState, nil
}
func (c *ChainService) isBlockReadyForProcessing(block *pb.BeaconBlock, beaconState *pb.BeaconState) error {
var powBlockFetcher func(ctx context.Context, hash common.Hash) (*gethTypes.Block, error)
if c.enablePOWChain {
powBlockFetcher = c.web3Service.Client().BlockByHash
}
if err := b.IsValidBlock(c.ctx, beaconState, block, c.enablePOWChain,
c.beaconDB.HasBlock, powBlockFetcher, c.genesisTime); err != nil {
return fmt.Errorf("block does not fulfill pre-processing conditions %v", err)
}
return nil
}
func (c *ChainService) saveFinalizedState(beaconState *pb.BeaconState) error {
// check if the finalized epoch has changed, if it
// has we save the finalized state.
if c.finalizedEpoch != beaconState.FinalizedEpoch {
c.finalizedEpoch = beaconState.FinalizedEpoch
return c.beaconDB.SaveFinalizedState(beaconState)
}
return nil
}
// saveValidatorIdx saves the validators public key to index mapping in DB, these
// validators were activated from current epoch. After it saves, current epoch key
// is deleted from ActivatedValidators mapping.
func (c *ChainService) saveValidatorIdx(state *pb.BeaconState) error {
for _, idx := range validators.ActivatedValidators[helpers.CurrentEpoch(state)] {
pubKey := state.ValidatorRegistry[idx].Pubkey
if err := c.beaconDB.SaveValidatorIndex(pubKey, int(idx)); err != nil {
return fmt.Errorf("could not save validator index: %v", err)
}
}
delete(validators.ActivatedValidators, helpers.CurrentEpoch(state))
return nil
}
// deleteValidatorIdx deletes the validators public key to index mapping in DB, the
// validators were exited from current epoch. After it deletes, current epoch key
// is deleted from ExitedValidators mapping.
func (c *ChainService) deleteValidatorIdx(state *pb.BeaconState) error {
for _, idx := range validators.ExitedValidators[helpers.CurrentEpoch(state)] {
pubKey := state.ValidatorRegistry[idx].Pubkey
if err := c.beaconDB.DeleteValidatorIndex(pubKey); err != nil {
return fmt.Errorf("could not delete validator index: %v", err)
}
}
delete(validators.ExitedValidators, helpers.CurrentEpoch(state))
return nil
}
// attestationTargets retrieves the list of attestation targets since last finalized epoch,
// each attestation target consists of validator index and its attestation target (i.e. the block
// which the validator attested to)
func (c *ChainService) attestationTargets(state *pb.BeaconState) ([]*attestationTarget, error) {
indices := helpers.ActiveValidatorIndices(state.ValidatorRegistry, state.FinalizedEpoch)
attestationTargets := make([]*attestationTarget, len(indices))
for i, index := range indices {
block, err := c.attsService.LatestAttestationTarget(c.ctx, index)
if err != nil {
return nil, fmt.Errorf("could not retrieve attestation target: %v", err)
}
attestationTargets[i] = &attestationTarget{
validatorIndex: index,
block: block,
}
}
return attestationTargets, nil
}
// blockChildren returns the child blocks of the given block.
// ex:
// /- C - E
// A - B - D - F
// \- G
// Input: B. Output: [C, D, G]
//
// Spec pseudocode definition:
// get_children(store: Store, block: BeaconBlock) -> List[BeaconBlock]
// returns the child blocks of the given block.
func (c *ChainService) blockChildren(block *pb.BeaconBlock, state *pb.BeaconState) ([]*pb.BeaconBlock, error) {
var children []*pb.BeaconBlock
currentRoot, err := hashutil.HashBeaconBlock(block)
if err != nil {
return nil, fmt.Errorf("could not tree hash incoming block: %v", err)
}
startSlot := block.Slot + 1
currentSlot := state.Slot
for i := startSlot; i <= currentSlot; i++ {
block, err := c.beaconDB.BlockBySlot(i)
if err != nil {
return nil, fmt.Errorf("could not get block by slot: %v", err)
}
// Continue if there's a skip block.
if block == nil {
continue
}
parentRoot := bytesutil.ToBytes32(block.ParentRootHash32)
if currentRoot == parentRoot {
children = append(children, block)
}
}
return children, nil
}
// lmdGhost applies the Latest Message Driven, Greediest Heaviest Observed Sub-Tree
// fork-choice rule defined in the Ethereum Serenity specification for the beacon chain.
//
// Spec pseudocode definition:
// def lmd_ghost(store: Store, start_state: BeaconState, start_block: BeaconBlock) -> BeaconBlock:
// """
// Execute the LMD-GHOST algorithm to find the head ``BeaconBlock``.
// """
// validators = start_state.validator_registry
// active_validator_indices = get_active_validator_indices(validators, slot_to_epoch(start_state.slot))
// attestation_targets = [
// (validator_index, get_latest_attestation_target(store, validator_index))
// for validator_index in active_validator_indices
// ]
//
// def get_vote_count(block: BeaconBlock) -> int:
// return sum(
// get_effective_balance(start_state.validator_balances[validator_index]) // FORK_CHOICE_BALANCE_INCREMENT
// for validator_index, target in attestation_targets
// if get_ancestor(store, target, block.slot) == block
// )
//
// head = start_block
// while 1:
// children = get_children(store, head)
// if len(children) == 0:
// return head
// head = max(children, key=get_vote_count)
func (c *ChainService) lmdGhost(
block *pb.BeaconBlock,
state *pb.BeaconState,
voteTargets map[uint64]*pb.BeaconBlock,
) (*pb.BeaconBlock, error) {
head := block
for {
children, err := c.blockChildren(head, state)
if err != nil {
return nil, fmt.Errorf("could not fetch block children: %v", err)
}
if len(children) == 0 {
return head, nil
}
maxChild := children[0]
maxChildVotes, err := VoteCount(maxChild, state, voteTargets, c.beaconDB)
if err != nil {
return nil, fmt.Errorf("unable to determine vote count for block: %v", err)
}
for i := 0; i < len(children); i++ {
candidateChildVotes, err := VoteCount(children[i], state, voteTargets, c.beaconDB)
if err != nil {
return nil, fmt.Errorf("unable to determine vote count for block: %v", err)
}
if candidateChildVotes > maxChildVotes {
maxChild = children[i]
}
}
head = maxChild
}
}

View File

@@ -16,8 +16,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/attestation"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
v "github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
@@ -29,7 +27,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/trieutil"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -255,15 +252,10 @@ func TestChainStartStop_Uninitialized(t *testing.T) {
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
chainService.IncomingBlockFeed()
// Test the start function.
genesisChan := make(chan time.Time, 0)
stateChan := make(chan *pb.BeaconState, 0)
sub := chainService.stateInitializedFeed.Subscribe(genesisChan)
defer sub.Unsubscribe()
sub2 := chainService.canonicalStateFeed.Subscribe(stateChan)
defer sub2.Unsubscribe()
chainService.Start()
chainService.chainStartChan <- time.Unix(0, 0)
genesisTime := <-genesisChan
@@ -275,16 +267,16 @@ func TestChainStartStop_Uninitialized(t *testing.T) {
)
}
beaconState := <-stateChan
beaconState, err := db.State(context.Background())
if err != nil {
t.Fatal(err)
}
if beaconState == nil || beaconState.Slot != params.BeaconConfig().GenesisSlot {
t.Error("Expected canonical state feed to send a state with genesis block")
}
if err := chainService.Stop(); err != nil {
t.Fatalf("Unable to stop chain service: %v", err)
}
// The context should have been canceled.
if chainService.ctx.Err() != context.Canceled {
t.Error("Context was not canceled")
@@ -344,205 +336,6 @@ func TestChainStartStop_Initialized(t *testing.T) {
testutil.AssertLogsContain(t, hook, "Beacon chain data already exists, starting service")
}
func TestChainService_FaultyPOWChain(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, true, db, true, nil)
unixTime := uint64(time.Now().Unix())
deposits, _ := setupInitialDeposits(t, 100)
if err := db.InitializeState(unixTime, deposits, &pb.Eth1Data{}); err != nil {
t.Fatalf("Could not initialize beacon state to disk: %v", err)
}
if err := SetSlotInState(chainService, 1); err != nil {
t.Fatal(err)
}
parentBlock := &pb.BeaconBlock{
Slot: 1,
}
parentRoot, err := hashutil.HashBeaconBlock(parentBlock)
if err != nil {
t.Fatalf("Unable to tree hash block %v", err)
}
if err := chainService.beaconDB.SaveBlock(parentBlock); err != nil {
t.Fatalf("Unable to save block %v", err)
}
block := &pb.BeaconBlock{
Slot: 2,
ParentRootHash32: parentRoot[:],
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
}
exitRoutine := make(chan bool)
go func() {
chainService.blockProcessing()
<-exitRoutine
}()
if err := chainService.beaconDB.SaveBlock(block); err != nil {
t.Fatal(err)
}
chainService.incomingBlockChan <- block
chainService.cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "unable to retrieve POW chain reference block")
}
func TestChainService_Starts(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
deposits, privKeys := setupInitialDeposits(t, 100)
eth1Data := &pb.Eth1Data{
DepositRootHash32: []byte{},
BlockHash32: []byte{},
}
beaconState, err := state.GenesisBeaconState(deposits, 0, eth1Data)
if err != nil {
t.Fatalf("Can't generate genesis state: %v", err)
}
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
parentHash, genesisBlock := setupGenesisBlock(t, chainService, beaconState)
if err := chainService.beaconDB.UpdateChainHead(genesisBlock, beaconState); err != nil {
t.Fatal(err)
}
currentSlot := params.BeaconConfig().GenesisSlot
beaconState.Slot++
randaoReveal := createRandaoReveal(t, beaconState, privKeys)
block := &pb.BeaconBlock{
Slot: currentSlot + 1,
StateRootHash32: stateRoot[:],
ParentRootHash32: parentHash[:],
RandaoReveal: randaoReveal,
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
Body: &pb.BeaconBlockBody{
Attestations: nil,
},
}
exitRoutine := make(chan bool)
go func() {
chainService.blockProcessing()
<-exitRoutine
}()
if err := chainService.beaconDB.SaveBlock(block); err != nil {
t.Fatal(err)
}
chainService.incomingBlockChan <- block
chainService.cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "Chain service context closed, exiting goroutine")
testutil.AssertLogsContain(t, hook, "Processed beacon block")
}
func TestReceiveBlock_RemovesPendingDeposits(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
deposits, privKeys := setupInitialDeposits(t, 100)
eth1Data := &pb.Eth1Data{
DepositRootHash32: []byte{},
BlockHash32: []byte{},
}
beaconState, err := state.GenesisBeaconState(deposits, 0, eth1Data)
if err != nil {
t.Fatalf("Can't generate genesis state: %v", err)
}
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
parentHash, genesisBlock := setupGenesisBlock(t, chainService, beaconState)
beaconState.Slot++
if err := chainService.beaconDB.UpdateChainHead(genesisBlock, beaconState); err != nil {
t.Fatal(err)
}
currentSlot := params.BeaconConfig().GenesisSlot
randaoReveal := createRandaoReveal(t, beaconState, privKeys)
pendingDeposits := []*pb.Deposit{
createPreChainStartDeposit(t, []byte{'F'}),
}
pendingDepositsData := make([][]byte, len(pendingDeposits))
for i, pd := range pendingDeposits {
pendingDepositsData[i] = pd.DepositData
}
depositTrie, err := trieutil.GenerateTrieFromItems(pendingDepositsData, int(params.BeaconConfig().DepositContractTreeDepth))
if err != nil {
t.Fatalf("Could not generate deposit trie: %v", err)
}
for i := range pendingDeposits {
pendingDeposits[i].MerkleTreeIndex = 0
proof, err := depositTrie.MerkleProof(int(pendingDeposits[i].MerkleTreeIndex))
if err != nil {
t.Fatalf("Could not generate proof: %v", err)
}
pendingDeposits[i].MerkleBranchHash32S = proof
}
depositRoot := depositTrie.Root()
beaconState.LatestEth1Data.DepositRootHash32 = depositRoot[:]
block := &pb.BeaconBlock{
Slot: currentSlot + 1,
StateRootHash32: stateRoot[:],
ParentRootHash32: parentHash[:],
RandaoReveal: randaoReveal,
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
Body: &pb.BeaconBlockBody{
Deposits: pendingDeposits,
},
}
for _, dep := range pendingDeposits {
db.InsertPendingDeposit(chainService.ctx, dep, big.NewInt(0))
}
if len(db.PendingDeposits(chainService.ctx, nil)) != len(pendingDeposits) || len(pendingDeposits) == 0 {
t.Fatalf("Expected %d pending deposits", len(pendingDeposits))
}
beaconState.Slot--
computedState, err := chainService.ReceiveBlock(block, beaconState)
if err != nil {
t.Fatal(err)
}
if err := chainService.ApplyForkChoiceRule(block, computedState); err != nil {
t.Fatal(err)
}
if len(db.PendingDeposits(chainService.ctx, nil)) != 0 {
t.Fatalf("Expected 0 pending deposits, but there are %+v", db.PendingDeposits(chainService.ctx, nil))
}
testutil.AssertLogsContain(t, hook, "Executing state transition")
}
func TestPOWBlockExists_UsingDepositRootHash(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
@@ -573,242 +366,3 @@ func TestPOWBlockExists_UsingDepositRootHash(t *testing.T) {
}
testutil.AssertLogsContain(t, hook, "fetching PoW block corresponding to mainchain reference failed")
}
func TestUpdateHead_SavesBlock(t *testing.T) {
beaconState, err := state.GenesisBeaconState(nil, 0, nil)
if err != nil {
t.Fatalf("Cannot create genesis beacon state: %v", err)
}
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
genesis := b.NewGenesisBlock(stateRoot[:])
genesisRoot, err := hashutil.HashProto(genesis)
if err != nil {
t.Fatalf("Could not get genesis block root: %v", err)
}
// Table driven tests for various fork choice scenarios.
tests := []struct {
blockSlot uint64
state *pb.BeaconState
logAssert string
}{
// Higher slot but same state should trigger chain update.
{
blockSlot: 64,
state: beaconState,
logAssert: "Chain head block and state updated",
},
// Higher slot, different state, but higher last finalized slot.
{
blockSlot: 64,
state: &pb.BeaconState{FinalizedEpoch: 2},
logAssert: "Chain head block and state updated",
},
// Higher slot, different state, same last finalized slot,
// but last justified slot.
{
blockSlot: 64,
state: &pb.BeaconState{
FinalizedEpoch: 0,
JustifiedEpoch: 2,
},
logAssert: "Chain head block and state updated",
},
}
for _, tt := range tests {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
unixTime := uint64(time.Now().Unix())
deposits, _ := setupInitialDeposits(t, 100)
if err := db.InitializeState(unixTime, deposits, &pb.Eth1Data{}); err != nil {
t.Fatalf("Could not initialize beacon state to disk: %v", err)
}
stateRoot, err := hashutil.HashProto(tt.state)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
block := &pb.BeaconBlock{
Slot: tt.blockSlot,
StateRootHash32: stateRoot[:],
ParentRootHash32: genesisRoot[:],
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
}
if err := chainService.beaconDB.SaveBlock(block); err != nil {
t.Fatal(err)
}
if err := chainService.ApplyForkChoiceRule(block, tt.state); err != nil {
t.Errorf("Expected head to update, received %v", err)
}
if err := chainService.beaconDB.SaveBlock(block); err != nil {
t.Fatal(err)
}
chainService.cancel()
testutil.AssertLogsContain(t, hook, tt.logAssert)
}
}
func TestIsBlockReadyForProcessing_ValidBlock(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
ctx := context.Background()
chainService := setupBeaconChain(t, false, db, true, nil)
unixTime := uint64(time.Now().Unix())
deposits, privKeys := setupInitialDeposits(t, 100)
if err := db.InitializeState(unixTime, deposits, &pb.Eth1Data{}); err != nil {
t.Fatalf("Could not initialize beacon state to disk: %v", err)
}
beaconState, err := db.State(ctx)
if err != nil {
t.Fatalf("Can't get genesis state: %v", err)
}
block := &pb.BeaconBlock{
ParentRootHash32: []byte{'a'},
}
if err := chainService.isBlockReadyForProcessing(block, beaconState); err == nil {
t.Fatal("block processing succeeded despite block having no parent saved")
}
beaconState.Slot = params.BeaconConfig().GenesisSlot + 10
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("Could not tree hash state: %v", err)
}
genesis := b.NewGenesisBlock([]byte{})
if err := chainService.beaconDB.SaveBlock(genesis); err != nil {
t.Fatalf("cannot save block: %v", err)
}
parentRoot, err := hashutil.HashBeaconBlock(genesis)
if err != nil {
t.Fatalf("unable to get root of canonical head: %v", err)
}
beaconState.LatestEth1Data = &pb.Eth1Data{
DepositRootHash32: []byte{2},
BlockHash32: []byte{3},
}
beaconState.Slot = params.BeaconConfig().GenesisSlot
currentSlot := params.BeaconConfig().GenesisSlot + 1
attestationSlot := params.BeaconConfig().GenesisSlot
randaoReveal := createRandaoReveal(t, beaconState, privKeys)
block2 := &pb.BeaconBlock{
Slot: currentSlot,
StateRootHash32: stateRoot[:],
ParentRootHash32: parentRoot[:],
RandaoReveal: randaoReveal,
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte("a"),
BlockHash32: []byte("b"),
},
Body: &pb.BeaconBlockBody{
Attestations: []*pb.Attestation{{
AggregationBitfield: []byte{128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Data: &pb.AttestationData{
Slot: attestationSlot,
JustifiedBlockRootHash32: parentRoot[:],
},
}},
},
}
chainService.enablePOWChain = true
if err := chainService.isBlockReadyForProcessing(block2, beaconState); err != nil {
t.Fatalf("block processing failed despite being a valid block: %v", err)
}
}
func TestDeleteValidatorIdx_DeleteWorks(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
epoch := uint64(2)
v.ActivatedValidators[epoch] = []uint64{0, 1, 2}
v.ExitedValidators[epoch] = []uint64{0, 2}
var validators []*pb.Validator
for i := 0; i < 3; i++ {
pubKeyBuf := make([]byte, params.BeaconConfig().BLSPubkeyLength)
binary.PutUvarint(pubKeyBuf, uint64(i))
validators = append(validators, &pb.Validator{
Pubkey: pubKeyBuf,
})
}
state := &pb.BeaconState{
ValidatorRegistry: validators,
Slot: epoch * params.BeaconConfig().SlotsPerEpoch,
}
chainService := setupBeaconChain(t, false, db, true, nil)
if err := chainService.saveValidatorIdx(state); err != nil {
t.Fatalf("Could not save validator idx: %v", err)
}
if err := chainService.deleteValidatorIdx(state); err != nil {
t.Fatalf("Could not delete validator idx: %v", err)
}
wantedIdx := uint64(1)
idx, err := chainService.beaconDB.ValidatorIndex(validators[wantedIdx].Pubkey)
if err != nil {
t.Fatalf("Could not get validator index: %v", err)
}
if wantedIdx != idx {
t.Errorf("Wanted: %d, got: %d", wantedIdx, idx)
}
wantedIdx = uint64(2)
if chainService.beaconDB.HasValidator(validators[wantedIdx].Pubkey) {
t.Errorf("Validator index %d should have been deleted", wantedIdx)
}
if _, ok := v.ExitedValidators[epoch]; ok {
t.Errorf("Activated validators mapping for epoch %d still there", epoch)
}
}
func TestSaveValidatorIdx_SaveRetrieveWorks(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
epoch := uint64(1)
v.ActivatedValidators[epoch] = []uint64{0, 1, 2}
var validators []*pb.Validator
for i := 0; i < 3; i++ {
pubKeyBuf := make([]byte, params.BeaconConfig().BLSPubkeyLength)
binary.PutUvarint(pubKeyBuf, uint64(i))
validators = append(validators, &pb.Validator{
Pubkey: pubKeyBuf,
})
}
state := &pb.BeaconState{
ValidatorRegistry: validators,
Slot: epoch * params.BeaconConfig().SlotsPerEpoch,
}
chainService := setupBeaconChain(t, false, db, true, nil)
if err := chainService.saveValidatorIdx(state); err != nil {
t.Fatalf("Could not save validator idx: %v", err)
}
wantedIdx := uint64(2)
idx, err := chainService.beaconDB.ValidatorIndex(validators[wantedIdx].Pubkey)
if err != nil {
t.Fatalf("Could not get validator index: %v", err)
}
if wantedIdx != idx {
t.Errorf("Wanted: %d, got: %d", wantedIdx, idx)
}
if _, ok := v.ActivatedValidators[epoch]; ok {
t.Errorf("Activated validators mapping for epoch %d still there", epoch)
}
}

View File

@@ -54,9 +54,8 @@ func NewSimulatedBackend() (*SimulatedBackend, error) {
return nil, fmt.Errorf("could not setup simulated backend db: %v", err)
}
cs, err := blockchain.NewChainService(context.Background(), &blockchain.Config{
BeaconDB: db,
IncomingBlockBuf: 0,
EnablePOWChain: false,
BeaconDB: db,
EnablePOWChain: false,
})
if err != nil {
return nil, err

View File

@@ -200,12 +200,11 @@ func (b *BeaconNode) registerBlockchainService(_ *cli.Context) error {
}
blockchainService, err := blockchain.NewChainService(context.Background(), &blockchain.Config{
BeaconDB: b.db,
Web3Service: web3Service,
OpsPoolService: opsService,
AttsService: attsService,
BeaconBlockBuf: 10,
IncomingBlockBuf: 100, // Big buffer to accommodate other feed subscribers.
BeaconDB: b.db,
Web3Service: web3Service,
OpsPoolService: opsService,
AttsService: attsService,
BeaconBlockBuf: 10,
})
if err != nil {
return fmt.Errorf("could not register blockchain service: %v", err)

View File

@@ -47,16 +47,21 @@ func (ps *ProposerServer) ProposerIndex(ctx context.Context, req *pb.ProposerInd
}, nil
}
// ProposeBlock is called by a proposer in a sharding validator and a full beacon node
// sends the request into a beacon block that can then be included in a canonical chain.
// ProposeBlock is called by a proposer during its assigned slot to create a block in an attempt
// to get it processed by the beacon node as the canonical head.
func (ps *ProposerServer) ProposeBlock(ctx context.Context, blk *pbp2p.BeaconBlock) (*pb.ProposeResponse, error) {
h, err := hashutil.HashBeaconBlock(blk)
if err != nil {
return nil, fmt.Errorf("could not tree hash block: %v", err)
}
log.WithField("blockRoot", fmt.Sprintf("%#x", h)).Debugf("Block proposal received via RPC")
// We relay the received block from the proposer to the chain service for processing.
ps.chainService.IncomingBlockFeed().Send(blk)
beaconState, err := ps.chainService.ReceiveBlock(ctx, blk)
if err != nil {
return nil, fmt.Errorf("could not process beacon block: %v", err)
}
if err := ps.chainService.ApplyForkChoiceRule(ctx, blk, beaconState); err != nil {
return nil, fmt.Errorf("could not apply fork choice rule: %v", err)
}
return &pb.ProposeResponse{BlockHash: h[:]}, nil
}

View File

@@ -31,13 +31,10 @@ func init() {
}
type chainService interface {
IncomingBlockFeed() *event.Feed
// These methods are not called on-demand by a validator
// but instead streamed to connected validators every
// time the canonical head changes in the chain service.
CanonicalBlockFeed() *event.Feed
CanonicalStateFeed() *event.Feed
StateInitializedFeed() *event.Feed
ReceiveBlock(ctx context.Context, block *pbp2p.BeaconBlock) (*pbp2p.BeaconState, error)
ApplyForkChoiceRule(ctx context.Context, block *pbp2p.BeaconBlock, computedState *pbp2p.BeaconState) error
}
type operationService interface {

View File

@@ -74,20 +74,20 @@ type mockChainService struct {
stateInitializedFeed *event.Feed
}
func (m *mockChainService) IncomingBlockFeed() *event.Feed {
return new(event.Feed)
func (m *mockChainService) StateInitializedFeed() *event.Feed {
return m.stateInitializedFeed
}
func (m *mockChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) {
return &pb.BeaconState{}, nil
}
func (m *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error {
return nil
}
func (m *mockChainService) CanonicalBlockFeed() *event.Feed {
return m.blockFeed
}
func (m *mockChainService) CanonicalStateFeed() *event.Feed {
return m.stateFeed
}
func (m *mockChainService) StateInitializedFeed() *event.Feed {
return m.stateInitializedFeed
return new(event.Feed)
}
func newMockChainService() *mockChainService {

View File

@@ -42,15 +42,16 @@ func (vs *ValidatorServer) WaitForActivation(req *pb.ValidatorActivationRequest,
}
return stream.Send(res)
}
sub := vs.chainService.CanonicalStateFeed().Subscribe(vs.canonicalStateChan)
defer sub.Unsubscribe()
for {
select {
case <-time.After(3 * time.Second):
case beaconState := <-vs.canonicalStateChan:
if !vs.beaconDB.HasValidator(req.Pubkey) {
continue
}
beaconState, err := vs.beaconDB.State(vs.ctx)
if err != nil {
return fmt.Errorf("could not retrieve beacon state: %v", err)
}
activeVal, err := vs.retrieveActiveValidator(beaconState, req.Pubkey)
if err != nil {
return fmt.Errorf("could not retrieve active validator from state: %v", err)
@@ -59,8 +60,6 @@ func (vs *ValidatorServer) WaitForActivation(req *pb.ValidatorActivationRequest,
Validator: activeVal,
}
return stream.Send(res)
case <-sub.Err():
return errors.New("subscriber closed, exiting goroutine")
case <-vs.ctx.Done():
return errors.New("rpc context closed, exiting goroutine")
}

View File

@@ -517,55 +517,3 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) {
t.Fatalf("Could not setup wait for activation stream: %v", err)
}
}
func TestWaitForActivation_ListensAndFetchesValidatorFromStateFeed(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
pubKey := []byte{'A'}
beaconState := &pbp2p.BeaconState{
Slot: params.BeaconConfig().GenesisSlot,
ValidatorRegistry: []*pbp2p.Validator{{
ActivationEpoch: params.BeaconConfig().GenesisSlot,
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Pubkey: pubKey},
},
}
if err := db.SaveState(beaconState); err != nil {
t.Fatalf("could not save state: %v", err)
}
vs := &ValidatorServer{
beaconDB: db,
ctx: context.Background(),
chainService: newMockChainService(),
canonicalStateChan: make(chan *pbp2p.BeaconState, 1),
}
req := &pb.ValidatorActivationRequest{
Pubkey: pubKey,
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := internal.NewMockValidatorService_WaitForActivationServer(ctrl)
mockStream.EXPECT().Send(
&pb.ValidatorActivationResponse{
Validator: beaconState.ValidatorRegistry[0],
},
).Return(nil)
exitRoutine := make(chan bool)
go func(tt *testing.T) {
if err := vs.WaitForActivation(req, mockStream); err != nil {
t.Fatalf("Could not setup wait for activation stream: %v", err)
}
<-exitRoutine
}(t)
vs.canonicalStateChan <- beaconState
if err := db.SaveValidatorIndex(pubKey, 0); err != nil {
t.Fatalf("Could not save validator index: %v", err)
}
vs.canonicalStateChan <- beaconState
exitRoutine <- true
}

View File

@@ -69,7 +69,8 @@ type p2pAPI interface {
}
type chainService interface {
IncomingBlockFeed() *event.Feed
ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error)
ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error
}
// SyncService is the interface for the Sync service.
@@ -513,7 +514,13 @@ func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.Be
}
// Send block to main chain service to be processed.
s.chainService.IncomingBlockFeed().Send(block)
beaconState, err := s.chainService.ReceiveBlock(ctx, block)
if err != nil {
return fmt.Errorf("could not process beacon block: %v", err)
}
if err := s.chainService.ApplyForkChoiceRule(ctx, block, beaconState); err != nil {
return fmt.Errorf("could not apply fork choice rule: %v", err)
}
return nil
}

View File

@@ -53,8 +53,12 @@ func (ms *mockSyncService) ResumeSync() {
type mockChainService struct{}
func (ms *mockChainService) IncomingBlockFeed() *event.Feed {
return &event.Feed{}
func (m *mockChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) {
return &pb.BeaconState{}, nil
}
func (m *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error {
return nil
}
func setUpGenesisStateAndBlock(beaconDB *db.BeaconDB, t *testing.T) {

View File

@@ -28,9 +28,10 @@ var (
)
type chainService interface {
IncomingBlockFeed() *event.Feed
StateInitializedFeed() *event.Feed
CanonicalBlockFeed() *event.Feed
ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error)
ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error
}
type operationService interface {
@@ -340,16 +341,40 @@ func (rs *RegularSync) receiveBlock(msg p2p.Message) {
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Received missing block parent")
delete(rs.blocksAwaitingProcessing, blockRoot)
blocksAwaitingProcessingGauge.Dec()
rs.chainService.IncomingBlockFeed().Send(block)
rs.chainService.IncomingBlockFeed().Send(childBlock)
beaconState, err = rs.chainService.ReceiveBlock(ctx, block)
if err != nil {
log.Errorf("could not process beacon block: %v", err)
return
}
if err := rs.chainService.ApplyForkChoiceRule(ctx, block, beaconState); err != nil {
log.Errorf("could not apply fork choice rule: %v", err)
return
}
beaconState, err = rs.chainService.ReceiveBlock(ctx, childBlock)
if err != nil {
log.Errorf("could not process beacon block: %v", err)
return
}
if err := rs.chainService.ApplyForkChoiceRule(ctx, childBlock, beaconState); err != nil {
log.Errorf("could not apply fork choice rule: %v", err)
return
}
log.Debug("Sent missing block parent and child to chain service for processing")
return
}
}
_, sendBlockSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlock")
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Sending newly received block to subscribers")
rs.chainService.IncomingBlockFeed().Send(block)
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Sending newly received block to chain service")
beaconState, err = rs.chainService.ReceiveBlock(ctx, block)
if err != nil {
log.Errorf("could not process beacon block: %v", err)
return
}
if err := rs.chainService.ApplyForkChoiceRule(ctx, block, beaconState); err != nil {
log.Errorf("could not apply fork choice rule: %v", err)
return
}
sentBlocks.Inc()
sendBlockSpan.End()
// We update the last observed slot to the received canonical block's slot.

View File

@@ -47,13 +47,6 @@ type mockChainService struct {
cFeed *event.Feed
}
func (ms *mockChainService) IncomingBlockFeed() *event.Feed {
if ms.bFeed == nil {
return new(event.Feed)
}
return ms.bFeed
}
func (ms *mockChainService) StateInitializedFeed() *event.Feed {
if ms.sFeed == nil {
return new(event.Feed)
@@ -68,6 +61,14 @@ func (ms *mockChainService) CanonicalBlockFeed() *event.Feed {
return ms.cFeed
}
func (ms *mockChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) {
return &pb.BeaconState{}, nil
}
func (ms *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error {
return nil
}
type mockOperationService struct{}
func (ms *mockOperationService) IncomingAttFeed() *event.Feed {
@@ -208,7 +209,7 @@ func TestProcessBlock_OK(t *testing.T) {
ss.cancel()
<-exitRoutine
testutil.AssertLogsContain(t, hook, "Sending newly received block to subscribers")
testutil.AssertLogsContain(t, hook, "Sending newly received block to chain service")
hook.Reset()
}
@@ -312,8 +313,8 @@ func TestProcessBlock_MultipleBlocksProcessedOK(t *testing.T) {
ss.blockBuf <- msg2
ss.cancel()
<-exitRoutine
testutil.AssertLogsContain(t, hook, "Sending newly received block to subscribers")
testutil.AssertLogsContain(t, hook, "Sending newly received block to subscribers")
testutil.AssertLogsContain(t, hook, "Sending newly received block to chain service")
testutil.AssertLogsContain(t, hook, "Sending newly received block to chain service")
hook.Reset()
}
@@ -415,7 +416,7 @@ func TestProcessBlock_MissingParentBlockRequestedOK(t *testing.T) {
}
// Finally, we respond with the parent block that was missing.
ss.receiveBlock(msg2)
testutil.AssertLogsContain(t, hook, "Sending newly received block to subscribers")
testutil.AssertLogsContain(t, hook, "Sending newly received block to chain service")
testutil.AssertLogsContain(t, hook, "Received missing block parent")
testutil.AssertLogsContain(t, hook, "Sent missing block parent and child to chain service for processing")
hook.Reset()