Refactor Initial Sync, Enable Catching Up to Chain (#2111)

* refactor initial sync to prevent reorg infinite loops

* lint

* fixed build

* passing tests

* tests passing

* terence suggestion

* new attempt

* clean up and refactor sync service

* complete the new initial sync logic

* revert head

* init sync working

* config for blockchain receive block

* all works

* builds

* fix a few more tests

* init sync tests pass

* revert scripts

* revert accounts changes

* lint

* lint2

* travis lint

* fix build

* fix single use argument

* any peer

* imports spacing

* imports

* ready for a rolling restart

* add todo

* fork choice in blocks when exiting sync

* readd finalized state root to requests

* successful build

* revert blockchain config

* old config reversion

* initial sync tests pass

* initial sync full test works

* lint

* use the new block processing api

* new proto defs

* init sync functions again

* remove sync polling

* tests fixed

* fixed catching up with chain

* tests pass

* spacing

* lint

* goimports

* add changes

* add lock and conditional to prevent multiple goroutines

* make reg sync synchronous

* add

* fixed the parent block issue

* fix errors in chain service

* tests pass

* check nil block

* typo

* fix nil state

* merge & conflicts

* revert synchronus reg sync

* add more spans to state db

* fix lint

* lint
Author: Raul Jordan
Date: 2019-04-03 10:13:19 -05:00
Committed by: GitHub
Parent: fa063c85ca
Commit: eb900a8193
21 changed files with 888 additions and 884 deletions

View File

@@ -28,6 +28,7 @@ type BlockReceiver interface {
type BlockProcessor interface {
VerifyBlockValidity(block *pb.BeaconBlock, beaconState *pb.BeaconState) error
ApplyBlockStateTransition(ctx context.Context, block *pb.BeaconBlock, beaconState *pb.BeaconState) (*pb.BeaconState, error)
CleanupBlockOperations(ctx context.Context, block *pb.BeaconBlock) error
}
// ReceiveBlock is a function that defines the operations that are performed on
@@ -47,12 +48,12 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock)
// We first verify the block's basic validity conditions.
if err := c.VerifyBlockValidity(block, beaconState); err != nil {
return nil, fmt.Errorf("block with slot %d is not ready for processing: %v", block.Slot, err)
return beaconState, fmt.Errorf("block with slot %d is not ready for processing: %v", block.Slot, err)
}
// We save the block to the DB and broadcast it to our peers.
if err := c.SaveAndBroadcastBlock(ctx, block); err != nil {
return nil, fmt.Errorf(
return beaconState, fmt.Errorf(
"could not save and broadcast beacon block with slot %d: %v",
block.Slot-params.BeaconConfig().GenesisSlot, err,
)
@@ -64,7 +65,7 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock)
// We then apply the block state transition accordingly to obtain the resulting beacon state.
beaconState, err = c.ApplyBlockStateTransition(ctx, block, beaconState)
if err != nil {
return nil, fmt.Errorf("could not apply block state transition: %v", err)
return beaconState, fmt.Errorf("could not apply block state transition: %v", err)
}
log.WithFields(logrus.Fields{
@@ -76,7 +77,7 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock)
// We process the block's contained deposits, attestations, and other operations
// that may need to be stored or deleted from the beacon node's persistent storage.
if err := c.CleanupBlockOperations(ctx, block); err != nil {
return nil, fmt.Errorf("could not process block deposits, attestations, and other operations: %v", err)
return beaconState, fmt.Errorf("could not process block deposits, attestations, and other operations: %v", err)
}
log.WithField("slot", block.Slot-params.BeaconConfig().GenesisSlot).Info("Processed beacon block")
@@ -106,7 +107,7 @@ func (c *ChainService) ApplyBlockStateTransition(
// Retrieve the last processed beacon block's hash root.
headRoot, err := c.ChainHeadRoot()
if err != nil {
return nil, fmt.Errorf("could not retrieve chain head root: %v", err)
return beaconState, fmt.Errorf("could not retrieve chain head root: %v", err)
}
// Check for skipped slots.
@@ -114,7 +115,7 @@ func (c *ChainService) ApplyBlockStateTransition(
for beaconState.Slot < block.Slot-1 {
beaconState, err = c.runStateTransition(headRoot, nil, beaconState)
if err != nil {
return nil, fmt.Errorf("could not execute state transition without block %v", err)
return beaconState, fmt.Errorf("could not execute state transition without block %v", err)
}
numSkippedSlots++
}
@@ -124,7 +125,7 @@ func (c *ChainService) ApplyBlockStateTransition(
beaconState, err = c.runStateTransition(headRoot, block, beaconState)
if err != nil {
return nil, fmt.Errorf("could not execute state transition with block %v", err)
return beaconState, fmt.Errorf("could not execute state transition with block %v", err)
}
return beaconState, nil
}
@@ -197,7 +198,7 @@ func (c *ChainService) CleanupBlockOperations(ctx context.Context, block *pb.Bea
func (c *ChainService) runStateTransition(
headRoot [32]byte, block *pb.BeaconBlock, beaconState *pb.BeaconState,
) (*pb.BeaconState, error) {
beaconState, err := state.ExecuteStateTransition(
newState, err := state.ExecuteStateTransition(
c.ctx,
beaconState,
block,
@@ -208,40 +209,40 @@ func (c *ChainService) runStateTransition(
},
)
if err != nil {
return nil, fmt.Errorf("could not execute state transition %v", err)
return beaconState, fmt.Errorf("could not execute state transition %v", err)
}
log.WithField(
"slotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
"slotsSinceGenesis", newState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Slot transition successfully processed")
if block != nil {
log.WithField(
"slotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
"slotsSinceGenesis", newState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Block transition successfully processed")
}
if helpers.IsEpochEnd(beaconState.Slot) {
if helpers.IsEpochEnd(newState.Slot) {
// Save activated validators of this epoch to public key -> index DB.
if err := c.saveValidatorIdx(beaconState); err != nil {
return nil, fmt.Errorf("could not save validator index: %v", err)
if err := c.saveValidatorIdx(newState); err != nil {
return newState, fmt.Errorf("could not save validator index: %v", err)
}
// Delete exited validators of this epoch to public key -> index DB.
if err := c.deleteValidatorIdx(beaconState); err != nil {
return nil, fmt.Errorf("could not delete validator index: %v", err)
if err := c.deleteValidatorIdx(newState); err != nil {
return newState, fmt.Errorf("could not delete validator index: %v", err)
}
// Update FFG checkpoints in DB.
if err := c.updateFFGCheckPts(beaconState); err != nil {
return nil, fmt.Errorf("could not update FFG checkpts: %v", err)
if err := c.updateFFGCheckPts(newState); err != nil {
return newState, fmt.Errorf("could not update FFG checkpts: %v", err)
}
// Save Historical States.
if err := c.beaconDB.SaveHistoricalState(beaconState); err != nil {
return nil, fmt.Errorf("could not save historical state: %v", err)
if err := c.beaconDB.SaveHistoricalState(newState); err != nil {
return newState, fmt.Errorf("could not save historical state: %v", err)
}
log.WithField(
"SlotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot,
"SlotsSinceGenesis", newState.Slot-params.BeaconConfig().GenesisSlot,
).Info("Epoch transition successfully processed")
}
return beaconState, nil
return newState, nil
}
// saveValidatorIdx saves the validators public key to index mapping in DB, these
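
For orientation, here is a minimal sketch of how a caller is expected to drive the split-out BlockProcessor methods after this refactor. The wrapper function, its name, and the surrounding wiring are assumptions for illustration; only the three interface methods and the state-on-error convention come from the diff above.

package example // illustrative package name, not part of this commit

import (
	"context"
	"fmt"

	"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
	pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)

// processReceivedBlock is a hypothetical helper showing the intended call
// order: verify the block, apply the state transition, then clean up the
// block's contained operations. Errors now return the current state instead
// of nil, matching the changes above.
func processReceivedBlock(
	ctx context.Context,
	processor blockchain.BlockProcessor,
	block *pb.BeaconBlock,
	beaconState *pb.BeaconState,
) (*pb.BeaconState, error) {
	if err := processor.VerifyBlockValidity(block, beaconState); err != nil {
		return beaconState, fmt.Errorf("block with slot %d is not ready for processing: %v", block.Slot, err)
	}
	newState, err := processor.ApplyBlockStateTransition(ctx, block, beaconState)
	if err != nil {
		return beaconState, fmt.Errorf("could not apply block state transition: %v", err)
	}
	if err := processor.CleanupBlockOperations(ctx, block); err != nil {
		return newState, fmt.Errorf("could not process block operations: %v", err)
	}
	return newState, nil
}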

View File

@@ -130,7 +130,7 @@ func (c *ChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.Beacon
}
justifiedHead, err := c.beaconDB.JustifiedBlock()
if err != nil {
return err
return fmt.Errorf("could not retrieve justified head: %v", err)
}
head, err := c.lmdGhost(justifiedHead, justifiedState, postState, attestationTargets)
if err != nil {

View File

@@ -148,10 +148,10 @@ func (db *BeaconDB) SaveState(ctx context.Context, beaconState *pb.BeaconState)
if prevStatePb.Slot >= beaconState.Slot {
log.WithField(
"prevStateSlot",
prevStatePb.Slot,
prevStatePb.Slot-params.BeaconConfig().GenesisSlot,
).WithField(
"newStateSlot",
beaconState.Slot,
beaconState.Slot-params.BeaconConfig().GenesisSlot,
).Warn("Current saved state has a slot number greater or equal to the state attempted to be saved")
}
}
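
The warning above now logs slots relative to genesis. A tiny illustrative helper captures that convention (it is not part of this commit and assumes the params import used in the file above); with it, the genesis block prints as slot 0.

// slotsSinceGenesis is a hypothetical helper restating the display convention
// used in these log statements: subtract the genesis slot before printing.
func slotsSinceGenesis(slot uint64) uint64 {
	return slot - params.BeaconConfig().GenesisSlot
}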

View File

@@ -3,8 +3,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"helpers.go",
"metrics.go",
"service.go",
"sync_blocks.go",
"sync_state.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync",
visibility = ["//beacon-chain:__subpackages__"],
@@ -19,7 +22,6 @@ go_library(
"//shared/params:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
@@ -33,7 +35,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/internal:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
@@ -42,6 +43,7 @@ go_test(
"//shared/p2p:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",

View File

@@ -0,0 +1,69 @@
package initialsync
import (
"context"
"errors"
"fmt"
"runtime/debug"
"github.com/gogo/protobuf/proto"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
func (s *InitialSync) checkBlockValidity(ctx context.Context, block *pb.BeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.checkBlockValidity")
defer span.End()
beaconState, err := s.db.State(ctx)
if err != nil {
return fmt.Errorf("failed to get beacon state: %v", err)
}
if block.Slot < beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch {
return errors.New(debugError + "discarding received block with a slot number smaller than the last finalized slot")
}
// The proposer's attestation is not verified, as other nodes only store blocks,
// not proposer attestations.
return nil
}
func (s *InitialSync) doesParentExist(block *pb.BeaconBlock) bool {
parentHash := bytesutil.ToBytes32(block.ParentRootHash32)
return s.db.HasBlock(parentHash)
}
// safelyHandleMessage will recover and log any panic that occurs from the
// function argument.
func safelyHandleMessage(fn func(p2p.Message), msg p2p.Message) {
defer func() {
if r := recover(); r != nil {
printedMsg := "message contains no data"
if msg.Data != nil {
printedMsg = proto.MarshalTextString(msg.Data)
}
log.WithFields(logrus.Fields{
"r": r,
"msg": printedMsg,
}).Error("Panicked when handling p2p message! Recovering...")
debug.PrintStack()
if msg.Ctx == nil {
return
}
if span := trace.FromContext(msg.Ctx); span != nil {
span.SetStatus(trace.Status{
Code: trace.StatusCodeInternal,
Message: fmt.Sprintf("Panic: %v", r),
})
}
}
}()
// Fingers crossed that it doesn't panic...
fn(msg)
}
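
A short usage sketch, mirroring how the run loop elsewhere in this package invokes it: a handler that panics on an unexpected message type is recovered and logged instead of crashing the service. The loop shown here is illustrative only.

// exampleHandlerLoop is a hypothetical loop demonstrating safelyHandleMessage;
// a type assertion that panics is recovered and logged, and the loop continues.
func exampleHandlerLoop(s *InitialSync) {
	for msg := range s.blockBuf {
		safelyHandleMessage(func(m p2p.Message) {
			data := m.Data.(*pb.BeaconBlockResponse) // panics if the payload has a different type
			s.processBlock(m.Ctx, data.Block)
		}, msg)
	}
}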

View File

@@ -15,10 +15,6 @@ var (
Name: "initsync_batched_block_req",
Help: "The number of received batch blocks responses",
})
blockReqSlot = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_block_req_by_slot",
Help: "The number of sent block requests by slot",
})
recBlock = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_received_blocks",
Help: "The number of received blocks",

View File

@@ -12,27 +12,20 @@ package initialsync
import (
"context"
"errors"
"fmt"
"math/big"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/gogo/protobuf/proto"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
var log = logrus.WithField("prefix", "initial-sync")
@@ -73,15 +66,15 @@ type p2pAPI interface {
Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription
}
type chainService interface {
blockchain.ForkChoice
blockchain.BlockProcessor
}
type powChainService interface {
BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error)
}
type chainService interface {
blockchain.BlockProcessor
blockchain.ForkChoice
}
// SyncService is the interface for the Sync service.
// InitialSync calls `Start` when initial sync completes.
type syncService interface {
@@ -92,26 +85,30 @@ type syncService interface {
// InitialSync defines the main class in this package.
// See the package comments for a general description of the service's functions.
type InitialSync struct {
ctx context.Context
cancel context.CancelFunc
p2p p2pAPI
syncService syncService
chainService chainService
db *db.BeaconDB
powchain powChainService
blockAnnounceBuf chan p2p.Message
batchedBlockBuf chan p2p.Message
blockBuf chan p2p.Message
stateBuf chan p2p.Message
currentSlot uint64
highestObservedSlot uint64
beaconStateSlot uint64
syncPollingInterval time.Duration
inMemoryBlocks map[uint64]*pb.BeaconBlock
syncedFeed *event.Feed
reqState bool
stateRootOfHighestObservedSlot [32]byte
mutex *sync.Mutex
ctx context.Context
cancel context.CancelFunc
p2p p2pAPI
syncService syncService
chainService chainService
db *db.BeaconDB
powchain powChainService
blockAnnounceBuf chan p2p.Message
batchedBlockBuf chan p2p.Message
blockBuf chan p2p.Message
stateBuf chan p2p.Message
currentSlot uint64
highestObservedSlot uint64
beaconStateSlot uint64
syncPollingInterval time.Duration
inMemoryBlocks map[uint64]*pb.BeaconBlock
syncedFeed *event.Feed
stateReceived bool
latestSyncedBlock *pb.BeaconBlock
lastRequestedSlot uint64
finalizedStateRoot [32]byte
mutex *sync.Mutex
nodeIsSynced bool
highestObservedCanonicalState *pb.BeaconState
}
// NewInitialSyncService constructs a new InitialSyncService.
@@ -127,26 +124,25 @@ func NewInitialSyncService(ctx context.Context,
batchedBlockBuf := make(chan p2p.Message, cfg.BatchedBlockBufferSize)
return &InitialSync{
ctx: ctx,
cancel: cancel,
p2p: cfg.P2P,
syncService: cfg.SyncService,
chainService: cfg.ChainService,
db: cfg.BeaconDB,
powchain: cfg.PowChain,
currentSlot: params.BeaconConfig().GenesisSlot,
highestObservedSlot: params.BeaconConfig().GenesisSlot,
beaconStateSlot: params.BeaconConfig().GenesisSlot,
blockBuf: blockBuf,
stateBuf: stateBuf,
batchedBlockBuf: batchedBlockBuf,
blockAnnounceBuf: blockAnnounceBuf,
syncPollingInterval: cfg.SyncPollingInterval,
inMemoryBlocks: map[uint64]*pb.BeaconBlock{},
syncedFeed: new(event.Feed),
reqState: false,
stateRootOfHighestObservedSlot: [32]byte{},
mutex: new(sync.Mutex),
ctx: ctx,
cancel: cancel,
p2p: cfg.P2P,
syncService: cfg.SyncService,
db: cfg.BeaconDB,
powchain: cfg.PowChain,
chainService: cfg.ChainService,
currentSlot: params.BeaconConfig().GenesisSlot,
highestObservedSlot: params.BeaconConfig().GenesisSlot,
beaconStateSlot: params.BeaconConfig().GenesisSlot,
blockBuf: blockBuf,
stateBuf: stateBuf,
batchedBlockBuf: batchedBlockBuf,
blockAnnounceBuf: blockAnnounceBuf,
syncPollingInterval: cfg.SyncPollingInterval,
inMemoryBlocks: map[uint64]*pb.BeaconBlock{},
syncedFeed: new(event.Feed),
stateReceived: false,
mutex: new(sync.Mutex),
}
}
@@ -157,19 +153,8 @@ func (s *InitialSync) Start() {
log.Errorf("Unable to get chain head %v", err)
}
s.currentSlot = cHead.Slot
var reqState bool
// setting genesis bool
if cHead.Slot == params.BeaconConfig().GenesisSlot || s.isSlotDiffLarge() {
reqState = true
}
s.reqState = reqState
go func() {
ticker := time.NewTicker(s.syncPollingInterval)
s.run(ticker.C)
ticker.Stop()
}()
go s.run()
go s.listenForNewBlocks()
go s.checkInMemoryBlocks()
}
@@ -185,98 +170,42 @@ func (s *InitialSync) InitializeObservedSlot(slot uint64) {
s.highestObservedSlot = slot
}
// InitializeStateRoot sets the state root of the highest observed slot.
func (s *InitialSync) InitializeStateRoot(root [32]byte) {
s.stateRootOfHighestObservedSlot = root
// InitializeFinalizedStateRoot sets the state root of the last finalized state.
func (s *InitialSync) InitializeFinalizedStateRoot(root [32]byte) {
s.finalizedStateRoot = root
}
// SyncedFeed returns a feed which fires a message once the node is synced
func (s *InitialSync) SyncedFeed() *event.Feed {
return s.syncedFeed
// NodeIsSynced checks that the node has been caught up with the network.
func (s *InitialSync) NodeIsSynced() (bool, uint64) {
return s.nodeIsSynced, s.currentSlot
}
// run is the main goroutine for the initial sync service.
// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout.
// It is assumed that the goroutine `run` is only called once per instance.
func (s *InitialSync) run(delayChan <-chan time.Time) {
blockSub := s.p2p.Subscribe(&pb.BeaconBlockResponse{}, s.blockBuf)
batchedBlocksub := s.p2p.Subscribe(&pb.BatchedBeaconBlockResponse{}, s.batchedBlockBuf)
blockAnnounceSub := s.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, s.blockAnnounceBuf)
beaconStateSub := s.p2p.Subscribe(&pb.BeaconStateResponse{}, s.stateBuf)
defer func() {
blockSub.Unsubscribe()
blockAnnounceSub.Unsubscribe()
beaconStateSub.Unsubscribe()
batchedBlocksub.Unsubscribe()
close(s.batchedBlockBuf)
close(s.blockBuf)
close(s.stateBuf)
}()
if s.reqState {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.AnyPeer); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
} else {
// Send out a batch request
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
func (s *InitialSync) exitInitialSync(ctx context.Context) error {
if s.nodeIsSynced {
return nil
}
state := s.highestObservedCanonicalState
var err error
if err := s.db.SaveBlock(s.latestSyncedBlock); err != nil {
return fmt.Errorf("could not save block: %v", err)
}
if err := s.db.UpdateChainHead(ctx, s.latestSyncedBlock, state); err != nil {
return fmt.Errorf("could not update chain head: %v", err)
}
if err := s.db.SaveHistoricalState(state); err != nil {
return fmt.Errorf("could not save state: %v", err)
}
for {
select {
case <-s.ctx.Done():
log.Debug("Exiting goroutine")
return
case <-delayChan:
if s.checkSyncStatus() {
return
}
case msg := <-s.blockAnnounceBuf:
safelyHandleMessage(s.processBlockAnnounce, msg)
case msg := <-s.blockBuf:
safelyHandleMessage(func(message p2p.Message) {
data := message.Data.(*pb.BeaconBlockResponse)
s.processBlock(message.Ctx, data.Block, message.Peer)
}, msg)
case msg := <-s.stateBuf:
safelyHandleMessage(s.processState, msg)
case msg := <-s.batchedBlockBuf:
safelyHandleMessage(s.processBatchedBlocks, msg)
}
canonicalState, err := s.db.State(ctx)
if err != nil {
return fmt.Errorf("could not get state: %v", err)
}
}
// safelyHandleMessage will recover and log any panic that occurs from the
// function argument.
func safelyHandleMessage(fn func(p2p.Message), msg p2p.Message) {
defer func() {
if r := recover(); r != nil {
printedMsg := "message contains no data"
if msg.Data != nil {
printedMsg = proto.MarshalTextString(msg.Data)
}
log.WithFields(logrus.Fields{
"r": r,
"msg": printedMsg,
}).Error("Panicked when handling p2p message! Recovering...")
debug.PrintStack()
if msg.Ctx == nil {
return
}
if span := trace.FromContext(msg.Ctx); span != nil {
span.SetStatus(trace.Status{
Code: trace.StatusCodeInternal,
Message: fmt.Sprintf("Panic: %v", r),
})
}
}
}()
// Fingers crossed that it doesn't panic...
fn(msg)
log.Infof("Canonical state slot: %d", canonicalState.Slot-params.BeaconConfig().GenesisSlot)
log.Info("Exiting initial sync and starting normal sync")
s.syncService.ResumeSync()
s.cancel()
s.nodeIsSynced = true
return nil
}
// checkInMemoryBlocks is another routine which will run concurrently with the
@@ -293,332 +222,65 @@ func (s *InitialSync) checkInMemoryBlocks() {
}
s.mutex.Lock()
if block, ok := s.inMemoryBlocks[s.currentSlot+1]; ok && s.currentSlot+1 <= s.highestObservedSlot {
s.processBlock(s.ctx, block, p2p.AnyPeer)
s.processBlock(s.ctx, block)
}
s.mutex.Unlock()
}
}
}
// checkSyncStatus verifies if the beacon node is correctly synced with its peers up to their
// latest canonical head. If not, then it requests batched blocks up to the highest observed slot.
func (s *InitialSync) checkSyncStatus() bool {
if s.reqState {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.AnyPeer); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
return false
}
if s.highestObservedSlot == s.currentSlot {
log.Info("Exiting initial sync and starting normal sync")
s.syncedFeed.Send(s.currentSlot)
s.syncService.ResumeSync()
return true
}
// requests multiple blocks so as to save and sync quickly.
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
return false
}
func (s *InitialSync) processBlockAnnounce(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce")
defer span.End()
data := msg.Data.(*pb.BeaconBlockAnnounce)
recBlockAnnounce.Inc()
if s.reqState {
if err := s.requestStateFromPeer(ctx, s.stateRootOfHighestObservedSlot[:], msg.Peer); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
return
}
if data.SlotNumber > s.highestObservedSlot {
s.highestObservedSlot = data.SlotNumber
}
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
log.Debugf("Successfully requested the next block with slot: %d", data.SlotNumber-params.BeaconConfig().GenesisSlot)
}
// processBlock is the main method that validates each block which is received
// for initial sync. It checks if the blocks are valid and then will continue to
// process and save it into the db.
func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, peerID peer.ID) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.processBlock")
defer span.End()
recBlock.Inc()
if block.Slot > s.highestObservedSlot {
s.highestObservedSlot = block.Slot
}
if block.Slot < s.currentSlot {
return
}
// requesting beacon state if there is no saved state.
if s.reqState {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], peerID); err != nil {
log.Errorf("Could not request beacon state from peer: %v", err)
}
return
}
// if it isn't the block in the next slot we check if it is a skipped slot.
// if it isn't skipped we save it in memory.
if block.Slot != (s.currentSlot + 1) {
// if parent exists we validate the block.
if s.doesParentExist(block) {
if err := s.validateAndSaveNextBlock(ctx, block); err != nil {
// Debug error so as not to have noisy error logs
if strings.HasPrefix(err.Error(), debugError) {
log.Debug(strings.TrimPrefix(err.Error(), debugError))
return
}
log.Errorf("Unable to save block: %v", err)
}
// listenForNewBlocks listens for block announcements beyond the canonical head slot that may
// be received during initial sync - we must process these blocks to catch up with peers.
func (s *InitialSync) listenForNewBlocks() {
blockAnnounceSub := s.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, s.blockAnnounceBuf)
defer func() {
blockAnnounceSub.Unsubscribe()
close(s.blockAnnounceBuf)
}()
for {
select {
case <-s.ctx.Done():
return
case msg := <-s.blockAnnounceBuf:
safelyHandleMessage(s.processBlockAnnounce, msg)
}
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.inMemoryBlocks[block.Slot]; !ok {
s.inMemoryBlocks[block.Slot] = block
}
return
}
}
// run is the main goroutine for the initial sync service.
// It is assumed that the goroutine `run` is only called once per instance.
func (s *InitialSync) run() {
blockSub := s.p2p.Subscribe(&pb.BeaconBlockResponse{}, s.blockBuf)
batchedBlocksub := s.p2p.Subscribe(&pb.BatchedBeaconBlockResponse{}, s.batchedBlockBuf)
beaconStateSub := s.p2p.Subscribe(&pb.BeaconStateResponse{}, s.stateBuf)
defer func() {
blockSub.Unsubscribe()
beaconStateSub.Unsubscribe()
batchedBlocksub.Unsubscribe()
close(s.batchedBlockBuf)
close(s.blockBuf)
close(s.stateBuf)
}()
if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
if err := s.validateAndSaveNextBlock(ctx, block); err != nil {
// Debug error so as not to have noisy error logs
if strings.HasPrefix(err.Error(), debugError) {
log.Debug(strings.TrimPrefix(err.Error(), debugError))
for {
select {
case <-s.ctx.Done():
log.Debug("Exiting goroutine")
return
case msg := <-s.blockBuf:
safelyHandleMessage(func(message p2p.Message) {
data := message.Data.(*pb.BeaconBlockResponse)
s.processBlock(message.Ctx, data.Block)
}, msg)
case msg := <-s.stateBuf:
safelyHandleMessage(s.processState, msg)
case msg := <-s.batchedBlockBuf:
safelyHandleMessage(s.processBatchedBlocks, msg)
}
log.Errorf("Unable to save block: %v", err)
}
}
// processBatchedBlocks processes all the received blocks from
// the p2p message.
func (s *InitialSync) processBatchedBlocks(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBatchedBlocks")
defer span.End()
batchedBlockReq.Inc()
response := msg.Data.(*pb.BatchedBeaconBlockResponse)
batchedBlocks := response.BatchedBlocks
if len(batchedBlocks) == 0 {
// Do not process empty response
return
}
log.Debug("Processing batched block response")
for _, block := range batchedBlocks {
s.processBlock(ctx, block, msg.Peer)
}
log.Debug("Finished processing batched blocks")
}
func (s *InitialSync) processState(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processState")
defer span.End()
data := msg.Data.(*pb.BeaconStateResponse)
beaconState := data.BeaconState
recState.Inc()
if s.currentSlot > beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch {
return
}
if err := s.db.SaveCurrentAndFinalizedState(ctx, beaconState); err != nil {
log.Errorf("Unable to set beacon state for initial sync %v", err)
return
}
if err := s.db.SaveFinalizedBlock(beaconState.LatestBlock); err != nil {
log.Errorf("Could not save finalized block %v", err)
return
}
if err := s.db.SaveBlock(beaconState.LatestBlock); err != nil {
log.Errorf("Could not save block %v", err)
return
}
if err := s.db.UpdateChainHead(ctx, beaconState.LatestBlock, beaconState); err != nil {
log.Errorf("Could not update chainhead %v", err)
return
}
if err := s.db.SaveJustifiedState(beaconState); err != nil {
log.Errorf("Could not set beacon state for initial sync %v", err)
return
}
if err := s.db.SaveJustifiedBlock(beaconState.LatestBlock); err != nil {
log.Errorf("Could not save finalized block %v", err)
return
}
h, err := hashutil.HashProto(beaconState)
if err != nil {
log.Error(err)
return
}
exists, blkNum, err := s.powchain.BlockExists(ctx, bytesutil.ToBytes32(beaconState.LatestEth1Data.BlockHash32))
if err != nil {
log.Errorf("Unable to get powchain block %v", err)
}
if !exists {
log.Error("Latest ETH1 block doesn't exist in the pow chain")
return
}
s.db.PrunePendingDeposits(ctx, blkNum)
if h == s.stateRootOfHighestObservedSlot {
s.reqState = false
}
// sets the current slot to the last finalized slot of the
// beacon state to begin our sync from.
s.currentSlot = beaconState.FinalizedEpoch * params.BeaconConfig().SlotsPerEpoch
s.beaconStateSlot = beaconState.Slot
log.Debugf("Successfully saved beacon state with the last finalized slot: %d", beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch-params.BeaconConfig().GenesisSlot)
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
}
// requestStateFromPeer always requests for the last finalized slot from a peer.
func (s *InitialSync) requestStateFromPeer(ctx context.Context, stateRoot []byte, peerID peer.ID) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer")
defer span.End()
stateReq.Inc()
log.Debugf("Successfully processed incoming block with state hash: %#x", stateRoot)
return s.p2p.Send(ctx, &pb.BeaconStateRequest{FinalizedStateRootHash32S: stateRoot}, peerID)
}
// requestNextBlock broadcasts a request for a block with the entered slotnumber.
func (s *InitialSync) requestNextBlockBySlot(ctx context.Context, slotNumber uint64) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestBlockBySlot")
defer span.End()
log.Debugf("Requesting block %d ", slotNumber)
blockReqSlot.Inc()
s.mutex.Lock()
defer s.mutex.Unlock()
if block, ok := s.inMemoryBlocks[slotNumber]; ok {
s.processBlock(ctx, block, p2p.AnyPeer)
return
}
s.p2p.Broadcast(ctx, &pb.BeaconBlockRequestBySlotNumber{SlotNumber: slotNumber})
}
// requestBatchedBlocks sends out a request for multiple blocks till a
// specified bound slot number.
func (s *InitialSync) requestBatchedBlocks(startSlot uint64, endSlot uint64) {
ctx, span := trace.StartSpan(context.Background(), "beacon-chain.sync.initial-sync.requestBatchedBlocks")
defer span.End()
sentBatchedBlockReq.Inc()
if startSlot > endSlot {
log.Debugf("Invalid batched request from slot %d to %d", startSlot-params.BeaconConfig().GenesisSlot, endSlot-params.BeaconConfig().GenesisSlot)
return
}
blockLimit := params.BeaconConfig().BatchBlockLimit
if startSlot+blockLimit < endSlot {
endSlot = startSlot + blockLimit
}
log.Debugf("Requesting batched blocks from slot %d to %d", startSlot-params.BeaconConfig().GenesisSlot, endSlot-params.BeaconConfig().GenesisSlot)
s.p2p.Broadcast(ctx, &pb.BatchedBeaconBlockRequest{
StartSlot: startSlot,
EndSlot: endSlot,
})
}
// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher
// routine can be added to the chain.
func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.BeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.validateAndSaveNextBlock")
defer span.End()
root, err := hashutil.HashBeaconBlock(block)
if err != nil {
return err
}
if err := s.checkBlockValidity(ctx, block); err != nil {
return err
}
log.Infof("Saved block with root %#x and slot %d for initial sync", root, block.Slot)
s.currentSlot = block.Slot
s.mutex.Lock()
defer s.mutex.Unlock()
// delete block from memory.
if _, ok := s.inMemoryBlocks[block.Slot]; ok {
delete(s.inMemoryBlocks, block.Slot)
}
// since the block will not be processed by chainservice we save
// the block and do not send it to chainservice.
if s.beaconStateSlot >= block.Slot {
if err := s.db.SaveBlock(block); err != nil {
return err
}
return nil
}
// Send block to main chain service to be processed.
beaconState, err := s.db.State(ctx)
if err != nil {
return fmt.Errorf("could not fetch state: %v", err)
}
if err := s.chainService.VerifyBlockValidity(block, beaconState); err != nil {
return fmt.Errorf("block not valid: %v", err)
}
if err := s.db.SaveBlock(block); err != nil {
return fmt.Errorf("could not save block: %v", err)
}
beaconState, err = s.chainService.ApplyBlockStateTransition(ctx, block, beaconState)
if err != nil {
return fmt.Errorf("could not process beacon block: %v", err)
}
if err := s.chainService.ApplyForkChoiceRule(ctx, block, beaconState); err != nil {
return fmt.Errorf("could not apply fork choice rule: %v", err)
}
return nil
}
func (s *InitialSync) checkBlockValidity(ctx context.Context, block *pb.BeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.checkBlockValidity")
defer span.End()
blockRoot, err := hashutil.HashBeaconBlock(block)
if err != nil {
return fmt.Errorf("could not tree hash received block: %v", err)
}
if s.db.HasBlock(blockRoot) {
return errors.New(debugError + "received a block that already exists. Exiting")
}
beaconState, err := s.db.State(ctx)
if err != nil {
return fmt.Errorf("failed to get beacon state: %v", err)
}
if block.Slot < beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch {
return errors.New(debugError + "discarding received block with a slot number smaller than the last finalized slot")
}
// Attestation from proposer not verified as, other nodes only store blocks not proposer
// attestations.
return nil
}
// isSlotDiff large checks if the difference between the current slot and highest observed
// slot isnt too large.
func (s *InitialSync) isSlotDiffLarge() bool {
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
epochLimit := params.BeaconConfig().SyncEpochLimit
return s.currentSlot+slotsPerEpoch*epochLimit < s.highestObservedSlot
}
func (s *InitialSync) doesParentExist(block *pb.BeaconBlock) bool {
parentHash := bytesutil.ToBytes32(block.ParentRootHash32)
return s.db.HasBlock(parentHash)
}
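
Taken together, the new surface of the service looks roughly like the following from a caller's point of view. The variable names for the injected dependencies and querier results are assumptions; the Config fields and InitialSync methods are the ones shown in this diff and its tests.

// Hypothetical wiring inside the beacon node's startup code, not part of this commit.
cfg := &initialsync.Config{
	P2P:          p2pService,      // assumed dependency satisfying p2pAPI
	SyncService:  regularSync,     // resumed by exitInitialSync once caught up
	ChainService: chainSvc,        // must provide BlockProcessor and ForkChoice
	BeaconDB:     beaconDB,
	PowChain:     powChainClient,
}
is := initialsync.NewInitialSyncService(ctx, cfg)

// Values learned from the sync querier's chain-head request (assumed names).
is.InitializeObservedSlot(queriedHeadSlot)
is.InitializeFinalizedStateRoot(queriedFinalizedStateRoot)

is.Start()

// Callers can later check whether initial sync has completed.
if synced, slot := is.NodeIsSynced(); synced {
	log.Infof("Caught up to slot %d", slot-params.BeaconConfig().GenesisSlot)
}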

View File

@@ -2,14 +2,14 @@ package initialsync
import (
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/gogo/protobuf/proto"
peer "github.com/libp2p/go-libp2p-peer"
b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -51,21 +51,39 @@ func (ms *mockSyncService) ResumeSync() {
}
type mockPowchain struct{}
func (mp *mockPowchain) BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error) {
return true, nil, nil
}
type mockChainService struct{}
func (m *mockChainService) ApplyBlockStateTransition(
func (ms *mockChainService) CanonicalBlockFeed() *event.Feed {
return new(event.Feed)
}
func (ms *mockChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) {
return &pb.BeaconState{}, nil
}
func (ms *mockChainService) ApplyBlockStateTransition(
ctx context.Context, block *pb.BeaconBlock, beaconState *pb.BeaconState,
) (*pb.BeaconState, error) {
return &pb.BeaconState{}, nil
}
func (m *mockChainService) VerifyBlockValidity(
func (ms *mockChainService) VerifyBlockValidity(
block *pb.BeaconBlock, beaconState *pb.BeaconState,
) error {
return nil
}
func (m *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error {
func (ms *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error {
return nil
}
func (ms *mockChainService) CleanupBlockOperations(ctx context.Context, block *pb.BeaconBlock) error {
return nil
}
@@ -97,52 +115,72 @@ func setUpGenesisStateAndBlock(beaconDB *db.BeaconDB, t *testing.T) {
func TestSavingBlock_InSync(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
setUpGenesisStateAndBlock(db, t)
cfg := &Config{
P2P: &mockP2P{},
SyncService: &mockSyncService{},
BeaconDB: db,
ChainService: &mockChainService{},
BeaconDB: db,
PowChain: &mockPowchain{},
}
ss := NewInitialSyncService(context.Background(), cfg)
ss.reqState = false
exitRoutine := make(chan bool)
delayChan := make(chan time.Time)
defer func() {
close(exitRoutine)
close(delayChan)
}()
go func() {
ss.run(delayChan)
ss.run()
ss.listenForNewBlocks()
exitRoutine <- true
}()
genericHash := make([]byte, 32)
genericHash[0] = 'a'
beaconState := &pb.BeaconState{
FinalizedEpoch: params.BeaconConfig().GenesisSlot + 1,
fState := &pb.BeaconState{
FinalizedEpoch: params.BeaconConfig().GenesisEpoch + 1,
LatestBlock: &pb.BeaconBlock{
Slot: params.BeaconConfig().GenesisSlot + params.BeaconConfig().SlotsPerEpoch,
},
LatestEth1Data: &pb.Eth1Data{
BlockHash32: []byte{},
},
}
jState := &pb.BeaconState{
JustifiedEpoch: params.BeaconConfig().GenesisEpoch + 2,
LatestBlock: &pb.BeaconBlock{
Slot: params.BeaconConfig().GenesisSlot + 2*params.BeaconConfig().SlotsPerEpoch,
},
}
stateResponse := &pb.BeaconStateResponse{
BeaconState: beaconState,
FinalizedState: fState,
JustifiedState: jState,
CanonicalState: jState,
}
incorrectState := &pb.BeaconState{
FinalizedEpoch: params.BeaconConfig().GenesisSlot,
JustifiedEpoch: params.BeaconConfig().GenesisSlot + 1,
FinalizedEpoch: params.BeaconConfig().GenesisEpoch,
JustifiedEpoch: params.BeaconConfig().GenesisEpoch + 1,
LatestBlock: &pb.BeaconBlock{
Slot: params.BeaconConfig().GenesisSlot + 4*params.BeaconConfig().SlotsPerEpoch,
},
LatestEth1Data: &pb.Eth1Data{
BlockHash32: []byte{},
},
}
incorrectStateResponse := &pb.BeaconStateResponse{
BeaconState: incorrectState,
FinalizedState: incorrectState,
JustifiedState: incorrectState,
CanonicalState: incorrectState,
}
stateRoot, err := hashutil.HashProto(beaconState)
stateRoot, err := hashutil.HashProto(fState)
if err != nil {
t.Fatalf("unable to tree hash state: %v", err)
}
@@ -187,19 +225,12 @@ func TestSavingBlock_InSync(t *testing.T) {
ss.stateBuf <- msg2
if ss.currentSlot == incorrectStateResponse.BeaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch {
t.Fatalf("Beacon state updated incorrectly: %d", ss.currentSlot)
}
msg2.Data = stateResponse
ss.stateBuf <- msg2
msg1 = getBlockResponseMsg(params.BeaconConfig().GenesisSlot + 1)
ss.blockBuf <- msg1
if params.BeaconConfig().GenesisSlot+1 != ss.currentSlot {
t.Fatalf("Slot saved when it was not supposed too: %v", stateResponse.BeaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch)
}
msg1 = getBlockResponseMsg(params.BeaconConfig().GenesisSlot + 2)
ss.blockBuf <- msg1
@@ -207,13 +238,8 @@ func TestSavingBlock_InSync(t *testing.T) {
ss.cancel()
<-exitRoutine
br := msg1.Data.(*pb.BeaconBlockResponse)
if br.Block.Slot != ss.currentSlot {
t.Fatalf("Slot not updated despite receiving a valid block: %v", ss.currentSlot)
}
hook.Reset()
internal.TeardownDB(t, db)
}
func TestProcessingBatchedBlocks_OK(t *testing.T) {
@@ -224,11 +250,10 @@ func TestProcessingBatchedBlocks_OK(t *testing.T) {
cfg := &Config{
P2P: &mockP2P{},
SyncService: &mockSyncService{},
BeaconDB: db,
ChainService: &mockChainService{},
BeaconDB: db,
}
ss := NewInitialSyncService(context.Background(), cfg)
ss.reqState = false
batchSize := 20
batchedBlocks := make([]*pb.BeaconBlock, batchSize)
@@ -250,10 +275,10 @@ func TestProcessingBatchedBlocks_OK(t *testing.T) {
ss.processBatchedBlocks(msg)
if ss.currentSlot != expectedSlot {
t.Errorf("Expected slot %d not equal to current slot %d", expectedSlot, ss.currentSlot)
t.Errorf("Expected slot %d equal to current slot %d", expectedSlot, ss.currentSlot)
}
if ss.highestObservedSlot != expectedSlot {
if ss.highestObservedSlot == expectedSlot {
t.Errorf("Expected slot %d not equal to highest observed slot slot %d", expectedSlot, ss.highestObservedSlot)
}
}
@@ -266,14 +291,17 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) {
cfg := &Config{
P2P: &mockP2P{},
SyncService: &mockSyncService{},
BeaconDB: db,
ChainService: &mockChainService{},
BeaconDB: db,
}
ss := NewInitialSyncService(context.Background(), cfg)
ss.reqState = false
batchSize := 20
expectedSlot := params.BeaconConfig().GenesisSlot + uint64(batchSize)
ss.highestObservedSlot = expectedSlot
ss.highestObservedCanonicalState = &pb.BeaconState{
Slot: expectedSlot,
}
blk, err := ss.db.BlockBySlot(params.BeaconConfig().GenesisSlot)
if err != nil {
t.Fatalf("Unable to get genesis block %v", err)
@@ -294,7 +322,7 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) {
ParentRootHash32: parentHash,
}
ss.processBlock(context.Background(), block, p2p.AnyPeer)
ss.processBlock(context.Background(), block)
// Save the block and set the parent hash of the next block
// as the hash of the current block.
@@ -307,207 +335,18 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) {
t.Fatalf("Could not hash block %v", err)
}
parentHash = hash[:]
ss.latestSyncedBlock = block
}
if ss.currentSlot != expectedSlot {
t.Errorf("Expected slot %d not equal to current slot %d", expectedSlot, ss.currentSlot)
t.Errorf("Expected slot %d equal to current slot %d", expectedSlot, ss.currentSlot)
}
if ss.highestObservedSlot != expectedSlot {
t.Errorf("Expected slot %d not equal to highest observed slot %d", expectedSlot, ss.highestObservedSlot)
t.Errorf("Expected slot %d equal to highest observed slot %d", expectedSlot, ss.highestObservedSlot)
}
}
func TestDelayChan_OK(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
setUpGenesisStateAndBlock(db, t)
cfg := &Config{
P2P: &mockP2P{},
SyncService: &mockSyncService{},
BeaconDB: db,
ChainService: &mockChainService{},
}
ss := NewInitialSyncService(context.Background(), cfg)
ss.reqState = false
exitRoutine := make(chan bool)
delayChan := make(chan time.Time)
defer func() {
close(exitRoutine)
close(delayChan)
}()
go func() {
ss.run(delayChan)
exitRoutine <- true
}()
genericHash := make([]byte, 32)
genericHash[0] = 'a'
beaconState := &pb.BeaconState{
FinalizedEpoch: params.BeaconConfig().GenesisSlot + 1,
}
stateResponse := &pb.BeaconStateResponse{
BeaconState: beaconState,
}
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("unable to tree hash state: %v", err)
}
beaconStateRootHash32 := stateRoot
block := &pb.BeaconBlock{
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte{1, 2, 3},
BlockHash32: []byte{4, 5, 6},
},
ParentRootHash32: genericHash,
Slot: params.BeaconConfig().GenesisSlot + 1,
StateRootHash32: beaconStateRootHash32[:],
}
blockResponse := &pb.BeaconBlockResponse{
Block: block,
}
msg1 := p2p.Message{
Peer: "",
Data: blockResponse,
Ctx: context.Background(),
}
msg2 := p2p.Message{
Peer: "",
Data: stateResponse,
Ctx: context.Background(),
}
ss.blockBuf <- msg1
ss.stateBuf <- msg2
blockResponse.Block.Slot = params.BeaconConfig().GenesisSlot + 1
msg1.Data = blockResponse
ss.blockBuf <- msg1
delayChan <- time.Time{}
ss.cancel()
<-exitRoutine
testutil.AssertLogsContain(t, hook, "Exiting initial sync and starting normal sync")
hook.Reset()
}
func TestRequestBlocksBySlot_OK(t *testing.T) {
hook := logTest.NewGlobal()
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
setUpGenesisStateAndBlock(db, t)
ctx := context.Background()
cfg := &Config{
P2P: &mockP2P{},
SyncService: &mockSyncService{},
ChainService: &mockChainService{},
BeaconDB: db,
BlockBufferSize: 100,
}
ss := NewInitialSyncService(context.Background(), cfg)
newState, err := state.GenesisBeaconState(nil, 0, nil)
if err != nil {
t.Fatalf("could not create new state %v", err)
}
err = ss.db.SaveState(ctx, newState)
if err != nil {
t.Fatalf("could not save beacon state %v", err)
}
ss.reqState = false
exitRoutine := make(chan bool)
delayChan := make(chan time.Time)
defer func() {
close(exitRoutine)
close(delayChan)
}()
go func() {
ss.run(delayChan)
exitRoutine <- true
}()
genericHash := make([]byte, 32)
genericHash[0] = 'a'
getBlockResponseMsg := func(Slot uint64) (p2p.Message, [32]byte) {
block := &pb.BeaconBlock{
Eth1Data: &pb.Eth1Data{
DepositRootHash32: []byte{1, 2, 3},
BlockHash32: []byte{4, 5, 6},
},
ParentRootHash32: genericHash,
Slot: Slot,
StateRootHash32: nil,
}
blockResponse := &pb.BeaconBlockResponse{
Block: block,
}
root, err := hashutil.HashBeaconBlock(block)
if err != nil {
t.Fatalf("unable to tree hash block %v", err)
}
return p2p.Message{
Peer: "",
Data: blockResponse,
Ctx: context.Background(),
}, root
}
// sending all blocks except for the genesis block
startSlot := 1 + params.BeaconConfig().GenesisSlot
for i := startSlot; i < startSlot+10; i++ {
response, _ := getBlockResponseMsg(i)
ss.blockBuf <- response
}
initialResponse, _ := getBlockResponseMsg(1 + params.BeaconConfig().GenesisSlot)
//sending genesis block
ss.blockBuf <- initialResponse
_, hash := getBlockResponseMsg(9 + params.BeaconConfig().GenesisSlot)
expString := fmt.Sprintf("Saved block with root %#x and slot %d for initial sync",
hash, 9+params.BeaconConfig().GenesisSlot)
// waiting for the current slot to come up to the
// expected one.
testutil.WaitForLog(t, hook, expString)
delayChan <- time.Time{}
ss.cancel()
<-exitRoutine
testutil.AssertLogsContain(t, hook, "Exiting initial sync and starting normal sync")
hook.Reset()
}
func TestSafelyHandleMessage(t *testing.T) {
hook := logTest.NewGlobal()

View File

@@ -0,0 +1,172 @@
package initialsync
import (
"context"
"errors"
"strings"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
func (s *InitialSync) processBlockAnnounce(msg p2p.Message) {
_, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce")
defer span.End()
data := msg.Data.(*pb.BeaconBlockAnnounce)
recBlockAnnounce.Inc()
if s.stateReceived && data.SlotNumber > s.highestObservedSlot {
s.requestBatchedBlocks(s.lastRequestedSlot, data.SlotNumber)
s.lastRequestedSlot = data.SlotNumber
}
}
// processBlock is the main method that validates each block which is received
// for initial sync. It checks if the blocks are valid and then will continue to
// process and save it into the db.
func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.processBlock")
defer span.End()
recBlock.Inc()
if block.Slot == s.highestObservedSlot {
s.currentSlot = s.highestObservedSlot
if err := s.exitInitialSync(s.ctx); err != nil {
log.Errorf("Could not exit initial sync: %v", err)
return
}
return
}
if block.Slot < s.currentSlot {
return
}
// if it isn't the block in the next slot we check if it is a skipped slot.
// if it isn't skipped we save it in memory.
if block.Slot != (s.currentSlot + 1) {
// if parent exists we validate the block.
if s.doesParentExist(block) {
if err := s.validateAndSaveNextBlock(ctx, block); err != nil {
// Debug error so as not to have noisy error logs
if strings.HasPrefix(err.Error(), debugError) {
log.Debug(strings.TrimPrefix(err.Error(), debugError))
return
}
log.Errorf("Unable to save block: %v", err)
}
return
}
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.inMemoryBlocks[block.Slot]; !ok {
s.inMemoryBlocks[block.Slot] = block
}
return
}
if err := s.validateAndSaveNextBlock(ctx, block); err != nil {
// Debug error so as not to have noisy error logs
if strings.HasPrefix(err.Error(), debugError) {
log.Debug(strings.TrimPrefix(err.Error(), debugError))
return
}
log.Errorf("Unable to save block: %v", err)
}
}
// processBatchedBlocks processes all the received blocks from
// the p2p message.
func (s *InitialSync) processBatchedBlocks(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBatchedBlocks")
defer span.End()
batchedBlockReq.Inc()
response := msg.Data.(*pb.BatchedBeaconBlockResponse)
batchedBlocks := response.BatchedBlocks
if len(batchedBlocks) == 0 {
// Do not process empty responses.
return
}
log.Debug("Processing batched block response")
for _, block := range batchedBlocks {
s.processBlock(ctx, block)
}
log.Debug("Finished processing batched blocks")
}
// requestBatchedBlocks sends out a request for multiple blocks till a
// specified bound slot number.
func (s *InitialSync) requestBatchedBlocks(startSlot uint64, endSlot uint64) {
ctx, span := trace.StartSpan(context.Background(), "beacon-chain.sync.initial-sync.requestBatchedBlocks")
defer span.End()
sentBatchedBlockReq.Inc()
if startSlot > endSlot {
log.Debugf(
"Invalid batched request from slot %d to %d",
startSlot-params.BeaconConfig().GenesisSlot, endSlot-params.BeaconConfig().GenesisSlot,
)
return
}
blockLimit := params.BeaconConfig().BatchBlockLimit
if startSlot+blockLimit < endSlot {
endSlot = startSlot + blockLimit
}
log.Debugf(
"Requesting batched blocks from slot %d to %d",
startSlot-params.BeaconConfig().GenesisSlot, endSlot-params.BeaconConfig().GenesisSlot,
)
s.p2p.Broadcast(ctx, &pb.BatchedBeaconBlockRequest{
StartSlot: startSlot,
EndSlot: endSlot,
})
}
// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher
// routine can be added to the chain.
func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.BeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.validateAndSaveNextBlock")
defer span.End()
if block == nil {
return errors.New("received nil block")
}
root, err := hashutil.HashBeaconBlock(block)
if err != nil {
return err
}
if err := s.checkBlockValidity(ctx, block); err != nil {
return err
}
log.Infof("Saving block with root %#x and slot %d for initial sync", root, block.Slot-params.BeaconConfig().GenesisSlot)
s.currentSlot = block.Slot
s.latestSyncedBlock = block
s.mutex.Lock()
defer s.mutex.Unlock()
// delete block from memory.
if _, ok := s.inMemoryBlocks[block.Slot]; ok {
delete(s.inMemoryBlocks, block.Slot)
}
state, err := s.db.State(ctx)
if err != nil {
return err
}
if err := s.chainService.VerifyBlockValidity(block, state); err != nil {
return err
}
if err := s.db.SaveBlock(block); err != nil {
return err
}
state, err = s.chainService.ApplyBlockStateTransition(ctx, block, state)
if err != nil {
return err
}
if err := s.chainService.CleanupBlockOperations(ctx, block); err != nil {
return err
}
return s.db.UpdateChainHead(ctx, block, state)
}
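
As a worked example of the window clamping in requestBatchedBlocks above: the helper and the concrete limit value below are illustrative only, the real bound comes from params.BeaconConfig().BatchBlockLimit.

// clampBatchWindow is a hypothetical helper restating the bound applied in
// requestBatchedBlocks: never request more than `limit` slots at a time.
func clampBatchWindow(startSlot, endSlot, limit uint64) (uint64, uint64) {
	if startSlot+limit < endSlot {
		endSlot = startSlot + limit
	}
	return startSlot, endSlot
}

// With an assumed limit of 64, clampBatchWindow(100, 300, 64) returns (100, 164);
// announcements beyond highestObservedSlot then trigger processBlockAnnounce,
// which requests the next window starting at lastRequestedSlot, so the node
// keeps catching up to the chain head batch by batch.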

View File

@@ -0,0 +1,111 @@
package initialsync
import (
"context"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
func (s *InitialSync) processState(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processState")
defer span.End()
data := msg.Data.(*pb.BeaconStateResponse)
finalizedState := data.FinalizedState
justifiedState := data.JustifiedState
canonicalState := data.CanonicalState
recState.Inc()
if err := s.db.SaveFinalizedState(finalizedState); err != nil {
log.Errorf("Unable to set received last finalized state in db: %v", err)
return
}
if err := s.db.SaveHistoricalState(finalizedState); err != nil {
log.Errorf("Could not save new historical state: %v", err)
return
}
if err := s.db.SaveFinalizedBlock(finalizedState.LatestBlock); err != nil {
log.Errorf("Could not save finalized block %v", err)
return
}
if err := s.db.SaveBlock(finalizedState.LatestBlock); err != nil {
log.Errorf("Could not save block %v", err)
return
}
if err := s.db.SaveJustifiedState(justifiedState); err != nil {
log.Errorf("Could not set beacon state for initial sync %v", err)
return
}
if err := s.db.SaveHistoricalState(justifiedState); err != nil {
log.Errorf("Could not save new historical state: %v", err)
return
}
if err := s.db.SaveJustifiedBlock(justifiedState.LatestBlock); err != nil {
log.Errorf("Could not save finalized block %v", err)
return
}
if err := s.db.SaveBlock(justifiedState.LatestBlock); err != nil {
log.Errorf("Could not save finalized block %v", err)
return
}
exists, blkNum, err := s.powchain.BlockExists(ctx, bytesutil.ToBytes32(finalizedState.LatestEth1Data.BlockHash32))
if err != nil {
log.Errorf("Unable to get powchain block %v", err)
}
if !exists {
log.Error("Latest ETH1 block doesn't exist in the pow chain")
return
}
s.db.PrunePendingDeposits(ctx, blkNum)
if err := s.db.SaveBlock(canonicalState.LatestBlock); err != nil {
log.Errorf("Could not save block %v", err)
return
}
if err := s.db.UpdateChainHead(ctx, finalizedState.LatestBlock, finalizedState); err != nil {
log.Errorf("Could not update chain head %v", err)
return
}
if err := s.db.SaveHistoricalState(canonicalState); err != nil {
log.Errorf("Could not save new historical state: %v", err)
return
}
// sets the current slot to the last finalized slot of the
// beacon state to begin our sync from.
s.currentSlot = finalizedState.Slot
s.stateReceived = true
s.highestObservedCanonicalState = canonicalState
s.highestObservedSlot = canonicalState.Slot
log.Debugf(
"Successfully saved beacon state with the last finalized slot: %d, canonical slot: %d",
finalizedState.Slot-params.BeaconConfig().GenesisSlot,
canonicalState.Slot-params.BeaconConfig().GenesisSlot,
)
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
s.lastRequestedSlot = s.highestObservedSlot
}
// requestStateFromPeer requests the canonical, finalized, and justified states from a peer.
func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer")
defer span.End()
stateReq.Inc()
return s.p2p.Send(ctx, &pb.BeaconStateRequest{
FinalizedStateRootHash32S: lastFinalizedRoot[:],
}, p2p.AnyPeer)
}
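
The state handshake introduced here has two sides: the syncing node sends a BeaconStateRequest keyed by the finalized state root, and the serving peer (see the handleStateRequest changes further below) replies with the finalized, justified, and canonical states. A rough sketch, with the peer wiring and variable names assumed:

// Syncing node: ask any peer for the states anchored at the finalized root
// learned from the chain-head query (finalizedRoot is an assumed [32]byte).
req := &pb.BeaconStateRequest{
	FinalizedStateRootHash32S: finalizedRoot[:],
}
_ = s.p2p.Send(ctx, req, p2p.AnyPeer)

// Serving peer: reply with all three states so the syncing node can seed its
// database and know how far it still has to catch up.
resp := &pb.BeaconStateResponse{
	FinalizedState: fState,         // used to anchor currentSlot
	JustifiedState: jState,         // stored for fork choice
	CanonicalState: canonicalState, // sets highestObservedSlot and the exit target
}
_ = rs.p2p.Send(ctx, resp, msg.Peer)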

View File

@@ -120,7 +120,6 @@ func (q *Querier) Stop() error {
}
func (q *Querier) listenForStateInitialization() {
sub := q.chainService.StateInitializedFeed().Subscribe(q.chainStartBuf)
defer sub.Unsubscribe()
for {
@@ -140,9 +139,7 @@ func (q *Querier) listenForStateInitialization() {
}
func (q *Querier) run() {
responseSub := q.p2p.Subscribe(&pb.ChainHeadResponse{}, q.responseBuf)
// Ticker so that the service keeps requesting the chain head
// until it gets a response.
ticker := time.NewTicker(1 * time.Second)
@@ -164,7 +161,7 @@ func (q *Querier) run() {
q.RequestLatestHead()
case msg := <-q.responseBuf:
response := msg.Data.(*pb.ChainHeadResponse)
queryLog.Infof("Latest chain head is at slot: %d and hash %#x", response.Slot, response.Hash)
queryLog.Infof("Latest chain head is at slot: %d and hash %#x", response.Slot-params.BeaconConfig().GenesisSlot, response.Hash)
q.currentHeadSlot = response.Slot
q.currentHeadHash = response.Hash
q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S)
@@ -183,7 +180,7 @@ func (q *Querier) RequestLatestHead() {
q.p2p.Broadcast(context.Background(), request)
}
// IsSynced checks if the node is cuurently synced with the
// IsSynced checks if the node is currently synced with the
// rest of the network.
func (q *Querier) IsSynced() (bool, error) {
if q.chainStarted && q.atGenesis {

View File

@@ -12,6 +12,7 @@ import (
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -139,20 +140,14 @@ func TestQuerier_ChainReqResponse(t *testing.T) {
sq := NewQuerierService(context.Background(), cfg)
exitRoutine := make(chan bool)
defer func() {
close(exitRoutine)
}()
go func() {
sq.run()
exitRoutine <- true
}()
response := &pb.ChainHeadResponse{
Slot: 0,
Hash: []byte{'a', 'b'},
FinalizedStateRootHash32S: []byte{'c', 'd'},
Slot: 0,
Hash: []byte{'a', 'b'},
}
msg := p2p.Message{
@@ -161,12 +156,13 @@ func TestQuerier_ChainReqResponse(t *testing.T) {
sq.responseBuf <- msg
expMsg := fmt.Sprintf("Latest chain head is at slot: %d and hash %#x", response.Slot, response.Hash)
expMsg := fmt.Sprintf(
"Latest chain head is at slot: %d and hash %#x",
response.Slot-params.BeaconConfig().GenesisSlot, response.Hash,
)
testutil.WaitForLog(t, hook, expMsg)
sq.cancel()
<-exitRoutine
testutil.AssertLogsContain(t, hook, expMsg)
close(exitRoutine)
hook.Reset()
}

View File

@@ -22,6 +22,12 @@ func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) error {
data := msg.Data.(*pb.BeaconBlockAnnounce)
h := bytesutil.ToBytes32(data.Hash[:32])
// This prevents us from processing a block announcement we have already received.
// TODO(#2072): If the peer failed to give the block, broadcast request to the whole network.
if _, ok := rs.blockAnnouncements[data.SlotNumber]; ok {
return nil
}
hasBlock := rs.db.HasBlock(h)
span.AddAttributes(trace.BoolAttribute("hasBlock", hasBlock))
@@ -36,6 +42,7 @@ func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) error {
log.Error(err)
return err
}
rs.blockAnnouncements[data.SlotNumber] = data.Hash
sentBlockReq.Inc()
return nil
}
@@ -99,15 +106,12 @@ func (rs *RegularSync) receiveBlock(msg p2p.Message) error {
beaconState, err = rs.chainService.ReceiveBlock(ctx, block)
if err != nil {
log.Errorf("Could not process beacon block: %v", err)
return err
}
if err := rs.chainService.ApplyForkChoiceRule(ctx, block, beaconState); err != nil {
log.Errorf("could not apply fork choice rule: %v", err)
return err
}
// We clear the block root from the pending processing map.
rs.clearPendingBlock(blockRoot)
sentBlocks.Inc()
// We update the last observed slot to the received canonical block's slot.
@@ -120,6 +124,8 @@ func (rs *RegularSync) receiveBlock(msg p2p.Message) error {
// and call receiveBlock recursively. The recursive function call will stop once
// the block we process no longer has children.
if child, ok := rs.hasChild(blockRoot); ok {
// We clear the block root from the pending processing map.
rs.clearPendingBlock(blockRoot)
return rs.receiveBlock(child)
}
return nil

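Reading the two hunks above together, the block handler now hands the processed block straight to fork choice, and the pending-block bookkeeping moves next to the recursive child processing. Below is a hedged outline of the resulting flow, assumed to sit in the same package; error logging, metrics, and the highest-observed-slot update are trimmed, and only identifiers visible in the hunks are used.

// Hedged outline of receiveBlock after this commit (not the full handler).
func (rs *RegularSync) receiveBlockOutline(block *pb.BeaconBlock, blockRoot [32]byte) error {
    ctx := context.Background()
    // Process the block through the chain service to obtain the post-state.
    beaconState, err := rs.chainService.ReceiveBlock(ctx, block)
    if err != nil {
        return err
    }
    // Fork choice now runs here, immediately after block processing.
    if err := rs.chainService.ApplyForkChoiceRule(ctx, block, beaconState); err != nil {
        return err
    }
    // If a child block arrived while this one was pending, clear the parent
    // from the pending map and process the child recursively.
    if child, ok := rs.hasChild(blockRoot); ok {
        rs.clearPendingBlock(blockRoot)
        return rs.receiveBlock(child)
    }
    return nil
}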
View File

@@ -86,6 +86,7 @@ type RegularSync struct {
highestObservedSlot uint64
blocksAwaitingProcessing map[[32]byte]p2p.Message
blocksAwaitingProcessingLock sync.RWMutex
blockAnnouncements map[uint64][]byte
}
// RegularSyncConfig allows the channel's buffer sizes to be changed.
@@ -152,6 +153,7 @@ func NewRegularSyncService(ctx context.Context, cfg *RegularSyncConfig) *Regular
chainHeadReqBuf: make(chan p2p.Message, cfg.ChainHeadReqBufferSize),
canonicalBuf: make(chan *pb.BeaconBlockAnnounce, cfg.CanonicalBufferSize),
blocksAwaitingProcessing: make(map[[32]byte]p2p.Message),
blockAnnouncements: make(map[uint64][]byte),
}
}
@@ -237,6 +239,7 @@ func (rs *RegularSync) run() {
go rs.broadcastCanonicalBlock(rs.ctx, blockAnnounce)
}
}
log.Info("Exiting regular sync run()")
}
@@ -341,9 +344,26 @@ func (rs *RegularSync) handleStateRequest(msg p2p.Message) error {
log.Debugf("Requested state root is different from locally stored state root %#x", req.FinalizedStateRootHash32S)
return err
}
log.WithField("beaconState", fmt.Sprintf("%#x", root)).Debug("Sending beacon state to peer")
log.WithField(
"beaconState", fmt.Sprintf("%#x", root),
).Debug("Sending finalized, justified, and canonical states to peer")
defer sentState.Inc()
if err := rs.p2p.Send(ctx, &pb.BeaconStateResponse{BeaconState: fState}, msg.Peer); err != nil {
jState, err := rs.db.JustifiedState()
if err != nil {
log.Errorf("Unable to retrieve justified state, %v", err)
return err
}
canonicalState, err := rs.db.State(ctx)
if err != nil {
log.Errorf("Unable to retrieve canonical beacon state, %v", err)
return err
}
resp := &pb.BeaconStateResponse{
FinalizedState: fState,
JustifiedState: jState,
CanonicalState: canonicalState,
}
if err := rs.p2p.Send(ctx, resp, msg.Peer); err != nil {
log.Error(err)
return err
}

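The state request handler therefore answers with three states rather than one. On the requesting side, initial sync would unpack the response along these lines; the function name and parameter layout are assumptions, while the save methods (SaveFinalizedState, SaveJustifiedState, SaveState) all appear elsewhere in this diff. The sketch assumes the existing package imports.

// Hedged sketch of consuming the expanded BeaconStateResponse on the
// initial-sync side; names not shown in this diff are hypothetical.
func processStateResponseSketch(ctx context.Context, beaconDB *db.BeaconDB, msg p2p.Message) error {
    resp := msg.Data.(*pb.BeaconStateResponse)
    if err := beaconDB.SaveFinalizedState(resp.FinalizedState); err != nil {
        return err
    }
    if err := beaconDB.SaveJustifiedState(resp.JustifiedState); err != nil {
        return err
    }
    // The canonical state becomes the node's current head state.
    return beaconDB.SaveState(ctx, resp.CanonicalState)
}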
View File

@@ -84,6 +84,10 @@ func (ms *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.B
return nil
}
func (ms *mockChainService) CleanupBlockOperations(ctx context.Context, block *pb.BeaconBlock) error {
return nil
}
type mockOperationService struct{}
func (ms *mockOperationService) IncomingProcessedBlockFeed() *event.Feed {
@@ -687,6 +691,12 @@ func TestHandleStateReq_OK(t *testing.T) {
if err != nil {
t.Fatalf("could not attempt fetch beacon state: %v", err)
}
if err := db.SaveJustifiedState(beaconState); err != nil {
t.Fatalf("could not save justified state: %v", err)
}
if err := db.SaveFinalizedState(beaconState); err != nil {
t.Fatalf("could not save finalized state: %v", err)
}
stateRoot, err := hashutil.HashProto(beaconState)
if err != nil {
t.Fatalf("could not hash beacon state: %v", err)
@@ -707,5 +717,5 @@ func TestHandleStateReq_OK(t *testing.T) {
if err := ss.handleStateRequest(msg1); err != nil {
t.Error(err)
}
testutil.AssertLogsContain(t, hook, "Sending beacon state to peer")
testutil.AssertLogsContain(t, hook, "Sending finalized, justified, and canonical states to peer")
}

View File

@@ -31,7 +31,6 @@ type Config struct {
// NewSyncService creates a new instance of SyncService using the config
// given.
func NewSyncService(ctx context.Context, cfg *Config) *Service {
sqCfg := DefaultQuerierConfig()
sqCfg.BeaconDB = cfg.BeaconDB
sqCfg.P2P = cfg.P2P
@@ -41,8 +40,8 @@ func NewSyncService(ctx context.Context, cfg *Config) *Service {
isCfg := initialsync.DefaultConfig()
isCfg.BeaconDB = cfg.BeaconDB
isCfg.P2P = cfg.P2P
isCfg.ChainService = cfg.ChainService
isCfg.PowChain = cfg.PowChainService
isCfg.ChainService = cfg.ChainService
rsCfg := DefaultRegularSyncConfig()
rsCfg.ChainService = cfg.ChainService
@@ -110,9 +109,7 @@ func (ss *Service) run() {
// Sets the highest observed slot from querier.
ss.InitialSync.InitializeObservedSlot(ss.Querier.currentHeadSlot)
// Sets the state root of the highest observed slot.
ss.InitialSync.InitializeStateRoot(ss.Querier.currentFinalizedStateRoot)
ss.InitialSync.InitializeFinalizedStateRoot(ss.Querier.currentFinalizedStateRoot)
ss.InitialSync.Start()
}

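Net effect of the service changes: once the querier reports the network head, initial sync is seeded with both the observed slot and the finalized state root before it starts. A minimal sketch, using only calls visible in the hunk above; the helper name is an assumption.

// Hedged sketch of the querier -> initial sync handoff in Service.run().
func (ss *Service) startInitialSyncSketch() {
    // Target slot comes from the chain head the querier observed.
    ss.InitialSync.InitializeObservedSlot(ss.Querier.currentHeadSlot)
    // The finalized state root lets initial sync request the matching state.
    ss.InitialSync.InitializeFinalizedStateRoot(ss.Querier.currentFinalizedStateRoot)
    ss.InitialSync.Start()
}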
View File

@@ -87,16 +87,34 @@ func setupSimBackendAndDB(t *testing.T) (*backend.SimulatedBackend, *db.BeaconDB
t.Fatalf("Could not setup beacon db %v", err)
}
if err := beacondb.SaveState(ctx, bd.State()); err != nil {
t.Fatalf("Could not save state %v", err)
}
memBlocks := bd.InMemoryBlocks()
if err := beacondb.SaveBlock(memBlocks[0]); err != nil {
t.Fatalf("Could not save block %v", err)
}
if err := beacondb.SaveJustifiedBlock(memBlocks[0]); err != nil {
t.Fatalf("Could not save block %v", err)
}
if err := beacondb.SaveFinalizedBlock(memBlocks[0]); err != nil {
t.Fatalf("Could not save block %v", err)
}
if err := beacondb.UpdateChainHead(ctx, memBlocks[0], bd.State()); err != nil {
state := bd.State()
state.LatestBlock = memBlocks[0]
state.LatestEth1Data = &pb.Eth1Data{
BlockHash32: []byte{},
}
if err := beacondb.SaveState(ctx, state); err != nil {
t.Fatalf("Could not save state %v", err)
}
if err := beacondb.SaveJustifiedState(state); err != nil {
t.Fatalf("Could not save justified state %v", err)
}
if err := beacondb.SaveFinalizedState(state); err != nil {
t.Fatalf("Could not save finalized state %v", err)
}
if err := beacondb.UpdateChainHead(ctx, memBlocks[0], state); err != nil {
t.Fatalf("Could not update chain head %v", err)
}
@@ -152,10 +170,9 @@ func setUpSyncedService(numOfBlocks int, simP2P *simulatedP2P, t *testing.T) (*S
}
func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T) (*Service, *db.BeaconDB) {
bd, beacondb, privKeys := setupSimBackendAndDB(t)
bd, beacondb, _ := setupSimBackendAndDB(t)
defer bd.Shutdown()
defer db.TeardownDB(bd.DB())
ctx := context.Background()
mockPow := &afterGenesisPowChain{
feed: new(event.Feed),
@@ -168,21 +185,6 @@ func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T
db: bd.DB(),
}
// we add in 2 blocks to the unsynced node so that, we dont request the beacon state from the
// synced node to reduce test time.
for i := 1; i <= 2; i++ {
if err := bd.GenerateBlockAndAdvanceChain(&backend.SimulatedObjects{}, privKeys); err != nil {
t.Fatalf("Unable to generate block in simulated backend %v", err)
}
blocks := bd.InMemoryBlocks()
if err := beacondb.SaveBlock(blocks[i]); err != nil {
t.Fatalf("Unable to save block %v", err)
}
if err := beacondb.UpdateChainHead(ctx, blocks[i], bd.State()); err != nil {
t.Fatalf("Unable to update chain head %v", err)
}
}
cfg := &Config{
ChainService: mockChain,
BeaconDB: beacondb,
@@ -197,9 +199,8 @@ func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T
for ss.Querier.currentHeadSlot == 0 {
simP2P.Send(simP2P.ctx, &pb.ChainHeadResponse{
Slot: params.BeaconConfig().GenesisSlot + 12,
Hash: []byte{'t', 'e', 's', 't'},
FinalizedStateRootHash32S: stateRoot[:],
Slot: params.BeaconConfig().GenesisSlot + 12,
Hash: stateRoot[:],
}, "")
}
@@ -242,29 +243,36 @@ func TestSyncing_AFullySyncedNode(t *testing.T) {
defer us2.Stop()
defer db.TeardownDB(unSyncedDB2)
syncedChan := make(chan uint64)
// Waits for the unsynced node to fire a message signifying it is
// synced with its current slot number.
sub := us.InitialSync.SyncedFeed().Subscribe(syncedChan)
defer sub.Unsubscribe()
syncedChan2 := make(chan uint64)
sub2 := us2.InitialSync.SyncedFeed().Subscribe(syncedChan2)
defer sub2.Unsubscribe()
highestSlot := <-syncedChan
highestSlot2 := <-syncedChan2
if highestSlot != uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot {
t.Errorf("Sync services didn't sync to expectecd slot, expected %d but got %d",
uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot, highestSlot)
finalized, err := syncedDB.FinalizedState()
if err != nil {
t.Fatal(err)
}
justified, err := syncedDB.JustifiedState()
if err != nil {
t.Fatal(err)
}
if highestSlot2 != uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot {
t.Errorf("Sync services didn't sync to expectecd slot, expected %d but got %d",
uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot, highestSlot2)
newP2P.Send(newP2P.ctx, &pb.BeaconStateResponse{
FinalizedState: finalized,
JustifiedState: justified,
CanonicalState: bState,
}, "")
timeout := time.After(10 * time.Second)
tick := time.Tick(200 * time.Millisecond)
loop:
for {
select {
case <-timeout:
t.Error("Could not sync in time")
break loop
case <-tick:
_, slot1 := us.InitialSync.NodeIsSynced()
_, slot2 := us2.InitialSync.NodeIsSynced()
if slot1 == uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot ||
slot2 == uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot {
break loop
}
}
}
}

View File

@@ -4,4 +4,4 @@ metadata:
name: beacon-config
namespace: beacon-chain
data:
DEPOSIT_CONTRACT_ADDRESS: "0xBb0A35B97b35E334f142b20132860F51A91c011F"
DEPOSIT_CONTRACT_ADDRESS: "0xEE18Fa5bcfE1ae7E2Dbd38A6D0b19B481325C60E"

View File

@@ -644,7 +644,9 @@ func (m *BeaconStateRequest) GetFinalizedStateRootHash32S() []byte {
}
type BeaconStateResponse struct {
BeaconState *BeaconState `protobuf:"bytes,1,opt,name=beacon_state,json=beaconState,proto3" json:"beacon_state,omitempty"`
FinalizedState *BeaconState `protobuf:"bytes,1,opt,name=finalized_state,json=finalizedState,proto3" json:"finalized_state,omitempty"`
JustifiedState *BeaconState `protobuf:"bytes,2,opt,name=justified_state,json=justifiedState,proto3" json:"justified_state,omitempty"`
CanonicalState *BeaconState `protobuf:"bytes,3,opt,name=canonical_state,json=canonicalState,proto3" json:"canonical_state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@@ -683,9 +685,23 @@ func (m *BeaconStateResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_BeaconStateResponse proto.InternalMessageInfo
func (m *BeaconStateResponse) GetBeaconState() *BeaconState {
func (m *BeaconStateResponse) GetFinalizedState() *BeaconState {
if m != nil {
return m.BeaconState
return m.FinalizedState
}
return nil
}
func (m *BeaconStateResponse) GetJustifiedState() *BeaconState {
if m != nil {
return m.JustifiedState
}
return nil
}
func (m *BeaconStateResponse) GetCanonicalState() *BeaconState {
if m != nil {
return m.CanonicalState
}
return nil
}
@@ -1469,61 +1485,63 @@ func init() {
func init() { proto.RegisterFile("proto/beacon/p2p/v1/messages.proto", fileDescriptor_a1d590cda035b632) }
var fileDescriptor_a1d590cda035b632 = []byte{
// 863 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x51, 0x73, 0xdb, 0x44,
0x10, 0xc7, 0x51, 0x93, 0xd4, 0xe9, 0xda, 0x09, 0xee, 0x05, 0x1a, 0x27, 0xd3, 0x3a, 0x89, 0x4a,
0x87, 0xc0, 0x4c, 0x9d, 0x69, 0xfa, 0xd4, 0x27, 0x46, 0x72, 0x04, 0x6e, 0x1b, 0xe4, 0x22, 0xc9,
0x30, 0x3c, 0x30, 0x87, 0x6c, 0x1f, 0xb5, 0x07, 0xe7, 0xee, 0xd0, 0x9d, 0x3d, 0x09, 0xc3, 0x2b,
0x9f, 0x81, 0x4f, 0xc0, 0x77, 0xe1, 0x91, 0x8f, 0xc0, 0xe4, 0x93, 0x30, 0xba, 0x3b, 0x39, 0x8a,
0xad, 0x28, 0x79, 0xe8, 0x9b, 0x6f, 0xf7, 0xbf, 0xff, 0xdd, 0xdf, 0x66, 0x35, 0x13, 0xb0, 0x79,
0xc2, 0x24, 0x3b, 0xea, 0x93, 0x78, 0xc0, 0xe8, 0x11, 0x3f, 0xe6, 0x47, 0xb3, 0x17, 0x47, 0x67,
0x44, 0x88, 0xf8, 0x3d, 0x11, 0x2d, 0x95, 0x44, 0x8f, 0x88, 0x1c, 0x91, 0x84, 0x4c, 0xcf, 0x5a,
0x5a, 0xd6, 0xe2, 0xc7, 0xbc, 0x35, 0x7b, 0xb1, 0xbb, 0x57, 0x54, 0x2b, 0x2f, 0x78, 0x56, 0x68,
0x7f, 0x03, 0xeb, 0x1e, 0x9d, 0x91, 0x09, 0xe3, 0x04, 0x1d, 0x40, 0x4d, 0xf0, 0x98, 0xe2, 0x01,
0xa3, 0x92, 0x9c, 0xcb, 0x86, 0xb5, 0x6f, 0x1d, 0xd6, 0x82, 0x6a, 0x1a, 0x6b, 0xeb, 0x10, 0x6a,
0x40, 0x85, 0xc7, 0x17, 0x13, 0x16, 0x0f, 0x1b, 0xf7, 0x54, 0x36, 0x7b, 0xda, 0x6f, 0x60, 0xcb,
0x55, 0x5d, 0xdc, 0x09, 0x1b, 0xfc, 0xea, 0x50, 0xca, 0xa6, 0x74, 0x40, 0x10, 0x82, 0xd5, 0x51,
0x2c, 0x46, 0xc6, 0x4b, 0xfd, 0x46, 0x7b, 0x50, 0x15, 0x13, 0x26, 0x31, 0x9d, 0x9e, 0xf5, 0x49,
0xa2, 0x8c, 0x56, 0x03, 0x48, 0x43, 0xbe, 0x8a, 0xd8, 0x87, 0x80, 0x72, 0x5e, 0x01, 0xf9, 0x6d,
0x4a, 0x84, 0x2c, 0xb2, 0xb2, 0x1d, 0x68, 0x2e, 0x2b, 0xdd, 0x8b, 0x70, 0xee, 0xb5, 0xd8, 0xcc,
0x5a, 0x6a, 0xf6, 0x97, 0x75, 0x6d, 0xf2, 0x80, 0x08, 0xce, 0xa8, 0x20, 0xe8, 0x15, 0xac, 0xf5,
0xd3, 0x80, 0x2a, 0xa9, 0x1e, 0x3f, 0x6d, 0x15, 0xaf, 0xb8, 0x95, 0xaf, 0xd5, 0x15, 0xc8, 0x83,
0x6a, 0x2c, 0x25, 0x11, 0x32, 0x96, 0x63, 0x46, 0x15, 0x60, 0x89, 0x81, 0x73, 0x25, 0x0d, 0xf2,
0x75, 0x76, 0x0f, 0x76, 0xdc, 0x58, 0x0e, 0x46, 0x64, 0x58, 0xb0, 0x8d, 0x27, 0x00, 0x42, 0xc6,
0x89, 0xc4, 0x29, 0x8a, 0xc1, 0x7a, 0xa0, 0x22, 0x29, 0x3c, 0xda, 0x81, 0x75, 0x42, 0x87, 0x3a,
0xa9, 0x17, 0x5c, 0x21, 0x74, 0x98, 0xa6, 0xec, 0x11, 0xec, 0x16, 0xd9, 0x1a, 0xec, 0x37, 0xb0,
0xd9, 0xd7, 0x59, 0xac, 0x60, 0x44, 0xc3, 0xda, 0x5f, 0xb9, 0x2b, 0xff, 0x86, 0x29, 0x55, 0x2f,
0x61, 0x23, 0xa8, 0xb7, 0x47, 0xf1, 0x98, 0x76, 0x48, 0x3c, 0x34, 0x73, 0xdb, 0x7f, 0xc0, 0xc3,
0x5c, 0xcc, 0x34, 0x2d, 0xba, 0x12, 0x04, 0xab, 0xb9, 0xe9, 0xd5, 0x6f, 0xf4, 0x15, 0x3c, 0xfe,
0x65, 0x4c, 0xe3, 0xc9, 0xf8, 0x77, 0x32, 0xc4, 0xe9, 0x9a, 0x08, 0x4e, 0x18, 0x93, 0x38, 0x2d,
0x78, 0x79, 0x2c, 0x1a, 0x2b, 0xaa, 0x7e, 0x67, 0xae, 0x09, 0x53, 0x49, 0xc0, 0x98, 0xec, 0x68,
0x81, 0xfd, 0x1c, 0xb6, 0xf5, 0xbc, 0x2a, 0x93, 0x46, 0xcb, 0x2e, 0xd5, 0xee, 0x65, 0x87, 0xa8,
0x8d, 0xcc, 0xea, 0x6f, 0x9b, 0xc2, 0xba, 0x6d, 0x8a, 0x9f, 0xb2, 0x8b, 0x33, 0xb6, 0x66, 0x0b,
0x5f, 0x43, 0x4d, 0xaf, 0x56, 0x9b, 0xde, 0xed, 0xf0, 0xb4, 0x45, 0xb5, 0x7f, 0xf5, 0xb0, 0xbf,
0x80, 0xad, 0xdc, 0x4d, 0x95, 0x02, 0x1e, 0x02, 0xca, 0x9f, 0x5f, 0xc9, 0x97, 0xc6, 0xaf, 0x99,
0x96, 0xfe, 0xe5, 0x3e, 0xd0, 0xf9, 0xb7, 0xa0, 0xf1, 0x2e, 0x61, 0x9c, 0x09, 0x92, 0x84, 0x93,
0x58, 0x8c, 0xc6, 0xf4, 0x7d, 0x29, 0xcb, 0x73, 0xd8, 0x5e, 0xd4, 0x97, 0x01, 0xfd, 0x69, 0x2d,
0xfb, 0x97, 0x62, 0xf5, 0xe0, 0x21, 0x37, 0x7a, 0x2c, 0x4c, 0x81, 0x81, 0x3b, 0xbc, 0x09, 0x6e,
0xa9, 0x41, 0x9d, 0x2f, 0x44, 0x52, 0x4c, 0xbd, 0x82, 0xbb, 0x63, 0x2e, 0xea, 0x6f, 0xc3, 0x5c,
0xd6, 0x97, 0x63, 0x66, 0xfa, 0x3b, 0x63, 0x2e, 0x35, 0xa8, 0x2f, 0x46, 0xec, 0x67, 0xf0, 0xf1,
0x09, 0xe1, 0x4c, 0x8c, 0x65, 0x29, 0xdd, 0x67, 0xb0, 0x69, 0x64, 0x65, 0x50, 0x3f, 0xcf, 0xcd,
0x4a, 0x51, 0x5e, 0x41, 0x65, 0xa8, 0x65, 0x06, 0x60, 0xef, 0x26, 0x80, 0xcc, 0x2d, 0xd3, 0xdb,
0x36, 0xd4, 0xbc, 0xf3, 0x5b, 0x66, 0x3d, 0x80, 0x6a, 0xaa, 0x29, 0xff, 0x6a, 0x6a, 0x5a, 0x52,
0x32, 0xe5, 0x29, 0x6c, 0xce, 0xd8, 0x64, 0x4a, 0x65, 0x9c, 0x5c, 0x60, 0x72, 0x3e, 0x1f, 0xf6,
0xd9, 0x4d, 0xc3, 0x7e, 0x9f, 0xa9, 0x95, 0xf5, 0xc6, 0x2c, 0xff, 0xfc, 0xf2, 0xef, 0x15, 0x58,
0x8b, 0x18, 0x1f, 0x0f, 0x50, 0x15, 0x2a, 0x3d, 0xff, 0xad, 0xdf, 0xfd, 0xc1, 0xaf, 0x7f, 0x84,
0x76, 0xe0, 0x53, 0xd7, 0x73, 0xda, 0x5d, 0x1f, 0xbb, 0xa7, 0xdd, 0xf6, 0x5b, 0xec, 0xf8, 0x7e,
0xb7, 0xe7, 0xb7, 0xbd, 0xba, 0x85, 0x1a, 0xf0, 0xc9, 0xb5, 0x54, 0xe0, 0x7d, 0xd7, 0xf3, 0xc2,
0xa8, 0x7e, 0x0f, 0x7d, 0x0e, 0x4f, 0x8b, 0x32, 0xd8, 0xfd, 0x11, 0x87, 0xa7, 0xdd, 0x08, 0xfb,
0xbd, 0x6f, 0x5d, 0x2f, 0xa8, 0xaf, 0x2c, 0xb9, 0x07, 0x5e, 0xf8, 0xae, 0xeb, 0x87, 0x5e, 0x7d,
0x15, 0xed, 0xc3, 0x63, 0xd7, 0x89, 0xda, 0x1d, 0xef, 0x04, 0x17, 0x76, 0x59, 0x43, 0x07, 0xf0,
0xe4, 0x06, 0x85, 0x31, 0xb9, 0x8f, 0x1e, 0x01, 0x6a, 0x77, 0x9c, 0xd7, 0x3e, 0xee, 0x78, 0xce,
0xc9, 0xbc, 0xb4, 0x82, 0xb6, 0x61, 0xeb, 0x5a, 0xdc, 0x14, 0xac, 0xa3, 0x26, 0xec, 0x1a, 0xaf,
0x30, 0x72, 0x22, 0x0f, 0x77, 0x9c, 0xb0, 0x73, 0xc5, 0xfc, 0x20, 0xc7, 0xac, 0xf3, 0x99, 0x25,
0xe4, 0x50, 0xb2, 0x8c, 0x31, 0xad, 0xa6, 0x45, 0x4e, 0x14, 0x79, 0x69, 0xfc, 0x75, 0xd7, 0xbf,
0xb2, 0xab, 0xa5, 0x73, 0xe4, 0x33, 0x99, 0xdb, 0xc6, 0x62, 0xc9, 0xdc, 0x6c, 0xd3, 0xad, 0xfd,
0x73, 0xd9, 0xb4, 0xfe, 0xbd, 0x6c, 0x5a, 0xff, 0x5d, 0x36, 0xad, 0xfe, 0x7d, 0xf5, 0xdf, 0xd8,
0xcb, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0x7c, 0xb5, 0x2a, 0x92, 0xec, 0x09, 0x00, 0x00,
// 893 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0xdf, 0x72, 0xdb, 0x44,
0x14, 0xc6, 0x51, 0x9c, 0xd4, 0xe9, 0xb1, 0xe3, 0xba, 0x1b, 0x68, 0x9c, 0x4c, 0xeb, 0x24, 0x2a,
0x1d, 0x02, 0x33, 0x75, 0xa6, 0xe9, 0x55, 0xaf, 0x18, 0xc9, 0xd1, 0xe0, 0xb6, 0x41, 0x2e, 0x92,
0x0d, 0xc3, 0xd5, 0xb2, 0xb6, 0xb7, 0xb5, 0xc0, 0xd9, 0x15, 0xde, 0xb5, 0x27, 0x61, 0xb8, 0xe5,
0x19, 0x78, 0x02, 0xde, 0x85, 0x4b, 0x1e, 0x81, 0xc9, 0x8b, 0xc0, 0x68, 0xb5, 0x52, 0xe4, 0x3f,
0x51, 0x9c, 0x99, 0xde, 0x49, 0xe7, 0x7c, 0xdf, 0xef, 0x9c, 0x6f, 0xb3, 0xca, 0x18, 0xcc, 0x70,
0xcc, 0x25, 0x3f, 0xee, 0x51, 0xd2, 0xe7, 0xec, 0x38, 0x3c, 0x09, 0x8f, 0xa7, 0x2f, 0x8e, 0xcf,
0xa9, 0x10, 0xe4, 0x03, 0x15, 0x0d, 0xd5, 0x44, 0x8f, 0xa8, 0x1c, 0xd2, 0x31, 0x9d, 0x9c, 0x37,
0x62, 0x59, 0x23, 0x3c, 0x09, 0x1b, 0xd3, 0x17, 0x7b, 0xfb, 0xcb, 0xbc, 0xf2, 0x32, 0x4c, 0x8c,
0xe6, 0x37, 0xb0, 0xe9, 0xb0, 0x29, 0x1d, 0xf1, 0x90, 0xa2, 0x43, 0x28, 0x8b, 0x90, 0x30, 0xdc,
0xe7, 0x4c, 0xd2, 0x0b, 0x59, 0x33, 0x0e, 0x8c, 0xa3, 0xb2, 0x57, 0x8a, 0x6a, 0xcd, 0xb8, 0x84,
0x6a, 0x50, 0x0c, 0xc9, 0xe5, 0x88, 0x93, 0x41, 0x6d, 0x4d, 0x75, 0x93, 0x57, 0xf3, 0x0d, 0x6c,
0xdb, 0x6a, 0x8a, 0x3d, 0xe2, 0xfd, 0x5f, 0x2c, 0xc6, 0xf8, 0x84, 0xf5, 0x29, 0x42, 0xb0, 0x3e,
0x24, 0x62, 0xa8, 0x59, 0xea, 0x19, 0xed, 0x43, 0x49, 0x8c, 0xb8, 0xc4, 0x6c, 0x72, 0xde, 0xa3,
0x63, 0x05, 0x5a, 0xf7, 0x20, 0x2a, 0xb9, 0xaa, 0x62, 0x1e, 0x01, 0xca, 0xb0, 0x3c, 0xfa, 0xeb,
0x84, 0x0a, 0xb9, 0x0c, 0x65, 0x5a, 0x50, 0x5f, 0x54, 0xda, 0x97, 0x7e, 0xca, 0x9a, 0x1f, 0x66,
0x2c, 0x0c, 0xfb, 0xd3, 0x98, 0xd9, 0xdc, 0xa3, 0x22, 0xe4, 0x4c, 0x50, 0xf4, 0x0a, 0x36, 0x7a,
0x51, 0x41, 0x59, 0x4a, 0x27, 0x4f, 0x1b, 0xcb, 0x8f, 0xb8, 0x91, 0xf5, 0xc6, 0x0e, 0xe4, 0x40,
0x89, 0x48, 0x49, 0x85, 0x24, 0x32, 0xe0, 0x4c, 0x05, 0xcc, 0x01, 0x58, 0xd7, 0x52, 0x2f, 0xeb,
0x33, 0xbb, 0xb0, 0x6b, 0x13, 0xd9, 0x1f, 0xd2, 0xc1, 0x92, 0xd3, 0x78, 0x02, 0x20, 0x24, 0x19,
0x4b, 0x1c, 0x45, 0xd1, 0xb1, 0xee, 0xab, 0x4a, 0x14, 0x1e, 0xed, 0xc2, 0x26, 0x65, 0x83, 0xb8,
0x19, 0x1f, 0x70, 0x91, 0xb2, 0x41, 0xd4, 0x32, 0x87, 0xb0, 0xb7, 0x0c, 0xab, 0x63, 0xbf, 0x81,
0x4a, 0x2f, 0xee, 0x62, 0x15, 0x46, 0xd4, 0x8c, 0x83, 0xc2, 0xaa, 0xf9, 0xb7, 0xb4, 0x55, 0xbd,
0x09, 0x13, 0x41, 0xb5, 0x39, 0x24, 0x01, 0x6b, 0x51, 0x32, 0xd0, 0x7b, 0x9b, 0xbf, 0xc3, 0xc3,
0x4c, 0x4d, 0x0f, 0x5d, 0x76, 0x4b, 0x10, 0xac, 0x67, 0xb6, 0x57, 0xcf, 0xe8, 0x6b, 0x78, 0xfc,
0x3e, 0x60, 0x64, 0x14, 0xfc, 0x46, 0x07, 0x38, 0x3a, 0x26, 0x8a, 0xc7, 0x9c, 0x4b, 0x1c, 0x19,
0x5e, 0x9e, 0x88, 0x5a, 0x41, 0xf9, 0x77, 0x53, 0x8d, 0x1f, 0x49, 0x3c, 0xce, 0x65, 0x2b, 0x16,
0x98, 0xcf, 0x61, 0x27, 0xde, 0x57, 0x75, 0xa2, 0x6a, 0xde, 0x4d, 0x35, 0xbb, 0xc9, 0x45, 0x8c,
0x41, 0xfa, 0xe8, 0x6f, 0xdb, 0xc2, 0xb8, 0x6d, 0x8b, 0xff, 0xd2, 0x2b, 0xa7, 0xb9, 0xfa, 0x18,
0xce, 0xe0, 0xc1, 0x1c, 0x78, 0xb5, 0xcb, 0x17, 0x53, 0x2a, 0xb3, 0x03, 0x23, 0xda, 0xcf, 0x13,
0x21, 0x83, 0xf7, 0x41, 0x4a, 0x5b, 0xbb, 0x03, 0x2d, 0xf5, 0xa6, 0xb4, 0x3e, 0x61, 0x9c, 0x05,
0x7d, 0x32, 0xd2, 0xb4, 0xc2, 0x1d, 0x68, 0xa9, 0x57, 0xbd, 0x9b, 0x5f, 0xc2, 0x76, 0xe6, 0xda,
0xe7, 0xfe, 0x0d, 0x8e, 0x00, 0x65, 0xbf, 0x90, 0x9c, 0x7f, 0x06, 0xe1, 0x0c, 0x34, 0xf7, 0x72,
0x7d, 0xa4, 0x2f, 0xb4, 0x01, 0xb5, 0x77, 0x63, 0x1e, 0x72, 0x41, 0xc7, 0xfe, 0x88, 0x88, 0x61,
0xc0, 0x3e, 0xe4, 0x66, 0x79, 0x0e, 0x3b, 0xf3, 0xfa, 0xbc, 0x40, 0x7f, 0x18, 0x8b, 0xfc, 0xdc,
0x58, 0x5d, 0x78, 0x18, 0x6a, 0x3d, 0x16, 0xda, 0xa0, 0xc3, 0x1d, 0xdd, 0x14, 0x6e, 0x61, 0x40,
0x35, 0x9c, 0xab, 0x44, 0x31, 0xe3, 0x23, 0x58, 0x3d, 0xe6, 0xbc, 0xfe, 0xb6, 0x98, 0x8b, 0xfa,
0xfc, 0x98, 0x89, 0x7e, 0xe5, 0x98, 0x0b, 0x03, 0xaa, 0xf3, 0x15, 0xf3, 0x19, 0x3c, 0x38, 0xa5,
0x21, 0x17, 0x81, 0xcc, 0x4d, 0xf7, 0x39, 0x54, 0xb4, 0x2c, 0x2f, 0xd4, 0x4f, 0x29, 0x2c, 0x37,
0xca, 0x2b, 0x28, 0x0e, 0x62, 0x99, 0x0e, 0xb0, 0x7f, 0x53, 0x80, 0x84, 0x96, 0xe8, 0x4d, 0x13,
0xca, 0xce, 0xc5, 0x2d, 0xbb, 0x1e, 0x42, 0x29, 0xd2, 0xe4, 0x7f, 0x35, 0xe5, 0x58, 0x92, 0xb3,
0xe5, 0x19, 0x54, 0xa6, 0x7c, 0x34, 0x61, 0x92, 0x8c, 0x2f, 0x31, 0xbd, 0x48, 0x97, 0x7d, 0x76,
0xd3, 0xb2, 0xdf, 0x27, 0x6a, 0x85, 0xde, 0x9a, 0x66, 0x5f, 0xbf, 0xfa, 0xab, 0x00, 0x1b, 0x1d,
0x1e, 0x06, 0x7d, 0x54, 0x82, 0x62, 0xd7, 0x7d, 0xeb, 0xb6, 0x7f, 0x70, 0xab, 0x9f, 0xa0, 0x5d,
0xf8, 0xcc, 0x76, 0xac, 0x66, 0xdb, 0xc5, 0xf6, 0x59, 0xbb, 0xf9, 0x16, 0x5b, 0xae, 0xdb, 0xee,
0xba, 0x4d, 0xa7, 0x6a, 0xa0, 0x1a, 0x7c, 0x3a, 0xd3, 0xf2, 0x9c, 0xef, 0xba, 0x8e, 0xdf, 0xa9,
0xae, 0xa1, 0x2f, 0xe0, 0xe9, 0xb2, 0x0e, 0xb6, 0x7f, 0xc4, 0xfe, 0x59, 0xbb, 0x83, 0xdd, 0xee,
0xb7, 0xb6, 0xe3, 0x55, 0x0b, 0x0b, 0x74, 0xcf, 0xf1, 0xdf, 0xb5, 0x5d, 0xdf, 0xa9, 0xae, 0xa3,
0x03, 0x78, 0x6c, 0x5b, 0x9d, 0x66, 0xcb, 0x39, 0xc5, 0x4b, 0xa7, 0x6c, 0xa0, 0x43, 0x78, 0x72,
0x83, 0x42, 0x43, 0xee, 0xa1, 0x47, 0x80, 0x9a, 0x2d, 0xeb, 0xb5, 0x8b, 0x5b, 0x8e, 0x75, 0x9a,
0x5a, 0x8b, 0x68, 0x07, 0xb6, 0x67, 0xea, 0xda, 0xb0, 0x89, 0xea, 0xb0, 0xa7, 0x59, 0x7e, 0xc7,
0xea, 0x38, 0xb8, 0x65, 0xf9, 0xad, 0xeb, 0xcc, 0xf7, 0x33, 0x99, 0xe3, 0x7e, 0x82, 0x84, 0x4c,
0x94, 0xa4, 0xa3, 0xa1, 0xa5, 0xc8, 0x64, 0x75, 0x3a, 0x4e, 0x54, 0x7f, 0xdd, 0x76, 0xaf, 0x71,
0xe5, 0x68, 0x8f, 0x6c, 0x27, 0xa1, 0x6d, 0xcd, 0x5b, 0x52, 0x58, 0xc5, 0x2e, 0xff, 0x7d, 0x55,
0x37, 0xfe, 0xb9, 0xaa, 0x1b, 0xff, 0x5e, 0xd5, 0x8d, 0xde, 0x3d, 0xf5, 0x83, 0xf1, 0xe5, 0xff,
0x01, 0x00, 0x00, 0xff, 0xff, 0x1d, 0x87, 0xb8, 0xa8, 0x8f, 0x0a, 0x00, 0x00,
}
func (m *Envelope) Marshal() (dAtA []byte, err error) {
@@ -1877,16 +1895,36 @@ func (m *BeaconStateResponse) MarshalTo(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.BeaconState != nil {
if m.FinalizedState != nil {
dAtA[i] = 0xa
i++
i = encodeVarintMessages(dAtA, i, uint64(m.BeaconState.Size()))
n3, err := m.BeaconState.MarshalTo(dAtA[i:])
i = encodeVarintMessages(dAtA, i, uint64(m.FinalizedState.Size()))
n3, err := m.FinalizedState.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n3
}
if m.JustifiedState != nil {
dAtA[i] = 0x12
i++
i = encodeVarintMessages(dAtA, i, uint64(m.JustifiedState.Size()))
n4, err := m.JustifiedState.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n4
}
if m.CanonicalState != nil {
dAtA[i] = 0x1a
i++
i = encodeVarintMessages(dAtA, i, uint64(m.CanonicalState.Size()))
n5, err := m.CanonicalState.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n5
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
@@ -1972,11 +2010,11 @@ func (m *AttestationResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintMessages(dAtA, i, uint64(m.Attestation.Size()))
n4, err := m.Attestation.MarshalTo(dAtA[i:])
n6, err := m.Attestation.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n4
i += n6
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
@@ -2063,11 +2101,11 @@ func (m *ProposerSlashingResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintMessages(dAtA, i, uint64(m.ProposerSlashing.Size()))
n5, err := m.ProposerSlashing.MarshalTo(dAtA[i:])
n7, err := m.ProposerSlashing.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n5
i += n7
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
@@ -2154,11 +2192,11 @@ func (m *AttesterSlashingResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintMessages(dAtA, i, uint64(m.AttesterSlashing.Size()))
n6, err := m.AttesterSlashing.MarshalTo(dAtA[i:])
n8, err := m.AttesterSlashing.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n6
i += n8
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
@@ -2245,11 +2283,11 @@ func (m *DepositResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintMessages(dAtA, i, uint64(m.Deposit.Size()))
n7, err := m.Deposit.MarshalTo(dAtA[i:])
n9, err := m.Deposit.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n7
i += n9
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
@@ -2336,11 +2374,11 @@ func (m *ExitResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintMessages(dAtA, i, uint64(m.VoluntaryExit.Size()))
n8, err := m.VoluntaryExit.MarshalTo(dAtA[i:])
n10, err := m.VoluntaryExit.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n8
i += n10
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
@@ -2556,8 +2594,16 @@ func (m *BeaconStateResponse) Size() (n int) {
}
var l int
_ = l
if m.BeaconState != nil {
l = m.BeaconState.Size()
if m.FinalizedState != nil {
l = m.FinalizedState.Size()
n += 1 + l + sovMessages(uint64(l))
}
if m.JustifiedState != nil {
l = m.JustifiedState.Size()
n += 1 + l + sovMessages(uint64(l))
}
if m.CanonicalState != nil {
l = m.CanonicalState.Size()
n += 1 + l + sovMessages(uint64(l))
}
if m.XXX_unrecognized != nil {
@@ -3937,7 +3983,7 @@ func (m *BeaconStateResponse) Unmarshal(dAtA []byte) error {
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field BeaconState", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field FinalizedState", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
@@ -3964,10 +4010,82 @@ func (m *BeaconStateResponse) Unmarshal(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.BeaconState == nil {
m.BeaconState = &BeaconState{}
if m.FinalizedState == nil {
m.FinalizedState = &BeaconState{}
}
if err := m.BeaconState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
if err := m.FinalizedState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field JustifiedState", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessages
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMessages
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMessages
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.JustifiedState == nil {
m.JustifiedState = &BeaconState{}
}
if err := m.JustifiedState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field CanonicalState", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMessages
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMessages
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMessages
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.CanonicalState == nil {
m.CanonicalState = &BeaconState{}
}
if err := m.CanonicalState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex

View File

@@ -59,7 +59,7 @@ message ChainHeadRequest {}
message ChainHeadResponse {
bytes hash = 1;
uint64 slot = 2;
bytes finalized_state_root_hash32s =3;
bytes finalized_state_root_hash32s = 3;
}
message BeaconStateHashAnnounce {
@@ -71,7 +71,9 @@ message BeaconStateRequest {
}
message BeaconStateResponse {
BeaconState beacon_state = 1;
BeaconState finalized_state = 1;
BeaconState justified_state = 2;
BeaconState canonical_state = 3;
}
message AttestationAnnounce {

View File

@@ -169,9 +169,7 @@ var defaultBeaconConfig = &BeaconChainConfig{
// Prysm constants.
DepositsForChainStart: 16384,
RandBytes: 3,
SyncPollingInterval: 6 * 1, // Query nodes over the network every slot for sync status.
BatchBlockLimit: 64 * 4, // Process blocks in batches of 4 epochs of blocks (threshold before casper penalties).
SyncEpochLimit: 4,
MaxNumLog2Validators: 24,
LogBlockDelay: 2, //
}
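With SyncPollingInterval and BatchBlockLimit removed, the batching knob is now expressed in epochs via SyncEpochLimit. Below is a hedged guess at how that might translate into a per-request slot count at the point of use; the helper name is made up, and SlotsPerEpoch is assumed to be the existing BeaconChainConfig field of that name.

// Hypothetical helper: derive a block batch size in slots from the new
// epoch-based limit.
func blockBatchSizeSketch() uint64 {
    cfg := params.BeaconConfig()
    return cfg.SyncEpochLimit * cfg.SlotsPerEpoch
}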