Refactor Chain Service Block Processing API (#2124)

* new block processing api

* fix tests

* lint

* spacing

* Update beacon-chain/blockchain/block_processing.go

Co-Authored-By: rauljordan <raul@prysmaticlabs.com>

* fix tests, remove enable powchain
This commit is contained in:
Raul Jordan
2019-03-31 20:44:16 -05:00
committed by GitHub
parent 6b3df2f776
commit 7489e52b99
17 changed files with 200 additions and 251 deletions

View File

@@ -25,8 +25,6 @@ go_library(
"//shared/hashutil:go_default_library",
"//shared/p2p:go_default_library",
"//shared/params:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",

View File

@@ -4,8 +4,6 @@ import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
@@ -18,22 +16,77 @@ import (
"go.opencensus.io/trace"
)
// BlockProcessor interface defines the methods in the blockchain service which
// handle new block operations.
type BlockProcessor interface {
// BlockReceiver interface defines the methods in the blockchain service which
// directly receives a new block from other services and applies the full processing pipeline.
type BlockReceiver interface {
CanonicalBlockFeed() *event.Feed
ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error)
}
// BlockProcessor defines a common interface for methods useful for directly applying state transitions
// to beacon blocks and generating a new beacon state from the Ethereum 2.0 core primitives.
type BlockProcessor interface {
VerifyBlockValidity(block *pb.BeaconBlock, beaconState *pb.BeaconState) error
ApplyBlockStateTransition(ctx context.Context, block *pb.BeaconBlock, beaconState *pb.BeaconState) (*pb.BeaconState, error)
}
// ReceiveBlock is a function that defines the operations that are preformed on
// any block that is received from p2p layer or rpc. It checks the block to see
// if it passes the pre-processing conditions, if it does then the per slot
// state transition function is carried out on the block.
// spec:
// def process_block(block):
// if not block_pre_processing_conditions(block):
// return nil, error
// any block that is received from p2p layer or rpc. It performs the following actions: It checks the block to see
// 1. Verify a block passes pre-processing conditions
// 2. Save and broadcast the block via p2p to other peers
// 3. Apply the block state transition function and account for skip slots.
// 4. Process and cleanup any block operations, such as attestations and deposits, which would need to be
// either included or flushed from the beacon node's runtime.
func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlock")
defer span.End()
beaconState, err := c.beaconDB.State(ctx)
if err != nil {
return nil, fmt.Errorf("could not retrieve beacon state: %v", err)
}
// We first verify the block's basic validity conditions.
if err := c.VerifyBlockValidity(block, beaconState); err != nil {
return nil, fmt.Errorf("block with slot %d is not ready for processing: %v", block.Slot, err)
}
// We save the block to the DB and broadcast it to our peers.
if err := c.SaveAndBroadcastBlock(ctx, block); err != nil {
return nil, fmt.Errorf(
"could not save and broadcast beacon block with slot %d: %v",
block.Slot-params.BeaconConfig().GenesisSlot, err,
)
}
log.WithField("slotNumber", block.Slot-params.BeaconConfig().GenesisSlot).Info(
"Executing state transition")
// We then apply the block state transition accordingly to obtain the resulting beacon state.
beaconState, err = c.ApplyBlockStateTransition(ctx, block, beaconState)
if err != nil {
return nil, fmt.Errorf("could not apply block state transition: %v", err)
}
log.WithFields(logrus.Fields{
"slotNumber": block.Slot - params.BeaconConfig().GenesisSlot,
"justifiedEpoch": beaconState.JustifiedEpoch - params.BeaconConfig().GenesisEpoch,
"finalizedEpoch": beaconState.FinalizedEpoch - params.BeaconConfig().GenesisEpoch,
}).Info("State transition complete")
// We process the block's contained deposits, attestations, and other operations
// and that may need to be stored or deleted from the beacon node's persistent storage.
if err := c.CleanupBlockOperations(ctx, block); err != nil {
return nil, fmt.Errorf("could not process block deposits, attestations, and other operations: %v", err)
}
log.WithField("slot", block.Slot-params.BeaconConfig().GenesisSlot).Info("Processed beacon block")
return beaconState, nil
}
// ApplyBlockStateTransition runs the Ethereum 2.0 state transition function
// to produce a new beacon state and also accounts for skip slots occurring.
//
// def apply_block_state_transition(block):
// # process skipped slots
// while (state.slot < block.slot - 1):
// state = slot_state_transition(state, block=None)
@@ -47,48 +100,15 @@ type BlockProcessor interface {
// else:
// return nil, error # or throw or whatever
//
func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlock")
defer span.End()
beaconState, err := c.beaconDB.State(ctx)
if err != nil {
return nil, fmt.Errorf("could not retrieve beacon state: %v", err)
}
blockRoot, err := hashutil.HashBeaconBlock(block)
if err != nil {
return nil, fmt.Errorf("could not tree hash incoming block: %v", err)
}
if block.Slot == params.BeaconConfig().GenesisSlot {
return nil, fmt.Errorf("cannot process a genesis block: received block with slot %d",
block.Slot-params.BeaconConfig().GenesisSlot)
}
// Save blocks with higher slot numbers in cache.
if err := c.isBlockReadyForProcessing(block, beaconState); err != nil {
return nil, fmt.Errorf("block with root %#x is not ready for processing: %v", blockRoot, err)
}
// if there exists a block for the slot being processed.
if err := c.beaconDB.SaveBlock(block); err != nil {
return nil, fmt.Errorf("failed to save block: %v", err)
}
// Announce the new block to the network.
c.p2p.Broadcast(ctx, &pb.BeaconBlockAnnounce{
Hash: blockRoot[:],
SlotNumber: block.Slot,
})
func (c *ChainService) ApplyBlockStateTransition(
ctx context.Context, block *pb.BeaconBlock, beaconState *pb.BeaconState,
) (*pb.BeaconState, error) {
// Retrieve the last processed beacon block's hash root.
headRoot, err := c.ChainHeadRoot()
if err != nil {
return nil, fmt.Errorf("could not retrieve chain head root: %v", err)
}
log.WithField("slotNumber", block.Slot-params.BeaconConfig().GenesisSlot).Info(
"Executing state transition")
// Check for skipped slots.
numSkippedSlots := 0
for beaconState.Slot < block.Slot-1 {
@@ -106,13 +126,52 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock)
if err != nil {
return nil, fmt.Errorf("could not execute state transition with block %v", err)
}
return beaconState, nil
}
log.WithFields(logrus.Fields{
"slotNumber": block.Slot - params.BeaconConfig().GenesisSlot,
"justifiedEpoch": beaconState.JustifiedEpoch - params.BeaconConfig().GenesisEpoch,
"finalizedEpoch": beaconState.FinalizedEpoch - params.BeaconConfig().GenesisEpoch,
}).Info("State transition complete")
// VerifyBlockValidity cross-checks the block against the pre-processing conditions from
// Ethereum 2.0, namely:
// The parent block with root block.parent_root has been processed and accepted.
// The node has processed its state up to slot, block.slot - 1.
// The Ethereum 1.0 block pointed to by the state.processed_pow_receipt_root has been processed and accepted.
// The node's local clock time is greater than or equal to state.genesis_time + block.slot * SECONDS_PER_SLOT.
func (c *ChainService) VerifyBlockValidity(block *pb.BeaconBlock, beaconState *pb.BeaconState) error {
if block.Slot == params.BeaconConfig().GenesisSlot {
return fmt.Errorf("cannot process a genesis block: received block with slot %d",
block.Slot-params.BeaconConfig().GenesisSlot)
}
powBlockFetcher := c.web3Service.Client().BlockByHash
if err := b.IsValidBlock(c.ctx, beaconState, block,
c.beaconDB.HasBlock, powBlockFetcher, c.genesisTime); err != nil {
return fmt.Errorf("block does not fulfill pre-processing conditions %v", err)
}
return nil
}
// SaveAndBroadcastBlock stores the block in persistent storage and then broadcasts it to
// peers via p2p. Blocks which have already been saved are not processed again via p2p, which is why
// the order of operations is important in this function to prevent infinite p2p loops.
func (c *ChainService) SaveAndBroadcastBlock(ctx context.Context, block *pb.BeaconBlock) error {
blockRoot, err := hashutil.HashBeaconBlock(block)
if err != nil {
return fmt.Errorf("could not tree hash incoming block: %v", err)
}
if err := c.beaconDB.SaveBlock(block); err != nil {
return fmt.Errorf("failed to save block: %v", err)
}
// Announce the new block to the network.
c.p2p.Broadcast(ctx, &pb.BeaconBlockAnnounce{
Hash: blockRoot[:],
SlotNumber: block.Slot,
})
return nil
}
// CleanupBlockOperations processes and cleans up any block operations relevant to the beacon node
// such as attestations, exits, and deposits. We update the latest seen attestation by validator
// in the local node's runtime, cleanup and remove pending deposits which have been included in the block
// from our node's local cache, and process validator exits and more.
func (c *ChainService) CleanupBlockOperations(ctx context.Context, block *pb.BeaconBlock) error {
// Forward processed block to operation pool to remove individual operation from DB.
if c.opsPoolService.IncomingProcessedBlockFeed().Send(block) == 0 {
log.Error("Sent processed block to no subscribers")
@@ -122,7 +181,7 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock)
log.Info("Updating latest attestation target")
for _, att := range block.Body.Attestations {
if err := c.attsService.UpdateLatestAttestation(c.ctx, att); err != nil {
return nil, fmt.Errorf("failed to update latest attestation for store: %v", err)
return fmt.Errorf("failed to update latest attestation for store: %v", err)
}
log.WithFields(
logrus.Fields{
@@ -136,23 +195,12 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock)
for _, dep := range block.Body.Deposits {
c.beaconDB.RemovePendingDeposit(ctx, dep)
}
log.WithField("hash", fmt.Sprintf("%#x", blockRoot)).Info("Processed beacon block")
return beaconState, nil
}
func (c *ChainService) isBlockReadyForProcessing(block *pb.BeaconBlock, beaconState *pb.BeaconState) error {
var powBlockFetcher func(ctx context.Context, hash common.Hash) (*gethTypes.Block, error)
if c.enablePOWChain {
powBlockFetcher = c.web3Service.Client().BlockByHash
}
if err := b.IsValidBlock(c.ctx, beaconState, block, c.enablePOWChain,
c.beaconDB.HasBlock, powBlockFetcher, c.genesisTime); err != nil {
return fmt.Errorf("block does not fulfill pre-processing conditions %v", err)
}
return nil
}
// runStateTransition executes the Ethereum 2.0 core state transition for the beacon chain and
// updates important checkpoints and local persistent data during epoch transitions. It serves as a wrapper
// around the more low-level, core state transition function primitive.
func (c *ChainService) runStateTransition(
headRoot [32]byte, block *pb.BeaconBlock, beaconState *pb.BeaconState,
) (*pb.BeaconState, error) {
@@ -193,7 +241,7 @@ func (c *ChainService) runStateTransition(
return nil, fmt.Errorf("could not update FFG checkpts: %v", err)
}
// Save Historical States.
if err := c.SaveHistoricalState(beaconState); err != nil {
if err := c.beaconDB.SaveHistoricalState(beaconState); err != nil {
return nil, fmt.Errorf("could not save historical state: %v", err)
}
log.WithField(
@@ -203,12 +251,6 @@ func (c *ChainService) runStateTransition(
return beaconState, nil
}
// SaveHistoricalState saves the state at each epoch transition so that it can be used
// by the state generator to regenerate state.
func (c *ChainService) SaveHistoricalState(beaconState *pb.BeaconState) error {
return c.beaconDB.SaveHistoricalState(beaconState)
}
// saveValidatorIdx saves the validators public key to index mapping in DB, these
// validators were activated from current epoch. After it saves, current epoch key
// is deleted from ActivatedValidators mapping.

View File

@@ -27,7 +27,7 @@ var _ = BlockProcessor(&ChainService{})
func TestReceiveBlock_FaultyPOWChain(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, true, db, true, nil)
chainService := setupBeaconChain(t, db, nil)
unixTime := uint64(time.Now().Unix())
deposits, _ := setupInitialDeposits(t, 100)
if err := db.InitializeState(unixTime, deposits, &pb.Eth1Data{}); err != nil {
@@ -72,7 +72,7 @@ func TestReceiveBlock_ProcessCorrectly(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
chainService := setupBeaconChain(t, db, nil)
deposits, privKeys := setupInitialDeposits(t, 100)
eth1Data := &pb.Eth1Data{
DepositRootHash32: []byte{},
@@ -131,7 +131,7 @@ func TestReceiveBlock_RemovesPendingDeposits(t *testing.T) {
attsService := attestation.NewAttestationService(
context.Background(),
&attestation.Config{BeaconDB: db})
chainService := setupBeaconChain(t, false, db, true, attsService)
chainService := setupBeaconChain(t, db, attsService)
deposits, privKeys := setupInitialDeposits(t, 100)
eth1Data := &pb.Eth1Data{
DepositRootHash32: []byte{},
@@ -244,7 +244,7 @@ func TestIsBlockReadyForProcessing_ValidBlock(t *testing.T) {
defer internal.TeardownDB(t, db)
ctx := context.Background()
chainService := setupBeaconChain(t, false, db, true, nil)
chainService := setupBeaconChain(t, db, nil)
unixTime := uint64(time.Now().Unix())
deposits, privKeys := setupInitialDeposits(t, 100)
if err := db.InitializeState(unixTime, deposits, &pb.Eth1Data{}); err != nil {
@@ -258,7 +258,7 @@ func TestIsBlockReadyForProcessing_ValidBlock(t *testing.T) {
ParentRootHash32: []byte{'a'},
}
if err := chainService.isBlockReadyForProcessing(block, beaconState); err == nil {
if err := chainService.VerifyBlockValidity(block, beaconState); err == nil {
t.Fatal("block processing succeeded despite block having no parent saved")
}
@@ -308,9 +308,7 @@ func TestIsBlockReadyForProcessing_ValidBlock(t *testing.T) {
},
}
chainService.enablePOWChain = true
if err := chainService.isBlockReadyForProcessing(block2, beaconState); err != nil {
if err := chainService.VerifyBlockValidity(block2, beaconState); err != nil {
t.Fatalf("block processing failed despite being a valid block: %v", err)
}
}
@@ -333,7 +331,7 @@ func TestDeleteValidatorIdx_DeleteWorks(t *testing.T) {
ValidatorRegistry: validators,
Slot: epoch * params.BeaconConfig().SlotsPerEpoch,
}
chainService := setupBeaconChain(t, false, db, true, nil)
chainService := setupBeaconChain(t, db, nil)
if err := chainService.saveValidatorIdx(state); err != nil {
t.Fatalf("Could not save validator idx: %v", err)
}
@@ -375,7 +373,7 @@ func TestSaveValidatorIdx_SaveRetrieveWorks(t *testing.T) {
ValidatorRegistry: validators,
Slot: epoch * params.BeaconConfig().SlotsPerEpoch,
}
chainService := setupBeaconChain(t, false, db, true, nil)
chainService := setupBeaconChain(t, db, nil)
if err := chainService.saveValidatorIdx(state); err != nil {
t.Fatalf("Could not save validator idx: %v", err)
}

View File

@@ -29,51 +29,6 @@ import (
// Ensure ChainService implements interfaces.
var _ = ForkChoice(&ChainService{})
// Generates an initial genesis block and state using a custom number of initial
// deposits as a helper function for LMD Ghost fork-choice testing.
func generateTestGenesisStateAndBlock(
t testing.TB,
numDeposits uint64,
beaconDB *db.BeaconDB,
) (*pb.BeaconState, *pb.BeaconBlock, [32]byte, [32]byte) {
deposits := make([]*pb.Deposit, numDeposits)
for i := 0; i < len(deposits); i++ {
pubkey := []byte{byte(i)}
depositInput := &pb.DepositInput{
Pubkey: pubkey,
}
balance := params.BeaconConfig().MaxDepositAmount
depositData, err := helpers.EncodeDepositData(depositInput, balance, time.Now().Unix())
if err != nil {
t.Fatalf("Could not encode deposit: %v", err)
}
deposits[i] = &pb.Deposit{DepositData: depositData}
}
genesisTime := uint64(time.Unix(0, 0).Unix())
beaconState, err := state.GenesisBeaconState(deposits, genesisTime, nil)
if err != nil {
t.Fatal(err)
}
if err := beaconDB.SaveState(beaconState); err != nil {
t.Fatal(err)
}
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatal(err)
}
genesisBlock := b.NewGenesisBlock(stateRoot[:])
if err := beaconDB.SaveBlock(genesisBlock); err != nil {
t.Fatal(err)
}
genesisRoot, err := hashutil.HashBeaconBlock(genesisBlock)
if err != nil {
t.Fatal(err)
}
return beaconState, genesisBlock, stateRoot, genesisRoot
}
func TestApplyForkChoice_SetsCanonicalHead(t *testing.T) {
deposits, _ := setupInitialDeposits(t, 5)
beaconState, err := state.GenesisBeaconState(deposits, 0, nil)
@@ -127,7 +82,7 @@ func TestApplyForkChoice_SetsCanonicalHead(t *testing.T) {
context.Background(),
&attestation.Config{BeaconDB: db})
chainService := setupBeaconChain(t, false, db, true, attsService)
chainService := setupBeaconChain(t, db, attsService)
if err := chainService.beaconDB.SaveBlock(
genesis); err != nil {
t.Fatal(err)
@@ -270,7 +225,7 @@ func TestAttestationTargets_RetrieveWorks(t *testing.T) {
pubKey48 := bytesutil.ToBytes48(pubKey)
attsService.InsertAttestationIntoStore(pubKey48, att)
chainService := setupBeaconChain(t, false, beaconDB, true, attsService)
chainService := setupBeaconChain(t, beaconDB, attsService)
attestationTargets, err := chainService.attestationTargets(state)
if err != nil {
t.Fatalf("Could not get attestation targets: %v", err)
@@ -284,7 +239,7 @@ func TestBlockChildren_2InARow(t *testing.T) {
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
chainService := setupBeaconChain(t, false, beaconDB, true, nil)
chainService := setupBeaconChain(t, beaconDB, nil)
state := &pb.BeaconState{
Slot: 3,
@@ -349,7 +304,7 @@ func TestBlockChildren_ChainSplits(t *testing.T) {
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
chainService := setupBeaconChain(t, false, beaconDB, true, nil)
chainService := setupBeaconChain(t, beaconDB, nil)
state := &pb.BeaconState{
Slot: 10,
@@ -423,7 +378,7 @@ func TestBlockChildren_SkipSlots(t *testing.T) {
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
chainService := setupBeaconChain(t, false, beaconDB, true, nil)
chainService := setupBeaconChain(t, beaconDB, nil)
state := &pb.BeaconState{
Slot: 10,
@@ -494,7 +449,7 @@ func TestLMDGhost_TrivialHeadUpdate(t *testing.T) {
ValidatorRegistry: []*pb.Validator{{}},
}
chainService := setupBeaconChain(t, false, beaconDB, true, nil)
chainService := setupBeaconChain(t, beaconDB, nil)
// Construct the following chain:
// B1 - B2 (State is slot 2)
@@ -552,7 +507,7 @@ func TestLMDGhost_3WayChainSplitsSameHeight(t *testing.T) {
ValidatorRegistry: []*pb.Validator{{}, {}, {}, {}},
}
chainService := setupBeaconChain(t, false, beaconDB, true, nil)
chainService := setupBeaconChain(t, beaconDB, nil)
// Construct the following chain:
// /- B2
@@ -636,7 +591,7 @@ func TestLMDGhost_2WayChainSplitsDiffHeight(t *testing.T) {
ValidatorRegistry: []*pb.Validator{{}, {}, {}, {}},
}
chainService := setupBeaconChain(t, false, beaconDB, true, nil)
chainService := setupBeaconChain(t, beaconDB, nil)
// Construct the following chain:
// /- B2 - B4 - B6
@@ -1047,7 +1002,6 @@ func setupBeaconChainBenchmark(b *testing.B, faultyPoWClient bool, beaconDB *db.
BeaconDB: beaconDB,
Web3Service: web3Service,
OpsPoolService: &mockOperationService{},
EnablePOWChain: enablePOWChain,
AttsService: attsService,
}
if err != nil {
@@ -1065,7 +1019,7 @@ func TestUpdateFFGCheckPts_NewJustifiedSlot(t *testing.T) {
genesisSlot := params.BeaconConfig().GenesisSlot
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainSvc := setupBeaconChain(t, false, db, true, nil)
chainSvc := setupBeaconChain(t, db, nil)
gBlockRoot, gBlock, gState, privKeys := setupFFGTest(t)
if err := chainSvc.beaconDB.SaveBlock(gBlock); err != nil {
t.Fatal(err)
@@ -1139,7 +1093,7 @@ func TestUpdateFFGCheckPts_NewFinalizedSlot(t *testing.T) {
genesisSlot := params.BeaconConfig().GenesisSlot
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainSvc := setupBeaconChain(t, false, db, true, nil)
chainSvc := setupBeaconChain(t, db, nil)
gBlockRoot, gBlock, gState, privKeys := setupFFGTest(t)
if err := chainSvc.beaconDB.SaveBlock(gBlock); err != nil {
@@ -1221,7 +1175,7 @@ func TestUpdateFFGCheckPts_NewJustifiedSkipSlot(t *testing.T) {
genesisSlot := params.BeaconConfig().GenesisSlot
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainSvc := setupBeaconChain(t, false, db, true, nil)
chainSvc := setupBeaconChain(t, db, nil)
gBlockRoot, gBlock, gState, privKeys := setupFFGTest(t)
if err := chainSvc.beaconDB.SaveBlock(gBlock); err != nil {
t.Fatal(err)

View File

@@ -42,7 +42,6 @@ type ChainService struct {
canonicalBlockChan chan *pb.BeaconBlock
canonicalBlockFeed *event.Feed
genesisTime time.Time
enablePOWChain bool
finalizedEpoch uint64
stateInitializedFeed *event.Feed
p2p p2p.Broadcaster
@@ -56,7 +55,6 @@ type Config struct {
BeaconDB *db.BeaconDB
OpsPoolService operations.OperationFeeds
DevMode bool
EnablePOWChain bool
P2p p2p.Broadcaster
}
@@ -75,7 +73,6 @@ func NewChainService(ctx context.Context, cfg *Config) (*ChainService, error) {
canonicalBlockChan: make(chan *pb.BeaconBlock, params.BeaconConfig().DefaultBufferSize),
chainStartChan: make(chan time.Time),
stateInitializedFeed: new(event.Feed),
enablePOWChain: cfg.EnablePOWChain,
p2p: cfg.P2p,
}, nil
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/forkutil"
@@ -215,32 +214,19 @@ func setupGenesisBlock(t *testing.T, cs *ChainService, beaconState *pb.BeaconSta
return parentHash, genesis
}
func setupBeaconChain(t *testing.T, faultyPoWClient bool, beaconDB *db.BeaconDB, enablePOWChain bool, attsService *attestation.Service) *ChainService {
func setupBeaconChain(t *testing.T, beaconDB *db.BeaconDB, attsService *attestation.Service) *ChainService {
endpoint := "ws://127.0.0.1"
ctx := context.Background()
var web3Service *powchain.Web3Service
var err error
if enablePOWChain {
if faultyPoWClient {
client := &faultyClient{}
web3Service, err = powchain.NewWeb3Service(ctx, &powchain.Web3ServiceConfig{
Endpoint: endpoint,
DepositContract: common.Address{},
Reader: client,
Client: client,
Logger: client,
})
} else {
client := &mockClient{}
web3Service, err = powchain.NewWeb3Service(ctx, &powchain.Web3ServiceConfig{
Endpoint: endpoint,
DepositContract: common.Address{},
Reader: client,
Client: client,
Logger: client,
})
}
}
client := &mockClient{}
web3Service, err = powchain.NewWeb3Service(ctx, &powchain.Web3ServiceConfig{
Endpoint: endpoint,
DepositContract: common.Address{},
Reader: client,
Client: client,
Logger: client,
})
if err != nil {
t.Fatalf("unable to set up web3 service: %v", err)
}
@@ -250,7 +236,6 @@ func setupBeaconChain(t *testing.T, faultyPoWClient bool, beaconDB *db.BeaconDB,
BeaconDB: beaconDB,
Web3Service: web3Service,
OpsPoolService: &mockOperationService{},
EnablePOWChain: enablePOWChain,
AttsService: attsService,
P2p: &mockBroadcaster{},
}
@@ -279,7 +264,7 @@ func TestChainStartStop_Uninitialized(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, true, nil)
chainService := setupBeaconChain(t, db, nil)
// Test the start function.
genesisChan := make(chan time.Time, 0)
@@ -314,32 +299,13 @@ func TestChainStartStop_Uninitialized(t *testing.T) {
testutil.AssertLogsContain(t, hook, "ChainStart time reached, starting the beacon chain!")
}
func TestChainStartStop_UninitializedAndNoPOWChain(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
chainService := setupBeaconChain(t, false, db, false, nil)
origExitFunc := logrus.StandardLogger().ExitFunc
defer func() { logrus.StandardLogger().ExitFunc = origExitFunc }()
fatal := false
logrus.StandardLogger().ExitFunc = func(int) { fatal = true }
// Test the start function.
chainService.Start()
if !fatal {
t.Fatalf("Not exists fatal for init BeaconChain without POW chain")
}
testutil.AssertLogsContain(t, hook, "Not configured web3Service for POW chain")
}
func TestChainStartStop_Initialized(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
ctx := context.Background()
chainService := setupBeaconChain(t, false, db, true, nil)
chainService := setupBeaconChain(t, db, nil)
unixTime := uint64(time.Now().Unix())
deposits, _ := setupInitialDeposits(t, 100)
@@ -364,34 +330,3 @@ func TestChainStartStop_Initialized(t *testing.T) {
}
testutil.AssertLogsContain(t, hook, "Beacon chain data already exists, starting service")
}
func TestPOWBlockExists_UsingDepositRootHash(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
ctx := context.Background()
chainService := setupBeaconChain(t, true, db, true, nil)
unixTime := uint64(time.Now().Unix())
deposits, _ := setupInitialDeposits(t, 10)
eth1Data := &pb.Eth1Data{
DepositRootHash32: []byte{},
BlockHash32: []byte{},
}
if err := db.InitializeState(unixTime, deposits, eth1Data); err != nil {
t.Fatalf("Could not initialize beacon state to disk: %v", err)
}
beaconState, err := chainService.beaconDB.State(ctx)
if err != nil {
t.Fatalf("Unable to retrieve beacon state %v", err)
}
// Using a faulty client should throw error.
powHash := bytesutil.ToBytes32(beaconState.LatestEth1Data.DepositRootHash32)
exists := chainService.doesPoWBlockExist(powHash)
if exists {
t.Error("Block corresponding to nil powchain reference should not exist")
}
testutil.AssertLogsContain(t, hook, "fetching PoW block corresponding to mainchain reference failed")
}

View File

@@ -54,8 +54,7 @@ func NewSimulatedBackend() (*SimulatedBackend, error) {
return nil, fmt.Errorf("could not setup simulated backend db: %v", err)
}
cs, err := blockchain.NewChainService(context.Background(), &blockchain.Config{
BeaconDB: db,
EnablePOWChain: false,
BeaconDB: db,
})
if err != nil {
return nil, err

View File

@@ -29,7 +29,6 @@ func IsValidBlock(
ctx context.Context,
state *pb.BeaconState,
block *pb.BeaconBlock,
enablePOWChain bool,
HasBlock func(hash [32]byte) bool,
GetPOWBlock func(ctx context.Context, hash common.Hash) (*gethTypes.Block, error),
genesisTime time.Time) error {
@@ -42,19 +41,17 @@ func IsValidBlock(
return fmt.Errorf("unprocessed parent block as it is not saved in the db: %#x", parentRoot)
}
if enablePOWChain {
h := common.BytesToHash(state.LatestEth1Data.BlockHash32)
powBlock, err := GetPOWBlock(ctx, h)
if err != nil {
return fmt.Errorf("unable to retrieve POW chain reference block: %v", err)
}
h := common.BytesToHash(state.LatestEth1Data.BlockHash32)
powBlock, err := GetPOWBlock(ctx, h)
if err != nil {
return fmt.Errorf("unable to retrieve POW chain reference block: %v", err)
}
// Pre-Processing Condition 2:
// The block pointed to by the state in state.processed_pow_receipt_root has
// been processed in the ETH 1.0 chain.
if powBlock == nil {
return fmt.Errorf("proof-of-Work chain reference in state does not exist: %#x", state.LatestEth1Data.BlockHash32)
}
// Pre-Processing Condition 2:
// The block pointed to by the state in state.processed_pow_receipt_root has
// been processed in the ETH 1.0 chain.
if powBlock == nil {
return fmt.Errorf("proof-of-Work chain reference in state does not exist: %#x", state.LatestEth1Data.BlockHash32)
}
// Pre-Processing Condition 4:

View File

@@ -53,7 +53,7 @@ func TestIsValidBlock_NoParent(t *testing.T) {
db.hasBlock = false
if err := IsValidBlock(ctx, beaconState, block, true,
if err := IsValidBlock(ctx, beaconState, block,
db.HasBlock, powClient.BlockByHash, genesisTime); err == nil {
t.Fatal("block is valid despite not having a parent")
}
@@ -82,7 +82,7 @@ func TestIsValidBlock_InvalidSlot(t *testing.T) {
DepositRootHash32: []byte{2},
BlockHash32: []byte{3},
}
if err := IsValidBlock(ctx, beaconState, block, true,
if err := IsValidBlock(ctx, beaconState, block,
db.HasBlock, powClient.BlockByHash, genesisTime); err == nil {
t.Fatalf("block is valid despite having an invalid slot %d", block.Slot)
}
@@ -112,7 +112,7 @@ func TestIsValidBlock_InvalidPoWReference(t *testing.T) {
BlockHash32: []byte{3},
}
if err := IsValidBlock(ctx, beaconState, block, true,
if err := IsValidBlock(ctx, beaconState, block,
db.HasBlock, powClient.BlockByHash, genesisTime); err == nil {
t.Fatalf("block is valid despite having an invalid pow reference block")
}
@@ -142,7 +142,7 @@ func TestIsValidBlock_InvalidGenesis(t *testing.T) {
invalidTime := time.Now().AddDate(1, 2, 3)
if err := IsValidBlock(ctx, beaconState, block, true,
if err := IsValidBlock(ctx, beaconState, block,
db.HasBlock, powClient.BlockByHash, genesisTime); err == nil {
t.Fatalf("block is valid despite having an invalid genesis time %v", invalidTime)
}
@@ -171,7 +171,7 @@ func TestIsValidBlock_GoodBlock(t *testing.T) {
Slot: params.BeaconConfig().GenesisSlot + 4,
}
if err := IsValidBlock(ctx, beaconState, block, true,
if err := IsValidBlock(ctx, beaconState, block,
db.HasBlock, powClient.BlockByHash, genesisTime); err != nil {
t.Fatal(err)
}

View File

@@ -12,6 +12,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",

View File

@@ -65,7 +65,7 @@ func (ps *ProposerServer) ProposeBlock(ctx context.Context, blk *pbp2p.BeaconBlo
}
log.WithField("headRoot", fmt.Sprintf("0x%x", h)).Info("Chain head block and state updated")
if err := ps.chainService.SaveHistoricalState(beaconState); err != nil {
if err := ps.beaconDB.SaveHistoricalState(beaconState); err != nil {
log.Errorf("Could not save new historical state: %v", err)
}
return &pb.ProposeResponse{BlockRootHash32: h[:]}, nil

View File

@@ -13,6 +13,7 @@ import (
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
@@ -33,10 +34,8 @@ func init() {
}
type chainService interface {
CanonicalBlockFeed() *event.Feed
StateInitializedFeed() *event.Feed
ReceiveBlock(ctx context.Context, block *pbp2p.BeaconBlock) (*pbp2p.BeaconState, error)
SaveHistoricalState(beaconState *pbp2p.BeaconState) error
blockchain.BlockReceiver
}
type operationService interface {

View File

@@ -9,6 +9,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/db:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",

View File

@@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/gogo/protobuf/proto"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -73,8 +74,8 @@ type p2pAPI interface {
}
type chainService interface {
ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error)
ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error
blockchain.ForkChoice
blockchain.BlockProcessor
}
type powChainService interface {
@@ -564,7 +565,17 @@ func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.Be
}
// Send block to main chain service to be processed.
beaconState, err := s.chainService.ReceiveBlock(ctx, block)
beaconState, err := s.db.State(ctx)
if err != nil {
return fmt.Errorf("could not fetch state: %v", err)
}
if err := s.chainService.VerifyBlockValidity(block, beaconState); err != nil {
return fmt.Errorf("block not valid: %v", err)
}
if err := s.db.SaveBlock(block); err != nil {
return fmt.Errorf("could not save block: %v", err)
}
beaconState, err = s.chainService.ApplyBlockStateTransition(ctx, block, beaconState)
if err != nil {
return fmt.Errorf("could not process beacon block: %v", err)
}

View File

@@ -53,10 +53,18 @@ func (ms *mockSyncService) ResumeSync() {
type mockChainService struct{}
func (m *mockChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) {
func (m *mockChainService) ApplyBlockStateTransition(
ctx context.Context, block *pb.BeaconBlock, beaconState *pb.BeaconState,
) (*pb.BeaconState, error) {
return &pb.BeaconState{}, nil
}
func (m *mockChainService) VerifyBlockValidity(
block *pb.BeaconBlock, beaconState *pb.BeaconState,
) error {
return nil
}
func (m *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error {
return nil
}

View File

@@ -34,6 +34,7 @@ var (
)
type chainService interface {
blockchain.BlockReceiver
blockchain.BlockProcessor
blockchain.ForkChoice
blockchain.ChainFeeds

View File

@@ -72,6 +72,14 @@ func (ms *mockChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBl
return &pb.BeaconState{}, nil
}
func (ms *mockChainService) ApplyBlockStateTransition(ctx context.Context, block *pb.BeaconBlock, beaconState *pb.BeaconState) (*pb.BeaconState, error) {
return &pb.BeaconState{}, nil
}
func (ms *mockChainService) VerifyBlockValidity(block *pb.BeaconBlock, beaconState *pb.BeaconState) error {
return nil
}
func (ms *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error {
return nil
}