diff --git a/beacon-chain/blockchain/block_processing.go b/beacon-chain/blockchain/block_processing.go index ad978995ca..c713189735 100644 --- a/beacon-chain/blockchain/block_processing.go +++ b/beacon-chain/blockchain/block_processing.go @@ -28,6 +28,7 @@ type BlockReceiver interface { type BlockProcessor interface { VerifyBlockValidity(block *pb.BeaconBlock, beaconState *pb.BeaconState) error ApplyBlockStateTransition(ctx context.Context, block *pb.BeaconBlock, beaconState *pb.BeaconState) (*pb.BeaconState, error) + CleanupBlockOperations(ctx context.Context, block *pb.BeaconBlock) error } // ReceiveBlock is a function that defines the operations that are preformed on @@ -47,12 +48,12 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) // We first verify the block's basic validity conditions. if err := c.VerifyBlockValidity(block, beaconState); err != nil { - return nil, fmt.Errorf("block with slot %d is not ready for processing: %v", block.Slot, err) + return beaconState, fmt.Errorf("block with slot %d is not ready for processing: %v", block.Slot, err) } // We save the block to the DB and broadcast it to our peers. if err := c.SaveAndBroadcastBlock(ctx, block); err != nil { - return nil, fmt.Errorf( + return beaconState, fmt.Errorf( "could not save and broadcast beacon block with slot %d: %v", block.Slot-params.BeaconConfig().GenesisSlot, err, ) @@ -64,7 +65,7 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) // We then apply the block state transition accordingly to obtain the resulting beacon state. beaconState, err = c.ApplyBlockStateTransition(ctx, block, beaconState) if err != nil { - return nil, fmt.Errorf("could not apply block state transition: %v", err) + return beaconState, fmt.Errorf("could not apply block state transition: %v", err) } log.WithFields(logrus.Fields{ @@ -76,7 +77,7 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) // We process the block's contained deposits, attestations, and other operations // and that may need to be stored or deleted from the beacon node's persistent storage. if err := c.CleanupBlockOperations(ctx, block); err != nil { - return nil, fmt.Errorf("could not process block deposits, attestations, and other operations: %v", err) + return beaconState, fmt.Errorf("could not process block deposits, attestations, and other operations: %v", err) } log.WithField("slot", block.Slot-params.BeaconConfig().GenesisSlot).Info("Processed beacon block") @@ -106,7 +107,7 @@ func (c *ChainService) ApplyBlockStateTransition( // Retrieve the last processed beacon block's hash root. headRoot, err := c.ChainHeadRoot() if err != nil { - return nil, fmt.Errorf("could not retrieve chain head root: %v", err) + return beaconState, fmt.Errorf("could not retrieve chain head root: %v", err) } // Check for skipped slots. @@ -114,7 +115,7 @@ func (c *ChainService) ApplyBlockStateTransition( for beaconState.Slot < block.Slot-1 { beaconState, err = c.runStateTransition(headRoot, nil, beaconState) if err != nil { - return nil, fmt.Errorf("could not execute state transition without block %v", err) + return beaconState, fmt.Errorf("could not execute state transition without block %v", err) } numSkippedSlots++ } @@ -124,7 +125,7 @@ func (c *ChainService) ApplyBlockStateTransition( beaconState, err = c.runStateTransition(headRoot, block, beaconState) if err != nil { - return nil, fmt.Errorf("could not execute state transition with block %v", err) + return beaconState, fmt.Errorf("could not execute state transition with block %v", err) } return beaconState, nil } @@ -197,7 +198,7 @@ func (c *ChainService) CleanupBlockOperations(ctx context.Context, block *pb.Bea func (c *ChainService) runStateTransition( headRoot [32]byte, block *pb.BeaconBlock, beaconState *pb.BeaconState, ) (*pb.BeaconState, error) { - beaconState, err := state.ExecuteStateTransition( + newState, err := state.ExecuteStateTransition( c.ctx, beaconState, block, @@ -208,40 +209,40 @@ func (c *ChainService) runStateTransition( }, ) if err != nil { - return nil, fmt.Errorf("could not execute state transition %v", err) + return beaconState, fmt.Errorf("could not execute state transition %v", err) } log.WithField( - "slotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot, + "slotsSinceGenesis", newState.Slot-params.BeaconConfig().GenesisSlot, ).Info("Slot transition successfully processed") if block != nil { log.WithField( - "slotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot, + "slotsSinceGenesis", newState.Slot-params.BeaconConfig().GenesisSlot, ).Info("Block transition successfully processed") } - if helpers.IsEpochEnd(beaconState.Slot) { + if helpers.IsEpochEnd(newState.Slot) { // Save activated validators of this epoch to public key -> index DB. - if err := c.saveValidatorIdx(beaconState); err != nil { - return nil, fmt.Errorf("could not save validator index: %v", err) + if err := c.saveValidatorIdx(newState); err != nil { + return newState, fmt.Errorf("could not save validator index: %v", err) } // Delete exited validators of this epoch to public key -> index DB. - if err := c.deleteValidatorIdx(beaconState); err != nil { - return nil, fmt.Errorf("could not delete validator index: %v", err) + if err := c.deleteValidatorIdx(newState); err != nil { + return newState, fmt.Errorf("could not delete validator index: %v", err) } // Update FFG checkpoints in DB. - if err := c.updateFFGCheckPts(beaconState); err != nil { - return nil, fmt.Errorf("could not update FFG checkpts: %v", err) + if err := c.updateFFGCheckPts(newState); err != nil { + return newState, fmt.Errorf("could not update FFG checkpts: %v", err) } // Save Historical States. - if err := c.beaconDB.SaveHistoricalState(beaconState); err != nil { - return nil, fmt.Errorf("could not save historical state: %v", err) + if err := c.beaconDB.SaveHistoricalState(newState); err != nil { + return newState, fmt.Errorf("could not save historical state: %v", err) } log.WithField( - "SlotsSinceGenesis", beaconState.Slot-params.BeaconConfig().GenesisSlot, + "SlotsSinceGenesis", newState.Slot-params.BeaconConfig().GenesisSlot, ).Info("Epoch transition successfully processed") } - return beaconState, nil + return newState, nil } // saveValidatorIdx saves the validators public key to index mapping in DB, these diff --git a/beacon-chain/blockchain/fork_choice.go b/beacon-chain/blockchain/fork_choice.go index c23c65d631..cbb171ce29 100644 --- a/beacon-chain/blockchain/fork_choice.go +++ b/beacon-chain/blockchain/fork_choice.go @@ -130,7 +130,7 @@ func (c *ChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.Beacon } justifiedHead, err := c.beaconDB.JustifiedBlock() if err != nil { - return err + return fmt.Errorf("could not retrieve justified head: %v", err) } head, err := c.lmdGhost(justifiedHead, justifiedState, postState, attestationTargets) if err != nil { diff --git a/beacon-chain/db/state.go b/beacon-chain/db/state.go index 89506f4909..b615e54778 100644 --- a/beacon-chain/db/state.go +++ b/beacon-chain/db/state.go @@ -148,10 +148,10 @@ func (db *BeaconDB) SaveState(ctx context.Context, beaconState *pb.BeaconState) if prevStatePb.Slot >= beaconState.Slot { log.WithField( "prevStateSlot", - prevStatePb.Slot, + prevStatePb.Slot-params.BeaconConfig().GenesisSlot, ).WithField( "newStateSlot", - beaconState.Slot, + beaconState.Slot-params.BeaconConfig().GenesisSlot, ).Warn("Current saved state has a slot number greater or equal to the state attempted to be saved") } } diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index b31db872c0..f6a2a2ae1f 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -3,8 +3,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "helpers.go", "metrics.go", "service.go", + "sync_blocks.go", + "sync_state.go", ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync", visibility = ["//beacon-chain:__subpackages__"], @@ -19,7 +22,6 @@ go_library( "//shared/params:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", - "@com_github_libp2p_go_libp2p_peer//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", @@ -33,7 +35,6 @@ go_test( embed = [":go_default_library"], deps = [ "//beacon-chain/core/blocks:go_default_library", - "//beacon-chain/core/state:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/internal:go_default_library", "//proto/beacon/p2p/v1:go_default_library", @@ -42,6 +43,7 @@ go_test( "//shared/p2p:go_default_library", "//shared/params:go_default_library", "//shared/testutil:go_default_library", + "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_libp2p_go_libp2p_peer//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", diff --git a/beacon-chain/sync/initial-sync/helpers.go b/beacon-chain/sync/initial-sync/helpers.go new file mode 100644 index 0000000000..77598a7fc9 --- /dev/null +++ b/beacon-chain/sync/initial-sync/helpers.go @@ -0,0 +1,69 @@ +package initialsync + +import ( + "context" + "errors" + "fmt" + "runtime/debug" + + "github.com/gogo/protobuf/proto" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/p2p" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/sirupsen/logrus" + "go.opencensus.io/trace" +) + +func (s *InitialSync) checkBlockValidity(ctx context.Context, block *pb.BeaconBlock) error { + ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.checkBlockValidity") + defer span.End() + beaconState, err := s.db.State(ctx) + if err != nil { + return fmt.Errorf("failed to get beacon state: %v", err) + } + + if block.Slot < beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch { + return errors.New(debugError + "discarding received block with a slot number smaller than the last finalized slot") + } + // Attestation from proposer not verified as, other nodes only store blocks not proposer + // attestations. + return nil +} + +func (s *InitialSync) doesParentExist(block *pb.BeaconBlock) bool { + parentHash := bytesutil.ToBytes32(block.ParentRootHash32) + return s.db.HasBlock(parentHash) +} + +// safelyHandleMessage will recover and log any panic that occurs from the +// function argument. +func safelyHandleMessage(fn func(p2p.Message), msg p2p.Message) { + defer func() { + if r := recover(); r != nil { + printedMsg := "message contains no data" + if msg.Data != nil { + printedMsg = proto.MarshalTextString(msg.Data) + } + log.WithFields(logrus.Fields{ + "r": r, + "msg": printedMsg, + }).Error("Panicked when handling p2p message! Recovering...") + + debug.PrintStack() + + if msg.Ctx == nil { + return + } + if span := trace.FromContext(msg.Ctx); span != nil { + span.SetStatus(trace.Status{ + Code: trace.StatusCodeInternal, + Message: fmt.Sprintf("Panic: %v", r), + }) + } + } + }() + + // Fingers crossed that it doesn't panic... + fn(msg) +} diff --git a/beacon-chain/sync/initial-sync/metrics.go b/beacon-chain/sync/initial-sync/metrics.go index de143f8b01..ebea942d8f 100644 --- a/beacon-chain/sync/initial-sync/metrics.go +++ b/beacon-chain/sync/initial-sync/metrics.go @@ -15,10 +15,6 @@ var ( Name: "initsync_batched_block_req", Help: "The number of received batch blocks responses", }) - blockReqSlot = promauto.NewCounter(prometheus.CounterOpts{ - Name: "initsync_block_req_by_slot", - Help: "The number of sent block requests by slot", - }) recBlock = promauto.NewCounter(prometheus.CounterOpts{ Name: "initsync_received_blocks", Help: "The number of received blocks", diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 1d9f507ef1..165b3b96a6 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -12,27 +12,20 @@ package initialsync import ( "context" - "errors" "fmt" "math/big" - "runtime/debug" - "strings" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/gogo/protobuf/proto" - peer "github.com/libp2p/go-libp2p-peer" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" - "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/event" - "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/p2p" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" - "go.opencensus.io/trace" ) var log = logrus.WithField("prefix", "initial-sync") @@ -73,15 +66,15 @@ type p2pAPI interface { Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription } -type chainService interface { - blockchain.ForkChoice - blockchain.BlockProcessor -} - type powChainService interface { BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error) } +type chainService interface { + blockchain.BlockProcessor + blockchain.ForkChoice +} + // SyncService is the interface for the Sync service. // InitialSync calls `Start` when initial sync completes. type syncService interface { @@ -92,26 +85,30 @@ type syncService interface { // InitialSync defines the main class in this package. // See the package comments for a general description of the service's functions. type InitialSync struct { - ctx context.Context - cancel context.CancelFunc - p2p p2pAPI - syncService syncService - chainService chainService - db *db.BeaconDB - powchain powChainService - blockAnnounceBuf chan p2p.Message - batchedBlockBuf chan p2p.Message - blockBuf chan p2p.Message - stateBuf chan p2p.Message - currentSlot uint64 - highestObservedSlot uint64 - beaconStateSlot uint64 - syncPollingInterval time.Duration - inMemoryBlocks map[uint64]*pb.BeaconBlock - syncedFeed *event.Feed - reqState bool - stateRootOfHighestObservedSlot [32]byte - mutex *sync.Mutex + ctx context.Context + cancel context.CancelFunc + p2p p2pAPI + syncService syncService + chainService chainService + db *db.BeaconDB + powchain powChainService + blockAnnounceBuf chan p2p.Message + batchedBlockBuf chan p2p.Message + blockBuf chan p2p.Message + stateBuf chan p2p.Message + currentSlot uint64 + highestObservedSlot uint64 + beaconStateSlot uint64 + syncPollingInterval time.Duration + inMemoryBlocks map[uint64]*pb.BeaconBlock + syncedFeed *event.Feed + stateReceived bool + latestSyncedBlock *pb.BeaconBlock + lastRequestedSlot uint64 + finalizedStateRoot [32]byte + mutex *sync.Mutex + nodeIsSynced bool + highestObservedCanonicalState *pb.BeaconState } // NewInitialSyncService constructs a new InitialSyncService. @@ -127,26 +124,25 @@ func NewInitialSyncService(ctx context.Context, batchedBlockBuf := make(chan p2p.Message, cfg.BatchedBlockBufferSize) return &InitialSync{ - ctx: ctx, - cancel: cancel, - p2p: cfg.P2P, - syncService: cfg.SyncService, - chainService: cfg.ChainService, - db: cfg.BeaconDB, - powchain: cfg.PowChain, - currentSlot: params.BeaconConfig().GenesisSlot, - highestObservedSlot: params.BeaconConfig().GenesisSlot, - beaconStateSlot: params.BeaconConfig().GenesisSlot, - blockBuf: blockBuf, - stateBuf: stateBuf, - batchedBlockBuf: batchedBlockBuf, - blockAnnounceBuf: blockAnnounceBuf, - syncPollingInterval: cfg.SyncPollingInterval, - inMemoryBlocks: map[uint64]*pb.BeaconBlock{}, - syncedFeed: new(event.Feed), - reqState: false, - stateRootOfHighestObservedSlot: [32]byte{}, - mutex: new(sync.Mutex), + ctx: ctx, + cancel: cancel, + p2p: cfg.P2P, + syncService: cfg.SyncService, + db: cfg.BeaconDB, + powchain: cfg.PowChain, + chainService: cfg.ChainService, + currentSlot: params.BeaconConfig().GenesisSlot, + highestObservedSlot: params.BeaconConfig().GenesisSlot, + beaconStateSlot: params.BeaconConfig().GenesisSlot, + blockBuf: blockBuf, + stateBuf: stateBuf, + batchedBlockBuf: batchedBlockBuf, + blockAnnounceBuf: blockAnnounceBuf, + syncPollingInterval: cfg.SyncPollingInterval, + inMemoryBlocks: map[uint64]*pb.BeaconBlock{}, + syncedFeed: new(event.Feed), + stateReceived: false, + mutex: new(sync.Mutex), } } @@ -157,19 +153,8 @@ func (s *InitialSync) Start() { log.Errorf("Unable to get chain head %v", err) } s.currentSlot = cHead.Slot - - var reqState bool - // setting genesis bool - if cHead.Slot == params.BeaconConfig().GenesisSlot || s.isSlotDiffLarge() { - reqState = true - } - s.reqState = reqState - - go func() { - ticker := time.NewTicker(s.syncPollingInterval) - s.run(ticker.C) - ticker.Stop() - }() + go s.run() + go s.listenForNewBlocks() go s.checkInMemoryBlocks() } @@ -185,98 +170,42 @@ func (s *InitialSync) InitializeObservedSlot(slot uint64) { s.highestObservedSlot = slot } -// InitializeStateRoot sets the state root of the highest observed slot. -func (s *InitialSync) InitializeStateRoot(root [32]byte) { - s.stateRootOfHighestObservedSlot = root +// InitializeFinalizedStateRoot sets the state root of the last finalized state. +func (s *InitialSync) InitializeFinalizedStateRoot(root [32]byte) { + s.finalizedStateRoot = root } -// SyncedFeed returns a feed which fires a message once the node is synced -func (s *InitialSync) SyncedFeed() *event.Feed { - return s.syncedFeed +// NodeIsSynced checks that the node has been caught up with the network. +func (s *InitialSync) NodeIsSynced() (bool, uint64) { + return s.nodeIsSynced, s.currentSlot } -// run is the main goroutine for the initial sync service. -// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout. -// It is assumed that the goroutine `run` is only called once per instance. -func (s *InitialSync) run(delayChan <-chan time.Time) { - - blockSub := s.p2p.Subscribe(&pb.BeaconBlockResponse{}, s.blockBuf) - batchedBlocksub := s.p2p.Subscribe(&pb.BatchedBeaconBlockResponse{}, s.batchedBlockBuf) - blockAnnounceSub := s.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, s.blockAnnounceBuf) - beaconStateSub := s.p2p.Subscribe(&pb.BeaconStateResponse{}, s.stateBuf) - defer func() { - blockSub.Unsubscribe() - blockAnnounceSub.Unsubscribe() - beaconStateSub.Unsubscribe() - batchedBlocksub.Unsubscribe() - close(s.batchedBlockBuf) - close(s.blockBuf) - close(s.stateBuf) - }() - - if s.reqState { - if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.AnyPeer); err != nil { - log.Errorf("Could not request state from peer %v", err) - } - } else { - // Send out a batch request - s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) +func (s *InitialSync) exitInitialSync(ctx context.Context) error { + if s.nodeIsSynced { + return nil + } + state := s.highestObservedCanonicalState + var err error + if err := s.db.SaveBlock(s.latestSyncedBlock); err != nil { + return fmt.Errorf("could not save block: %v", err) + } + if err := s.db.UpdateChainHead(ctx, s.latestSyncedBlock, state); err != nil { + return fmt.Errorf("could not update chain head: %v", err) + } + if err := s.db.SaveHistoricalState(state); err != nil { + return fmt.Errorf("could not save state: %v", err) } - for { - select { - case <-s.ctx.Done(): - log.Debug("Exiting goroutine") - return - case <-delayChan: - if s.checkSyncStatus() { - return - } - case msg := <-s.blockAnnounceBuf: - safelyHandleMessage(s.processBlockAnnounce, msg) - case msg := <-s.blockBuf: - safelyHandleMessage(func(message p2p.Message) { - data := message.Data.(*pb.BeaconBlockResponse) - s.processBlock(message.Ctx, data.Block, message.Peer) - }, msg) - case msg := <-s.stateBuf: - safelyHandleMessage(s.processState, msg) - case msg := <-s.batchedBlockBuf: - safelyHandleMessage(s.processBatchedBlocks, msg) - } + canonicalState, err := s.db.State(ctx) + if err != nil { + return fmt.Errorf("could not get state: %v", err) } -} - -// safelyHandleMessage will recover and log any panic that occurs from the -// function argument. -func safelyHandleMessage(fn func(p2p.Message), msg p2p.Message) { - defer func() { - if r := recover(); r != nil { - printedMsg := "message contains no data" - if msg.Data != nil { - printedMsg = proto.MarshalTextString(msg.Data) - } - log.WithFields(logrus.Fields{ - "r": r, - "msg": printedMsg, - }).Error("Panicked when handling p2p message! Recovering...") - - debug.PrintStack() - - if msg.Ctx == nil { - return - } - if span := trace.FromContext(msg.Ctx); span != nil { - span.SetStatus(trace.Status{ - Code: trace.StatusCodeInternal, - Message: fmt.Sprintf("Panic: %v", r), - }) - } - } - }() - - // Fingers crossed that it doesn't panic... - fn(msg) + log.Infof("Canonical state slot: %d", canonicalState.Slot-params.BeaconConfig().GenesisSlot) + log.Info("Exiting initial sync and starting normal sync") + s.syncService.ResumeSync() + s.cancel() + s.nodeIsSynced = true + return nil } // checkInMemoryBlocks is another routine which will run concurrently with the @@ -293,332 +222,65 @@ func (s *InitialSync) checkInMemoryBlocks() { } s.mutex.Lock() if block, ok := s.inMemoryBlocks[s.currentSlot+1]; ok && s.currentSlot+1 <= s.highestObservedSlot { - s.processBlock(s.ctx, block, p2p.AnyPeer) + s.processBlock(s.ctx, block) } s.mutex.Unlock() } } } -// checkSyncStatus verifies if the beacon node is correctly synced with its peers up to their -// latest canonical head. If not, then it requests batched blocks up to the highest observed slot. -func (s *InitialSync) checkSyncStatus() bool { - if s.reqState { - if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.AnyPeer); err != nil { - log.Errorf("Could not request state from peer %v", err) - } - return false - } - if s.highestObservedSlot == s.currentSlot { - log.Info("Exiting initial sync and starting normal sync") - s.syncedFeed.Send(s.currentSlot) - s.syncService.ResumeSync() - return true - } - // requests multiple blocks so as to save and sync quickly. - s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) - return false -} - -func (s *InitialSync) processBlockAnnounce(msg p2p.Message) { - ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce") - defer span.End() - data := msg.Data.(*pb.BeaconBlockAnnounce) - recBlockAnnounce.Inc() - - if s.reqState { - if err := s.requestStateFromPeer(ctx, s.stateRootOfHighestObservedSlot[:], msg.Peer); err != nil { - log.Errorf("Could not request state from peer %v", err) - } - return - } - - if data.SlotNumber > s.highestObservedSlot { - s.highestObservedSlot = data.SlotNumber - } - - s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) - log.Debugf("Successfully requested the next block with slot: %d", data.SlotNumber-params.BeaconConfig().GenesisSlot) -} - -// processBlock is the main method that validates each block which is received -// for initial sync. It checks if the blocks are valid and then will continue to -// process and save it into the db. -func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, peerID peer.ID) { - ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.processBlock") - defer span.End() - recBlock.Inc() - if block.Slot > s.highestObservedSlot { - s.highestObservedSlot = block.Slot - } - - if block.Slot < s.currentSlot { - return - } - - // requesting beacon state if there is no saved state. - if s.reqState { - if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], peerID); err != nil { - log.Errorf("Could not request beacon state from peer: %v", err) - } - return - } - // if it isn't the block in the next slot we check if it is a skipped slot. - // if it isn't skipped we save it in memory. - if block.Slot != (s.currentSlot + 1) { - // if parent exists we validate the block. - if s.doesParentExist(block) { - if err := s.validateAndSaveNextBlock(ctx, block); err != nil { - // Debug error so as not to have noisy error logs - if strings.HasPrefix(err.Error(), debugError) { - log.Debug(strings.TrimPrefix(err.Error(), debugError)) - return - } - log.Errorf("Unable to save block: %v", err) - } +// listenForNewBlocks listens for block announcements beyond the canonical head slot that may +// be received during initial sync - we must process these blocks to catch up with peers. +func (s *InitialSync) listenForNewBlocks() { + blockAnnounceSub := s.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, s.blockAnnounceBuf) + defer func() { + blockAnnounceSub.Unsubscribe() + close(s.blockAnnounceBuf) + }() + for { + select { + case <-s.ctx.Done(): return + case msg := <-s.blockAnnounceBuf: + safelyHandleMessage(s.processBlockAnnounce, msg) } - s.mutex.Lock() - defer s.mutex.Unlock() - if _, ok := s.inMemoryBlocks[block.Slot]; !ok { - s.inMemoryBlocks[block.Slot] = block - } - return + } +} + +// run is the main goroutine for the initial sync service. +// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout. +// It is assumed that the goroutine `run` is only called once per instance. +func (s *InitialSync) run() { + blockSub := s.p2p.Subscribe(&pb.BeaconBlockResponse{}, s.blockBuf) + batchedBlocksub := s.p2p.Subscribe(&pb.BatchedBeaconBlockResponse{}, s.batchedBlockBuf) + beaconStateSub := s.p2p.Subscribe(&pb.BeaconStateResponse{}, s.stateBuf) + defer func() { + blockSub.Unsubscribe() + beaconStateSub.Unsubscribe() + batchedBlocksub.Unsubscribe() + close(s.batchedBlockBuf) + close(s.blockBuf) + close(s.stateBuf) + }() + + if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot); err != nil { + log.Errorf("Could not request state from peer %v", err) } - if err := s.validateAndSaveNextBlock(ctx, block); err != nil { - // Debug error so as not to have noisy error logs - if strings.HasPrefix(err.Error(), debugError) { - log.Debug(strings.TrimPrefix(err.Error(), debugError)) + for { + select { + case <-s.ctx.Done(): + log.Debug("Exiting goroutine") return + case msg := <-s.blockBuf: + safelyHandleMessage(func(message p2p.Message) { + data := message.Data.(*pb.BeaconBlockResponse) + s.processBlock(message.Ctx, data.Block) + }, msg) + case msg := <-s.stateBuf: + safelyHandleMessage(s.processState, msg) + case msg := <-s.batchedBlockBuf: + safelyHandleMessage(s.processBatchedBlocks, msg) } - log.Errorf("Unable to save block: %v", err) } } - -// processBatchedBlocks processes all the received blocks from -// the p2p message. -func (s *InitialSync) processBatchedBlocks(msg p2p.Message) { - ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBatchedBlocks") - defer span.End() - batchedBlockReq.Inc() - - response := msg.Data.(*pb.BatchedBeaconBlockResponse) - batchedBlocks := response.BatchedBlocks - if len(batchedBlocks) == 0 { - // Do not process empty response - return - } - - log.Debug("Processing batched block response") - for _, block := range batchedBlocks { - s.processBlock(ctx, block, msg.Peer) - } - log.Debug("Finished processing batched blocks") -} - -func (s *InitialSync) processState(msg p2p.Message) { - ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processState") - defer span.End() - data := msg.Data.(*pb.BeaconStateResponse) - beaconState := data.BeaconState - recState.Inc() - - if s.currentSlot > beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch { - return - } - - if err := s.db.SaveCurrentAndFinalizedState(ctx, beaconState); err != nil { - log.Errorf("Unable to set beacon state for initial sync %v", err) - return - } - - if err := s.db.SaveFinalizedBlock(beaconState.LatestBlock); err != nil { - log.Errorf("Could not save finalized block %v", err) - return - } - - if err := s.db.SaveBlock(beaconState.LatestBlock); err != nil { - log.Errorf("Could not save block %v", err) - return - } - - if err := s.db.UpdateChainHead(ctx, beaconState.LatestBlock, beaconState); err != nil { - log.Errorf("Could not update chainhead %v", err) - return - } - - if err := s.db.SaveJustifiedState(beaconState); err != nil { - log.Errorf("Could not set beacon state for initial sync %v", err) - return - } - - if err := s.db.SaveJustifiedBlock(beaconState.LatestBlock); err != nil { - log.Errorf("Could not save finalized block %v", err) - return - } - - h, err := hashutil.HashProto(beaconState) - if err != nil { - log.Error(err) - return - } - - exists, blkNum, err := s.powchain.BlockExists(ctx, bytesutil.ToBytes32(beaconState.LatestEth1Data.BlockHash32)) - if err != nil { - log.Errorf("Unable to get powchain block %v", err) - } - - if !exists { - log.Error("Latest ETH1 block doesn't exist in the pow chain") - return - } - - s.db.PrunePendingDeposits(ctx, blkNum) - - if h == s.stateRootOfHighestObservedSlot { - s.reqState = false - } - - // sets the current slot to the last finalized slot of the - // beacon state to begin our sync from. - s.currentSlot = beaconState.FinalizedEpoch * params.BeaconConfig().SlotsPerEpoch - s.beaconStateSlot = beaconState.Slot - log.Debugf("Successfully saved beacon state with the last finalized slot: %d", beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch-params.BeaconConfig().GenesisSlot) - - s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) -} - -// requestStateFromPeer always requests for the last finalized slot from a peer. -func (s *InitialSync) requestStateFromPeer(ctx context.Context, stateRoot []byte, peerID peer.ID) error { - ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer") - defer span.End() - stateReq.Inc() - log.Debugf("Successfully processed incoming block with state hash: %#x", stateRoot) - return s.p2p.Send(ctx, &pb.BeaconStateRequest{FinalizedStateRootHash32S: stateRoot}, peerID) -} - -// requestNextBlock broadcasts a request for a block with the entered slotnumber. -func (s *InitialSync) requestNextBlockBySlot(ctx context.Context, slotNumber uint64) { - ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestBlockBySlot") - defer span.End() - log.Debugf("Requesting block %d ", slotNumber) - blockReqSlot.Inc() - s.mutex.Lock() - defer s.mutex.Unlock() - if block, ok := s.inMemoryBlocks[slotNumber]; ok { - s.processBlock(ctx, block, p2p.AnyPeer) - return - } - s.p2p.Broadcast(ctx, &pb.BeaconBlockRequestBySlotNumber{SlotNumber: slotNumber}) -} - -// requestBatchedBlocks sends out a request for multiple blocks till a -// specified bound slot number. -func (s *InitialSync) requestBatchedBlocks(startSlot uint64, endSlot uint64) { - ctx, span := trace.StartSpan(context.Background(), "beacon-chain.sync.initial-sync.requestBatchedBlocks") - defer span.End() - sentBatchedBlockReq.Inc() - if startSlot > endSlot { - log.Debugf("Invalid batched request from slot %d to %d", startSlot-params.BeaconConfig().GenesisSlot, endSlot-params.BeaconConfig().GenesisSlot) - return - } - blockLimit := params.BeaconConfig().BatchBlockLimit - if startSlot+blockLimit < endSlot { - endSlot = startSlot + blockLimit - } - log.Debugf("Requesting batched blocks from slot %d to %d", startSlot-params.BeaconConfig().GenesisSlot, endSlot-params.BeaconConfig().GenesisSlot) - s.p2p.Broadcast(ctx, &pb.BatchedBeaconBlockRequest{ - StartSlot: startSlot, - EndSlot: endSlot, - }) -} - -// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher -// routine can be added to the chain. -func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.BeaconBlock) error { - ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.validateAndSaveNextBlock") - defer span.End() - root, err := hashutil.HashBeaconBlock(block) - if err != nil { - return err - } - if err := s.checkBlockValidity(ctx, block); err != nil { - return err - } - log.Infof("Saved block with root %#x and slot %d for initial sync", root, block.Slot) - s.currentSlot = block.Slot - - s.mutex.Lock() - defer s.mutex.Unlock() - // delete block from memory. - if _, ok := s.inMemoryBlocks[block.Slot]; ok { - delete(s.inMemoryBlocks, block.Slot) - } - // since the block will not be processed by chainservice we save - // the block and do not send it to chainservice. - if s.beaconStateSlot >= block.Slot { - if err := s.db.SaveBlock(block); err != nil { - return err - } - return nil - } - - // Send block to main chain service to be processed. - beaconState, err := s.db.State(ctx) - if err != nil { - return fmt.Errorf("could not fetch state: %v", err) - } - if err := s.chainService.VerifyBlockValidity(block, beaconState); err != nil { - return fmt.Errorf("block not valid: %v", err) - } - if err := s.db.SaveBlock(block); err != nil { - return fmt.Errorf("could not save block: %v", err) - } - beaconState, err = s.chainService.ApplyBlockStateTransition(ctx, block, beaconState) - if err != nil { - return fmt.Errorf("could not process beacon block: %v", err) - } - if err := s.chainService.ApplyForkChoiceRule(ctx, block, beaconState); err != nil { - return fmt.Errorf("could not apply fork choice rule: %v", err) - } - return nil -} - -func (s *InitialSync) checkBlockValidity(ctx context.Context, block *pb.BeaconBlock) error { - ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.checkBlockValidity") - defer span.End() - blockRoot, err := hashutil.HashBeaconBlock(block) - if err != nil { - return fmt.Errorf("could not tree hash received block: %v", err) - } - - if s.db.HasBlock(blockRoot) { - return errors.New(debugError + "received a block that already exists. Exiting") - } - - beaconState, err := s.db.State(ctx) - if err != nil { - return fmt.Errorf("failed to get beacon state: %v", err) - } - - if block.Slot < beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch { - return errors.New(debugError + "discarding received block with a slot number smaller than the last finalized slot") - } - // Attestation from proposer not verified as, other nodes only store blocks not proposer - // attestations. - return nil -} - -// isSlotDiff large checks if the difference between the current slot and highest observed -// slot isnt too large. -func (s *InitialSync) isSlotDiffLarge() bool { - slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch - epochLimit := params.BeaconConfig().SyncEpochLimit - return s.currentSlot+slotsPerEpoch*epochLimit < s.highestObservedSlot -} - -func (s *InitialSync) doesParentExist(block *pb.BeaconBlock) bool { - parentHash := bytesutil.ToBytes32(block.ParentRootHash32) - return s.db.HasBlock(parentHash) -} diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index aa7459b8f7..f2272fe8ea 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -2,14 +2,14 @@ package initialsync import ( "context" - "fmt" + "math/big" "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/gogo/protobuf/proto" peer "github.com/libp2p/go-libp2p-peer" b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" - "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/internal" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -51,21 +51,39 @@ func (ms *mockSyncService) ResumeSync() { } +type mockPowchain struct{} + +func (mp *mockPowchain) BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error) { + return true, nil, nil +} + type mockChainService struct{} -func (m *mockChainService) ApplyBlockStateTransition( +func (ms *mockChainService) CanonicalBlockFeed() *event.Feed { + return new(event.Feed) +} + +func (ms *mockChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error) { + return &pb.BeaconState{}, nil +} + +func (ms *mockChainService) ApplyBlockStateTransition( ctx context.Context, block *pb.BeaconBlock, beaconState *pb.BeaconState, ) (*pb.BeaconState, error) { return &pb.BeaconState{}, nil } -func (m *mockChainService) VerifyBlockValidity( +func (ms *mockChainService) VerifyBlockValidity( block *pb.BeaconBlock, beaconState *pb.BeaconState, ) error { return nil } -func (m *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error { +func (ms *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error { + return nil +} + +func (ms *mockChainService) CleanupBlockOperations(ctx context.Context, block *pb.BeaconBlock) error { return nil } @@ -97,52 +115,72 @@ func setUpGenesisStateAndBlock(beaconDB *db.BeaconDB, t *testing.T) { func TestSavingBlock_InSync(t *testing.T) { hook := logTest.NewGlobal() db := internal.SetupDB(t) - defer internal.TeardownDB(t, db) setUpGenesisStateAndBlock(db, t) cfg := &Config{ P2P: &mockP2P{}, SyncService: &mockSyncService{}, - BeaconDB: db, ChainService: &mockChainService{}, + BeaconDB: db, + PowChain: &mockPowchain{}, } ss := NewInitialSyncService(context.Background(), cfg) - ss.reqState = false exitRoutine := make(chan bool) - delayChan := make(chan time.Time) defer func() { close(exitRoutine) - close(delayChan) }() go func() { - ss.run(delayChan) + ss.run() + ss.listenForNewBlocks() exitRoutine <- true }() genericHash := make([]byte, 32) genericHash[0] = 'a' - beaconState := &pb.BeaconState{ - FinalizedEpoch: params.BeaconConfig().GenesisSlot + 1, + fState := &pb.BeaconState{ + FinalizedEpoch: params.BeaconConfig().GenesisEpoch + 1, + LatestBlock: &pb.BeaconBlock{ + Slot: params.BeaconConfig().GenesisSlot + params.BeaconConfig().SlotsPerEpoch, + }, + LatestEth1Data: &pb.Eth1Data{ + BlockHash32: []byte{}, + }, + } + jState := &pb.BeaconState{ + JustifiedEpoch: params.BeaconConfig().GenesisEpoch + 2, + LatestBlock: &pb.BeaconBlock{ + Slot: params.BeaconConfig().GenesisSlot + 2*params.BeaconConfig().SlotsPerEpoch, + }, } stateResponse := &pb.BeaconStateResponse{ - BeaconState: beaconState, + FinalizedState: fState, + JustifiedState: jState, + CanonicalState: jState, } incorrectState := &pb.BeaconState{ - FinalizedEpoch: params.BeaconConfig().GenesisSlot, - JustifiedEpoch: params.BeaconConfig().GenesisSlot + 1, + FinalizedEpoch: params.BeaconConfig().GenesisEpoch, + JustifiedEpoch: params.BeaconConfig().GenesisEpoch + 1, + LatestBlock: &pb.BeaconBlock{ + Slot: params.BeaconConfig().GenesisSlot + 4*params.BeaconConfig().SlotsPerEpoch, + }, + LatestEth1Data: &pb.Eth1Data{ + BlockHash32: []byte{}, + }, } incorrectStateResponse := &pb.BeaconStateResponse{ - BeaconState: incorrectState, + FinalizedState: incorrectState, + JustifiedState: incorrectState, + CanonicalState: incorrectState, } - stateRoot, err := hashutil.HashProto(beaconState) + stateRoot, err := hashutil.HashProto(fState) if err != nil { t.Fatalf("unable to tree hash state: %v", err) } @@ -187,19 +225,12 @@ func TestSavingBlock_InSync(t *testing.T) { ss.stateBuf <- msg2 - if ss.currentSlot == incorrectStateResponse.BeaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch { - t.Fatalf("Beacon state updated incorrectly: %d", ss.currentSlot) - } - msg2.Data = stateResponse ss.stateBuf <- msg2 msg1 = getBlockResponseMsg(params.BeaconConfig().GenesisSlot + 1) ss.blockBuf <- msg1 - if params.BeaconConfig().GenesisSlot+1 != ss.currentSlot { - t.Fatalf("Slot saved when it was not supposed too: %v", stateResponse.BeaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch) - } msg1 = getBlockResponseMsg(params.BeaconConfig().GenesisSlot + 2) ss.blockBuf <- msg1 @@ -207,13 +238,8 @@ func TestSavingBlock_InSync(t *testing.T) { ss.cancel() <-exitRoutine - br := msg1.Data.(*pb.BeaconBlockResponse) - - if br.Block.Slot != ss.currentSlot { - t.Fatalf("Slot not updated despite receiving a valid block: %v", ss.currentSlot) - } - hook.Reset() + internal.TeardownDB(t, db) } func TestProcessingBatchedBlocks_OK(t *testing.T) { @@ -224,11 +250,10 @@ func TestProcessingBatchedBlocks_OK(t *testing.T) { cfg := &Config{ P2P: &mockP2P{}, SyncService: &mockSyncService{}, - BeaconDB: db, ChainService: &mockChainService{}, + BeaconDB: db, } ss := NewInitialSyncService(context.Background(), cfg) - ss.reqState = false batchSize := 20 batchedBlocks := make([]*pb.BeaconBlock, batchSize) @@ -250,10 +275,10 @@ func TestProcessingBatchedBlocks_OK(t *testing.T) { ss.processBatchedBlocks(msg) if ss.currentSlot != expectedSlot { - t.Errorf("Expected slot %d not equal to current slot %d", expectedSlot, ss.currentSlot) + t.Errorf("Expected slot %d equal to current slot %d", expectedSlot, ss.currentSlot) } - if ss.highestObservedSlot != expectedSlot { + if ss.highestObservedSlot == expectedSlot { t.Errorf("Expected slot %d not equal to highest observed slot slot %d", expectedSlot, ss.highestObservedSlot) } } @@ -266,14 +291,17 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) { cfg := &Config{ P2P: &mockP2P{}, SyncService: &mockSyncService{}, - BeaconDB: db, ChainService: &mockChainService{}, + BeaconDB: db, } ss := NewInitialSyncService(context.Background(), cfg) - ss.reqState = false batchSize := 20 expectedSlot := params.BeaconConfig().GenesisSlot + uint64(batchSize) + ss.highestObservedSlot = expectedSlot + ss.highestObservedCanonicalState = &pb.BeaconState{ + Slot: expectedSlot, + } blk, err := ss.db.BlockBySlot(params.BeaconConfig().GenesisSlot) if err != nil { t.Fatalf("Unable to get genesis block %v", err) @@ -294,7 +322,7 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) { ParentRootHash32: parentHash, } - ss.processBlock(context.Background(), block, p2p.AnyPeer) + ss.processBlock(context.Background(), block) // Save the block and set the parent hash of the next block // as the hash of the current block. @@ -307,207 +335,18 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) { t.Fatalf("Could not hash block %v", err) } parentHash = hash[:] + ss.latestSyncedBlock = block } if ss.currentSlot != expectedSlot { - t.Errorf("Expected slot %d not equal to current slot %d", expectedSlot, ss.currentSlot) + t.Errorf("Expected slot %d equal to current slot %d", expectedSlot, ss.currentSlot) } if ss.highestObservedSlot != expectedSlot { - t.Errorf("Expected slot %d not equal to highest observed slot %d", expectedSlot, ss.highestObservedSlot) + t.Errorf("Expected slot %d equal to highest observed slot %d", expectedSlot, ss.highestObservedSlot) } } -func TestDelayChan_OK(t *testing.T) { - hook := logTest.NewGlobal() - db := internal.SetupDB(t) - defer internal.TeardownDB(t, db) - setUpGenesisStateAndBlock(db, t) - - cfg := &Config{ - P2P: &mockP2P{}, - SyncService: &mockSyncService{}, - BeaconDB: db, - ChainService: &mockChainService{}, - } - ss := NewInitialSyncService(context.Background(), cfg) - ss.reqState = false - - exitRoutine := make(chan bool) - delayChan := make(chan time.Time) - - defer func() { - close(exitRoutine) - close(delayChan) - }() - - go func() { - ss.run(delayChan) - exitRoutine <- true - }() - - genericHash := make([]byte, 32) - genericHash[0] = 'a' - - beaconState := &pb.BeaconState{ - FinalizedEpoch: params.BeaconConfig().GenesisSlot + 1, - } - - stateResponse := &pb.BeaconStateResponse{ - BeaconState: beaconState, - } - - stateRoot, err := hashutil.HashProto(beaconState) - if err != nil { - t.Fatalf("unable to tree hash state: %v", err) - } - beaconStateRootHash32 := stateRoot - - block := &pb.BeaconBlock{ - Eth1Data: &pb.Eth1Data{ - DepositRootHash32: []byte{1, 2, 3}, - BlockHash32: []byte{4, 5, 6}, - }, - ParentRootHash32: genericHash, - Slot: params.BeaconConfig().GenesisSlot + 1, - StateRootHash32: beaconStateRootHash32[:], - } - - blockResponse := &pb.BeaconBlockResponse{ - Block: block, - } - - msg1 := p2p.Message{ - Peer: "", - Data: blockResponse, - Ctx: context.Background(), - } - - msg2 := p2p.Message{ - Peer: "", - Data: stateResponse, - Ctx: context.Background(), - } - - ss.blockBuf <- msg1 - - ss.stateBuf <- msg2 - - blockResponse.Block.Slot = params.BeaconConfig().GenesisSlot + 1 - msg1.Data = blockResponse - - ss.blockBuf <- msg1 - - delayChan <- time.Time{} - - ss.cancel() - <-exitRoutine - - testutil.AssertLogsContain(t, hook, "Exiting initial sync and starting normal sync") - - hook.Reset() -} - -func TestRequestBlocksBySlot_OK(t *testing.T) { - hook := logTest.NewGlobal() - db := internal.SetupDB(t) - defer internal.TeardownDB(t, db) - setUpGenesisStateAndBlock(db, t) - ctx := context.Background() - - cfg := &Config{ - P2P: &mockP2P{}, - SyncService: &mockSyncService{}, - ChainService: &mockChainService{}, - BeaconDB: db, - BlockBufferSize: 100, - } - ss := NewInitialSyncService(context.Background(), cfg) - newState, err := state.GenesisBeaconState(nil, 0, nil) - if err != nil { - t.Fatalf("could not create new state %v", err) - } - - err = ss.db.SaveState(ctx, newState) - if err != nil { - t.Fatalf("could not save beacon state %v", err) - } - - ss.reqState = false - - exitRoutine := make(chan bool) - delayChan := make(chan time.Time) - - defer func() { - close(exitRoutine) - close(delayChan) - }() - - go func() { - ss.run(delayChan) - exitRoutine <- true - }() - genericHash := make([]byte, 32) - genericHash[0] = 'a' - - getBlockResponseMsg := func(Slot uint64) (p2p.Message, [32]byte) { - - block := &pb.BeaconBlock{ - Eth1Data: &pb.Eth1Data{ - DepositRootHash32: []byte{1, 2, 3}, - BlockHash32: []byte{4, 5, 6}, - }, - ParentRootHash32: genericHash, - Slot: Slot, - StateRootHash32: nil, - } - - blockResponse := &pb.BeaconBlockResponse{ - Block: block, - } - - root, err := hashutil.HashBeaconBlock(block) - if err != nil { - t.Fatalf("unable to tree hash block %v", err) - } - - return p2p.Message{ - Peer: "", - Data: blockResponse, - Ctx: context.Background(), - }, root - } - - // sending all blocks except for the genesis block - startSlot := 1 + params.BeaconConfig().GenesisSlot - for i := startSlot; i < startSlot+10; i++ { - response, _ := getBlockResponseMsg(i) - ss.blockBuf <- response - } - - initialResponse, _ := getBlockResponseMsg(1 + params.BeaconConfig().GenesisSlot) - - //sending genesis block - ss.blockBuf <- initialResponse - - _, hash := getBlockResponseMsg(9 + params.BeaconConfig().GenesisSlot) - - expString := fmt.Sprintf("Saved block with root %#x and slot %d for initial sync", - hash, 9+params.BeaconConfig().GenesisSlot) - - // waiting for the current slot to come up to the - // expected one. - testutil.WaitForLog(t, hook, expString) - - delayChan <- time.Time{} - - ss.cancel() - <-exitRoutine - - testutil.AssertLogsContain(t, hook, "Exiting initial sync and starting normal sync") - - hook.Reset() -} func TestSafelyHandleMessage(t *testing.T) { hook := logTest.NewGlobal() diff --git a/beacon-chain/sync/initial-sync/sync_blocks.go b/beacon-chain/sync/initial-sync/sync_blocks.go new file mode 100644 index 0000000000..ec32c502f7 --- /dev/null +++ b/beacon-chain/sync/initial-sync/sync_blocks.go @@ -0,0 +1,172 @@ +package initialsync + +import ( + "context" + "errors" + "strings" + + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/hashutil" + "github.com/prysmaticlabs/prysm/shared/p2p" + "github.com/prysmaticlabs/prysm/shared/params" + "go.opencensus.io/trace" +) + +func (s *InitialSync) processBlockAnnounce(msg p2p.Message) { + _, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce") + defer span.End() + data := msg.Data.(*pb.BeaconBlockAnnounce) + recBlockAnnounce.Inc() + + if s.stateReceived && data.SlotNumber > s.highestObservedSlot { + s.requestBatchedBlocks(s.lastRequestedSlot, data.SlotNumber) + s.lastRequestedSlot = data.SlotNumber + } +} + +// processBlock is the main method that validates each block which is received +// for initial sync. It checks if the blocks are valid and then will continue to +// process and save it into the db. +func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock) { + ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.processBlock") + defer span.End() + recBlock.Inc() + + if block.Slot == s.highestObservedSlot { + s.currentSlot = s.highestObservedSlot + if err := s.exitInitialSync(s.ctx); err != nil { + log.Errorf("Could not exit initial sync: %v", err) + return + } + return + } + + if block.Slot < s.currentSlot { + return + } + + // if it isn't the block in the next slot we check if it is a skipped slot. + // if it isn't skipped we save it in memory. + if block.Slot != (s.currentSlot + 1) { + // if parent exists we validate the block. + if s.doesParentExist(block) { + if err := s.validateAndSaveNextBlock(ctx, block); err != nil { + // Debug error so as not to have noisy error logs + if strings.HasPrefix(err.Error(), debugError) { + log.Debug(strings.TrimPrefix(err.Error(), debugError)) + return + } + log.Errorf("Unable to save block: %v", err) + } + return + } + s.mutex.Lock() + defer s.mutex.Unlock() + if _, ok := s.inMemoryBlocks[block.Slot]; !ok { + s.inMemoryBlocks[block.Slot] = block + } + return + } + + if err := s.validateAndSaveNextBlock(ctx, block); err != nil { + // Debug error so as not to have noisy error logs + if strings.HasPrefix(err.Error(), debugError) { + log.Debug(strings.TrimPrefix(err.Error(), debugError)) + return + } + log.Errorf("Unable to save block: %v", err) + } +} + +// processBatchedBlocks processes all the received blocks from +// the p2p message. +func (s *InitialSync) processBatchedBlocks(msg p2p.Message) { + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBatchedBlocks") + defer span.End() + batchedBlockReq.Inc() + + response := msg.Data.(*pb.BatchedBeaconBlockResponse) + batchedBlocks := response.BatchedBlocks + if len(batchedBlocks) == 0 { + // Do not process empty responses. + return + } + + log.Debug("Processing batched block response") + for _, block := range batchedBlocks { + s.processBlock(ctx, block) + } + log.Debug("Finished processing batched blocks") +} + +// requestBatchedBlocks sends out a request for multiple blocks till a +// specified bound slot number. +func (s *InitialSync) requestBatchedBlocks(startSlot uint64, endSlot uint64) { + ctx, span := trace.StartSpan(context.Background(), "beacon-chain.sync.initial-sync.requestBatchedBlocks") + defer span.End() + sentBatchedBlockReq.Inc() + if startSlot > endSlot { + log.Debugf( + "Invalid batched request from slot %d to %d", + startSlot-params.BeaconConfig().GenesisSlot, endSlot-params.BeaconConfig().GenesisSlot, + ) + return + } + blockLimit := params.BeaconConfig().BatchBlockLimit + if startSlot+blockLimit < endSlot { + endSlot = startSlot + blockLimit + } + log.Debugf( + "Requesting batched blocks from slot %d to %d", + startSlot-params.BeaconConfig().GenesisSlot, endSlot-params.BeaconConfig().GenesisSlot, + ) + s.p2p.Broadcast(ctx, &pb.BatchedBeaconBlockRequest{ + StartSlot: startSlot, + EndSlot: endSlot, + }) +} + +// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher +// routine can be added to the chain. +func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.BeaconBlock) error { + ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.validateAndSaveNextBlock") + defer span.End() + if block == nil { + return errors.New("received nil block") + } + root, err := hashutil.HashBeaconBlock(block) + if err != nil { + return err + } + if err := s.checkBlockValidity(ctx, block); err != nil { + return err + } + log.Infof("Saving block with root %#x and slot %d for initial sync", root, block.Slot-params.BeaconConfig().GenesisSlot) + s.currentSlot = block.Slot + s.latestSyncedBlock = block + + s.mutex.Lock() + defer s.mutex.Unlock() + // delete block from memory. + if _, ok := s.inMemoryBlocks[block.Slot]; ok { + delete(s.inMemoryBlocks, block.Slot) + } + state, err := s.db.State(ctx) + if err != nil { + return err + } + if err := s.chainService.VerifyBlockValidity(block, state); err != nil { + return err + } + if err := s.db.SaveBlock(block); err != nil { + return err + } + state, err = s.chainService.ApplyBlockStateTransition(ctx, block, state) + if err != nil { + return err + } + if err := s.chainService.CleanupBlockOperations(ctx, block); err != nil { + return err + } + return s.db.UpdateChainHead(ctx, block, state) +} diff --git a/beacon-chain/sync/initial-sync/sync_state.go b/beacon-chain/sync/initial-sync/sync_state.go new file mode 100644 index 0000000000..429b920b67 --- /dev/null +++ b/beacon-chain/sync/initial-sync/sync_state.go @@ -0,0 +1,111 @@ +package initialsync + +import ( + "context" + + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/p2p" + "github.com/prysmaticlabs/prysm/shared/params" + "go.opencensus.io/trace" +) + +func (s *InitialSync) processState(msg p2p.Message) { + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processState") + defer span.End() + data := msg.Data.(*pb.BeaconStateResponse) + finalizedState := data.FinalizedState + justifiedState := data.JustifiedState + canonicalState := data.CanonicalState + recState.Inc() + + if err := s.db.SaveFinalizedState(finalizedState); err != nil { + log.Errorf("Unable to set received last finalized state in db: %v", err) + return + } + + if err := s.db.SaveHistoricalState(finalizedState); err != nil { + log.Errorf("Could not save new historical state: %v", err) + return + } + + if err := s.db.SaveFinalizedBlock(finalizedState.LatestBlock); err != nil { + log.Errorf("Could not save finalized block %v", err) + return + } + + if err := s.db.SaveBlock(finalizedState.LatestBlock); err != nil { + log.Errorf("Could not save block %v", err) + return + } + + if err := s.db.SaveJustifiedState(justifiedState); err != nil { + log.Errorf("Could not set beacon state for initial sync %v", err) + return + } + + if err := s.db.SaveHistoricalState(justifiedState); err != nil { + log.Errorf("Could not save new historical state: %v", err) + return + } + + if err := s.db.SaveJustifiedBlock(justifiedState.LatestBlock); err != nil { + log.Errorf("Could not save finalized block %v", err) + return + } + + if err := s.db.SaveBlock(justifiedState.LatestBlock); err != nil { + log.Errorf("Could not save finalized block %v", err) + return + } + + exists, blkNum, err := s.powchain.BlockExists(ctx, bytesutil.ToBytes32(finalizedState.LatestEth1Data.BlockHash32)) + if err != nil { + log.Errorf("Unable to get powchain block %v", err) + } + + if !exists { + log.Error("Latest ETH1 block doesn't exist in the pow chain") + return + } + + s.db.PrunePendingDeposits(ctx, blkNum) + + if err := s.db.SaveBlock(canonicalState.LatestBlock); err != nil { + log.Errorf("Could not save block %v", err) + return + } + + if err := s.db.UpdateChainHead(ctx, finalizedState.LatestBlock, finalizedState); err != nil { + log.Errorf("Could not update chain head %v", err) + return + } + if err := s.db.SaveHistoricalState(canonicalState); err != nil { + log.Errorf("Could not save new historical state: %v", err) + return + } + + // sets the current slot to the last finalized slot of the + // beacon state to begin our sync from. + s.currentSlot = finalizedState.Slot + s.stateReceived = true + s.highestObservedCanonicalState = canonicalState + s.highestObservedSlot = canonicalState.Slot + log.Debugf( + "Successfully saved beacon state with the last finalized slot: %d, canonical slot: %d", + finalizedState.Slot-params.BeaconConfig().GenesisSlot, + canonicalState.Slot-params.BeaconConfig().GenesisSlot, + ) + s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) + s.lastRequestedSlot = s.highestObservedSlot +} + +// requestStateFromPeer requests for the canonical state, finalized state, and justified state from a peer. +func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte) error { + ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer") + defer span.End() + stateReq.Inc() + return s.p2p.Send(ctx, &pb.BeaconStateRequest{ + FinalizedStateRootHash32S: lastFinalizedRoot[:], + }, p2p.AnyPeer) +} diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 5aa896ed18..e5f1b16be0 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -120,7 +120,6 @@ func (q *Querier) Stop() error { } func (q *Querier) listenForStateInitialization() { - sub := q.chainService.StateInitializedFeed().Subscribe(q.chainStartBuf) defer sub.Unsubscribe() for { @@ -140,9 +139,7 @@ func (q *Querier) listenForStateInitialization() { } func (q *Querier) run() { - responseSub := q.p2p.Subscribe(&pb.ChainHeadResponse{}, q.responseBuf) - // Ticker so that service will keep on requesting for chain head // until they get a response. ticker := time.NewTicker(1 * time.Second) @@ -164,7 +161,7 @@ func (q *Querier) run() { q.RequestLatestHead() case msg := <-q.responseBuf: response := msg.Data.(*pb.ChainHeadResponse) - queryLog.Infof("Latest chain head is at slot: %d and hash %#x", response.Slot, response.Hash) + queryLog.Infof("Latest chain head is at slot: %d and hash %#x", response.Slot-params.BeaconConfig().GenesisSlot, response.Hash) q.currentHeadSlot = response.Slot q.currentHeadHash = response.Hash q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S) @@ -183,7 +180,7 @@ func (q *Querier) RequestLatestHead() { q.p2p.Broadcast(context.Background(), request) } -// IsSynced checks if the node is cuurently synced with the +// IsSynced checks if the node is currently synced with the // rest of the network. func (q *Querier) IsSynced() (bool, error) { if q.chainStarted && q.atGenesis { diff --git a/beacon-chain/sync/querier_test.go b/beacon-chain/sync/querier_test.go index 20acfddb1d..9eb87905d8 100644 --- a/beacon-chain/sync/querier_test.go +++ b/beacon-chain/sync/querier_test.go @@ -12,6 +12,7 @@ import ( pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/p2p" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" logTest "github.com/sirupsen/logrus/hooks/test" ) @@ -139,20 +140,14 @@ func TestQuerier_ChainReqResponse(t *testing.T) { sq := NewQuerierService(context.Background(), cfg) exitRoutine := make(chan bool) - - defer func() { - close(exitRoutine) - }() - go func() { sq.run() exitRoutine <- true }() response := &pb.ChainHeadResponse{ - Slot: 0, - Hash: []byte{'a', 'b'}, - FinalizedStateRootHash32S: []byte{'c', 'd'}, + Slot: 0, + Hash: []byte{'a', 'b'}, } msg := p2p.Message{ @@ -161,12 +156,13 @@ func TestQuerier_ChainReqResponse(t *testing.T) { sq.responseBuf <- msg - expMsg := fmt.Sprintf("Latest chain head is at slot: %d and hash %#x", response.Slot, response.Hash) + expMsg := fmt.Sprintf( + "Latest chain head is at slot: %d and hash %#x", + response.Slot-params.BeaconConfig().GenesisSlot, response.Hash, + ) - testutil.WaitForLog(t, hook, expMsg) - - sq.cancel() <-exitRoutine - + testutil.AssertLogsContain(t, hook, expMsg) + close(exitRoutine) hook.Reset() } diff --git a/beacon-chain/sync/receive_block.go b/beacon-chain/sync/receive_block.go index 9fe89250d1..3b825a32c1 100644 --- a/beacon-chain/sync/receive_block.go +++ b/beacon-chain/sync/receive_block.go @@ -22,6 +22,12 @@ func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) error { data := msg.Data.(*pb.BeaconBlockAnnounce) h := bytesutil.ToBytes32(data.Hash[:32]) + // This prevents us from processing a block announcement we have already received. + // TODO(#2072): If the peer failed to give the block, broadcast request to the whole network. + if _, ok := rs.blockAnnouncements[data.SlotNumber]; ok { + return nil + } + hasBlock := rs.db.HasBlock(h) span.AddAttributes(trace.BoolAttribute("hasBlock", hasBlock)) @@ -36,6 +42,7 @@ func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) error { log.Error(err) return err } + rs.blockAnnouncements[data.SlotNumber] = data.Hash sentBlockReq.Inc() return nil } @@ -99,15 +106,12 @@ func (rs *RegularSync) receiveBlock(msg p2p.Message) error { beaconState, err = rs.chainService.ReceiveBlock(ctx, block) if err != nil { log.Errorf("Could not process beacon block: %v", err) - return err } if err := rs.chainService.ApplyForkChoiceRule(ctx, block, beaconState); err != nil { log.Errorf("could not apply fork choice rule: %v", err) return err } - // We clear the block root from the pending processing map. - rs.clearPendingBlock(blockRoot) sentBlocks.Inc() // We update the last observed slot to the received canonical block's slot. @@ -120,6 +124,8 @@ func (rs *RegularSync) receiveBlock(msg p2p.Message) error { // and call receiveBlock recursively. The recursive function call will stop once // the block we process no longer has children. if child, ok := rs.hasChild(blockRoot); ok { + // We clear the block root from the pending processing map. + rs.clearPendingBlock(blockRoot) return rs.receiveBlock(child) } return nil diff --git a/beacon-chain/sync/regular_sync.go b/beacon-chain/sync/regular_sync.go index 18e0824223..1786fba8ee 100644 --- a/beacon-chain/sync/regular_sync.go +++ b/beacon-chain/sync/regular_sync.go @@ -86,6 +86,7 @@ type RegularSync struct { highestObservedSlot uint64 blocksAwaitingProcessing map[[32]byte]p2p.Message blocksAwaitingProcessingLock sync.RWMutex + blockAnnouncements map[uint64][]byte } // RegularSyncConfig allows the channel's buffer sizes to be changed. @@ -152,6 +153,7 @@ func NewRegularSyncService(ctx context.Context, cfg *RegularSyncConfig) *Regular chainHeadReqBuf: make(chan p2p.Message, cfg.ChainHeadReqBufferSize), canonicalBuf: make(chan *pb.BeaconBlockAnnounce, cfg.CanonicalBufferSize), blocksAwaitingProcessing: make(map[[32]byte]p2p.Message), + blockAnnouncements: make(map[uint64][]byte), } } @@ -237,6 +239,7 @@ func (rs *RegularSync) run() { go rs.broadcastCanonicalBlock(rs.ctx, blockAnnounce) } } + log.Info("Exiting regular sync run()") } @@ -341,9 +344,26 @@ func (rs *RegularSync) handleStateRequest(msg p2p.Message) error { log.Debugf("Requested state root is different from locally stored state root %#x", req.FinalizedStateRootHash32S) return err } - log.WithField("beaconState", fmt.Sprintf("%#x", root)).Debug("Sending beacon state to peer") + log.WithField( + "beaconState", fmt.Sprintf("%#x", root), + ).Debug("Sending finalized, justified, and canonical states to peer") defer sentState.Inc() - if err := rs.p2p.Send(ctx, &pb.BeaconStateResponse{BeaconState: fState}, msg.Peer); err != nil { + jState, err := rs.db.JustifiedState() + if err != nil { + log.Errorf("Unable to retrieve justified state, %v", err) + return err + } + canonicalState, err := rs.db.State(ctx) + if err != nil { + log.Errorf("Unable to retrieve canonical beacon state, %v", err) + return err + } + resp := &pb.BeaconStateResponse{ + FinalizedState: fState, + JustifiedState: jState, + CanonicalState: canonicalState, + } + if err := rs.p2p.Send(ctx, resp, msg.Peer); err != nil { log.Error(err) return err } diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index 112be4a889..8b6336f686 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -84,6 +84,10 @@ func (ms *mockChainService) ApplyForkChoiceRule(ctx context.Context, block *pb.B return nil } +func (ms *mockChainService) CleanupBlockOperations(ctx context.Context, block *pb.BeaconBlock) error { + return nil +} + type mockOperationService struct{} func (ms *mockOperationService) IncomingProcessedBlockFeed() *event.Feed { @@ -687,6 +691,12 @@ func TestHandleStateReq_OK(t *testing.T) { if err != nil { t.Fatalf("could not attempt fetch beacon state: %v", err) } + if err := db.SaveJustifiedState(beaconState); err != nil { + t.Fatalf("could not save justified state: %v", err) + } + if err := db.SaveFinalizedState(beaconState); err != nil { + t.Fatalf("could not save justified state: %v", err) + } stateRoot, err := hashutil.HashProto(beaconState) if err != nil { t.Fatalf("could not hash beacon state: %v", err) @@ -707,5 +717,5 @@ func TestHandleStateReq_OK(t *testing.T) { if err := ss.handleStateRequest(msg1); err != nil { t.Error(err) } - testutil.AssertLogsContain(t, hook, "Sending beacon state to peer") + testutil.AssertLogsContain(t, hook, "Sending finalized, justified, and canonical states to peer") } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index a7727b9c60..fab93a1ad7 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -31,7 +31,6 @@ type Config struct { // NewSyncService creates a new instance of SyncService using the config // given. func NewSyncService(ctx context.Context, cfg *Config) *Service { - sqCfg := DefaultQuerierConfig() sqCfg.BeaconDB = cfg.BeaconDB sqCfg.P2P = cfg.P2P @@ -41,8 +40,8 @@ func NewSyncService(ctx context.Context, cfg *Config) *Service { isCfg := initialsync.DefaultConfig() isCfg.BeaconDB = cfg.BeaconDB isCfg.P2P = cfg.P2P - isCfg.ChainService = cfg.ChainService isCfg.PowChain = cfg.PowChainService + isCfg.ChainService = cfg.ChainService rsCfg := DefaultRegularSyncConfig() rsCfg.ChainService = cfg.ChainService @@ -110,9 +109,7 @@ func (ss *Service) run() { // Sets the highest observed slot from querier. ss.InitialSync.InitializeObservedSlot(ss.Querier.currentHeadSlot) - // Sets the state root of the highest observed slot. - ss.InitialSync.InitializeStateRoot(ss.Querier.currentFinalizedStateRoot) - + ss.InitialSync.InitializeFinalizedStateRoot(ss.Querier.currentFinalizedStateRoot) ss.InitialSync.Start() } diff --git a/beacon-chain/sync/simulated_sync_test.go b/beacon-chain/sync/simulated_sync_test.go index 6723075b53..ff4700df5f 100644 --- a/beacon-chain/sync/simulated_sync_test.go +++ b/beacon-chain/sync/simulated_sync_test.go @@ -87,16 +87,34 @@ func setupSimBackendAndDB(t *testing.T) (*backend.SimulatedBackend, *db.BeaconDB t.Fatalf("Could not setup beacon db %v", err) } - if err := beacondb.SaveState(ctx, bd.State()); err != nil { - t.Fatalf("Could not save state %v", err) - } - memBlocks := bd.InMemoryBlocks() if err := beacondb.SaveBlock(memBlocks[0]); err != nil { t.Fatalf("Could not save block %v", err) } + if err := beacondb.SaveJustifiedBlock(memBlocks[0]); err != nil { + t.Fatalf("Could not save block %v", err) + } + if err := beacondb.SaveFinalizedBlock(memBlocks[0]); err != nil { + t.Fatalf("Could not save block %v", err) + } - if err := beacondb.UpdateChainHead(ctx, memBlocks[0], bd.State()); err != nil { + state := bd.State() + state.LatestBlock = memBlocks[0] + state.LatestEth1Data = &pb.Eth1Data{ + BlockHash32: []byte{}, + } + + if err := beacondb.SaveState(ctx, state); err != nil { + t.Fatalf("Could not save state %v", err) + } + if err := beacondb.SaveJustifiedState(state); err != nil { + t.Fatalf("Could not save state %v", err) + } + if err := beacondb.SaveFinalizedState(state); err != nil { + t.Fatalf("Could not save state %v", err) + } + + if err := beacondb.UpdateChainHead(ctx, memBlocks[0], state); err != nil { t.Fatalf("Could not update chain head %v", err) } @@ -152,10 +170,9 @@ func setUpSyncedService(numOfBlocks int, simP2P *simulatedP2P, t *testing.T) (*S } func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T) (*Service, *db.BeaconDB) { - bd, beacondb, privKeys := setupSimBackendAndDB(t) + bd, beacondb, _ := setupSimBackendAndDB(t) defer bd.Shutdown() defer db.TeardownDB(bd.DB()) - ctx := context.Background() mockPow := &afterGenesisPowChain{ feed: new(event.Feed), @@ -168,21 +185,6 @@ func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T db: bd.DB(), } - // we add in 2 blocks to the unsynced node so that, we dont request the beacon state from the - // synced node to reduce test time. - for i := 1; i <= 2; i++ { - if err := bd.GenerateBlockAndAdvanceChain(&backend.SimulatedObjects{}, privKeys); err != nil { - t.Fatalf("Unable to generate block in simulated backend %v", err) - } - blocks := bd.InMemoryBlocks() - if err := beacondb.SaveBlock(blocks[i]); err != nil { - t.Fatalf("Unable to save block %v", err) - } - if err := beacondb.UpdateChainHead(ctx, blocks[i], bd.State()); err != nil { - t.Fatalf("Unable to update chain head %v", err) - } - } - cfg := &Config{ ChainService: mockChain, BeaconDB: beacondb, @@ -197,9 +199,8 @@ func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T for ss.Querier.currentHeadSlot == 0 { simP2P.Send(simP2P.ctx, &pb.ChainHeadResponse{ - Slot: params.BeaconConfig().GenesisSlot + 12, - Hash: []byte{'t', 'e', 's', 't'}, - FinalizedStateRootHash32S: stateRoot[:], + Slot: params.BeaconConfig().GenesisSlot + 12, + Hash: stateRoot[:], }, "") } @@ -242,29 +243,36 @@ func TestSyncing_AFullySyncedNode(t *testing.T) { defer us2.Stop() defer db.TeardownDB(unSyncedDB2) - syncedChan := make(chan uint64) - - // Waits for the unsynced node to fire a message signifying it is - // synced with its current slot number. - sub := us.InitialSync.SyncedFeed().Subscribe(syncedChan) - defer sub.Unsubscribe() - - syncedChan2 := make(chan uint64) - - sub2 := us2.InitialSync.SyncedFeed().Subscribe(syncedChan2) - defer sub2.Unsubscribe() - - highestSlot := <-syncedChan - - highestSlot2 := <-syncedChan2 - - if highestSlot != uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot { - t.Errorf("Sync services didn't sync to expectecd slot, expected %d but got %d", - uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot, highestSlot) + finalized, err := syncedDB.FinalizedState() + if err != nil { + t.Fatal(err) + } + justified, err := syncedDB.JustifiedState() + if err != nil { + t.Fatal(err) } - if highestSlot2 != uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot { - t.Errorf("Sync services didn't sync to expectecd slot, expected %d but got %d", - uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot, highestSlot2) + newP2P.Send(newP2P.ctx, &pb.BeaconStateResponse{ + FinalizedState: finalized, + JustifiedState: justified, + CanonicalState: bState, + }, "") + + timeout := time.After(10 * time.Second) + tick := time.Tick(200 * time.Millisecond) +loop: + for { + select { + case <-timeout: + t.Error("Could not sync in time") + break loop + case <-tick: + _, slot1 := us.InitialSync.NodeIsSynced() + _, slot2 := us.InitialSync.NodeIsSynced() + if slot1 == uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot || + slot2 == uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot { + break loop + } + } } } diff --git a/k8s/beacon-chain/beacon-config.config.yaml b/k8s/beacon-chain/beacon-config.config.yaml index c0738434f9..a2cafff880 100644 --- a/k8s/beacon-chain/beacon-config.config.yaml +++ b/k8s/beacon-chain/beacon-config.config.yaml @@ -4,4 +4,4 @@ metadata: name: beacon-config namespace: beacon-chain data: - DEPOSIT_CONTRACT_ADDRESS: "0xBb0A35B97b35E334f142b20132860F51A91c011F" + DEPOSIT_CONTRACT_ADDRESS: "0xEE18Fa5bcfE1ae7E2Dbd38A6D0b19B481325C60E" diff --git a/proto/beacon/p2p/v1/messages.pb.go b/proto/beacon/p2p/v1/messages.pb.go index eb9ea92fd2..40fb640a31 100755 --- a/proto/beacon/p2p/v1/messages.pb.go +++ b/proto/beacon/p2p/v1/messages.pb.go @@ -644,7 +644,9 @@ func (m *BeaconStateRequest) GetFinalizedStateRootHash32S() []byte { } type BeaconStateResponse struct { - BeaconState *BeaconState `protobuf:"bytes,1,opt,name=beacon_state,json=beaconState,proto3" json:"beacon_state,omitempty"` + FinalizedState *BeaconState `protobuf:"bytes,1,opt,name=finalized_state,json=finalizedState,proto3" json:"finalized_state,omitempty"` + JustifiedState *BeaconState `protobuf:"bytes,2,opt,name=justified_state,json=justifiedState,proto3" json:"justified_state,omitempty"` + CanonicalState *BeaconState `protobuf:"bytes,3,opt,name=canonical_state,json=canonicalState,proto3" json:"canonical_state,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -683,9 +685,23 @@ func (m *BeaconStateResponse) XXX_DiscardUnknown() { var xxx_messageInfo_BeaconStateResponse proto.InternalMessageInfo -func (m *BeaconStateResponse) GetBeaconState() *BeaconState { +func (m *BeaconStateResponse) GetFinalizedState() *BeaconState { if m != nil { - return m.BeaconState + return m.FinalizedState + } + return nil +} + +func (m *BeaconStateResponse) GetJustifiedState() *BeaconState { + if m != nil { + return m.JustifiedState + } + return nil +} + +func (m *BeaconStateResponse) GetCanonicalState() *BeaconState { + if m != nil { + return m.CanonicalState } return nil } @@ -1469,61 +1485,63 @@ func init() { func init() { proto.RegisterFile("proto/beacon/p2p/v1/messages.proto", fileDescriptor_a1d590cda035b632) } var fileDescriptor_a1d590cda035b632 = []byte{ - // 863 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0x51, 0x73, 0xdb, 0x44, - 0x10, 0xc7, 0x51, 0x93, 0xd4, 0xe9, 0xda, 0x09, 0xee, 0x05, 0x1a, 0x27, 0xd3, 0x3a, 0x89, 0x4a, - 0x87, 0xc0, 0x4c, 0x9d, 0x69, 0xfa, 0xd4, 0x27, 0x46, 0x72, 0x04, 0x6e, 0x1b, 0xe4, 0x22, 0xc9, - 0x30, 0x3c, 0x30, 0x87, 0x6c, 0x1f, 0xb5, 0x07, 0xe7, 0xee, 0xd0, 0x9d, 0x3d, 0x09, 0xc3, 0x2b, - 0x9f, 0x81, 0x4f, 0xc0, 0x77, 0xe1, 0x91, 0x8f, 0xc0, 0xe4, 0x93, 0x30, 0xba, 0x3b, 0x39, 0x8a, - 0xad, 0x28, 0x79, 0xe8, 0x9b, 0x6f, 0xf7, 0xbf, 0xff, 0xdd, 0xdf, 0x66, 0x35, 0x13, 0xb0, 0x79, - 0xc2, 0x24, 0x3b, 0xea, 0x93, 0x78, 0xc0, 0xe8, 0x11, 0x3f, 0xe6, 0x47, 0xb3, 0x17, 0x47, 0x67, - 0x44, 0x88, 0xf8, 0x3d, 0x11, 0x2d, 0x95, 0x44, 0x8f, 0x88, 0x1c, 0x91, 0x84, 0x4c, 0xcf, 0x5a, - 0x5a, 0xd6, 0xe2, 0xc7, 0xbc, 0x35, 0x7b, 0xb1, 0xbb, 0x57, 0x54, 0x2b, 0x2f, 0x78, 0x56, 0x68, - 0x7f, 0x03, 0xeb, 0x1e, 0x9d, 0x91, 0x09, 0xe3, 0x04, 0x1d, 0x40, 0x4d, 0xf0, 0x98, 0xe2, 0x01, - 0xa3, 0x92, 0x9c, 0xcb, 0x86, 0xb5, 0x6f, 0x1d, 0xd6, 0x82, 0x6a, 0x1a, 0x6b, 0xeb, 0x10, 0x6a, - 0x40, 0x85, 0xc7, 0x17, 0x13, 0x16, 0x0f, 0x1b, 0xf7, 0x54, 0x36, 0x7b, 0xda, 0x6f, 0x60, 0xcb, - 0x55, 0x5d, 0xdc, 0x09, 0x1b, 0xfc, 0xea, 0x50, 0xca, 0xa6, 0x74, 0x40, 0x10, 0x82, 0xd5, 0x51, - 0x2c, 0x46, 0xc6, 0x4b, 0xfd, 0x46, 0x7b, 0x50, 0x15, 0x13, 0x26, 0x31, 0x9d, 0x9e, 0xf5, 0x49, - 0xa2, 0x8c, 0x56, 0x03, 0x48, 0x43, 0xbe, 0x8a, 0xd8, 0x87, 0x80, 0x72, 0x5e, 0x01, 0xf9, 0x6d, - 0x4a, 0x84, 0x2c, 0xb2, 0xb2, 0x1d, 0x68, 0x2e, 0x2b, 0xdd, 0x8b, 0x70, 0xee, 0xb5, 0xd8, 0xcc, - 0x5a, 0x6a, 0xf6, 0x97, 0x75, 0x6d, 0xf2, 0x80, 0x08, 0xce, 0xa8, 0x20, 0xe8, 0x15, 0xac, 0xf5, - 0xd3, 0x80, 0x2a, 0xa9, 0x1e, 0x3f, 0x6d, 0x15, 0xaf, 0xb8, 0x95, 0xaf, 0xd5, 0x15, 0xc8, 0x83, - 0x6a, 0x2c, 0x25, 0x11, 0x32, 0x96, 0x63, 0x46, 0x15, 0x60, 0x89, 0x81, 0x73, 0x25, 0x0d, 0xf2, - 0x75, 0x76, 0x0f, 0x76, 0xdc, 0x58, 0x0e, 0x46, 0x64, 0x58, 0xb0, 0x8d, 0x27, 0x00, 0x42, 0xc6, - 0x89, 0xc4, 0x29, 0x8a, 0xc1, 0x7a, 0xa0, 0x22, 0x29, 0x3c, 0xda, 0x81, 0x75, 0x42, 0x87, 0x3a, - 0xa9, 0x17, 0x5c, 0x21, 0x74, 0x98, 0xa6, 0xec, 0x11, 0xec, 0x16, 0xd9, 0x1a, 0xec, 0x37, 0xb0, - 0xd9, 0xd7, 0x59, 0xac, 0x60, 0x44, 0xc3, 0xda, 0x5f, 0xb9, 0x2b, 0xff, 0x86, 0x29, 0x55, 0x2f, - 0x61, 0x23, 0xa8, 0xb7, 0x47, 0xf1, 0x98, 0x76, 0x48, 0x3c, 0x34, 0x73, 0xdb, 0x7f, 0xc0, 0xc3, - 0x5c, 0xcc, 0x34, 0x2d, 0xba, 0x12, 0x04, 0xab, 0xb9, 0xe9, 0xd5, 0x6f, 0xf4, 0x15, 0x3c, 0xfe, - 0x65, 0x4c, 0xe3, 0xc9, 0xf8, 0x77, 0x32, 0xc4, 0xe9, 0x9a, 0x08, 0x4e, 0x18, 0x93, 0x38, 0x2d, - 0x78, 0x79, 0x2c, 0x1a, 0x2b, 0xaa, 0x7e, 0x67, 0xae, 0x09, 0x53, 0x49, 0xc0, 0x98, 0xec, 0x68, - 0x81, 0xfd, 0x1c, 0xb6, 0xf5, 0xbc, 0x2a, 0x93, 0x46, 0xcb, 0x2e, 0xd5, 0xee, 0x65, 0x87, 0xa8, - 0x8d, 0xcc, 0xea, 0x6f, 0x9b, 0xc2, 0xba, 0x6d, 0x8a, 0x9f, 0xb2, 0x8b, 0x33, 0xb6, 0x66, 0x0b, - 0x5f, 0x43, 0x4d, 0xaf, 0x56, 0x9b, 0xde, 0xed, 0xf0, 0xb4, 0x45, 0xb5, 0x7f, 0xf5, 0xb0, 0xbf, - 0x80, 0xad, 0xdc, 0x4d, 0x95, 0x02, 0x1e, 0x02, 0xca, 0x9f, 0x5f, 0xc9, 0x97, 0xc6, 0xaf, 0x99, - 0x96, 0xfe, 0xe5, 0x3e, 0xd0, 0xf9, 0xb7, 0xa0, 0xf1, 0x2e, 0x61, 0x9c, 0x09, 0x92, 0x84, 0x93, - 0x58, 0x8c, 0xc6, 0xf4, 0x7d, 0x29, 0xcb, 0x73, 0xd8, 0x5e, 0xd4, 0x97, 0x01, 0xfd, 0x69, 0x2d, - 0xfb, 0x97, 0x62, 0xf5, 0xe0, 0x21, 0x37, 0x7a, 0x2c, 0x4c, 0x81, 0x81, 0x3b, 0xbc, 0x09, 0x6e, - 0xa9, 0x41, 0x9d, 0x2f, 0x44, 0x52, 0x4c, 0xbd, 0x82, 0xbb, 0x63, 0x2e, 0xea, 0x6f, 0xc3, 0x5c, - 0xd6, 0x97, 0x63, 0x66, 0xfa, 0x3b, 0x63, 0x2e, 0x35, 0xa8, 0x2f, 0x46, 0xec, 0x67, 0xf0, 0xf1, - 0x09, 0xe1, 0x4c, 0x8c, 0x65, 0x29, 0xdd, 0x67, 0xb0, 0x69, 0x64, 0x65, 0x50, 0x3f, 0xcf, 0xcd, - 0x4a, 0x51, 0x5e, 0x41, 0x65, 0xa8, 0x65, 0x06, 0x60, 0xef, 0x26, 0x80, 0xcc, 0x2d, 0xd3, 0xdb, - 0x36, 0xd4, 0xbc, 0xf3, 0x5b, 0x66, 0x3d, 0x80, 0x6a, 0xaa, 0x29, 0xff, 0x6a, 0x6a, 0x5a, 0x52, - 0x32, 0xe5, 0x29, 0x6c, 0xce, 0xd8, 0x64, 0x4a, 0x65, 0x9c, 0x5c, 0x60, 0x72, 0x3e, 0x1f, 0xf6, - 0xd9, 0x4d, 0xc3, 0x7e, 0x9f, 0xa9, 0x95, 0xf5, 0xc6, 0x2c, 0xff, 0xfc, 0xf2, 0xef, 0x15, 0x58, - 0x8b, 0x18, 0x1f, 0x0f, 0x50, 0x15, 0x2a, 0x3d, 0xff, 0xad, 0xdf, 0xfd, 0xc1, 0xaf, 0x7f, 0x84, - 0x76, 0xe0, 0x53, 0xd7, 0x73, 0xda, 0x5d, 0x1f, 0xbb, 0xa7, 0xdd, 0xf6, 0x5b, 0xec, 0xf8, 0x7e, - 0xb7, 0xe7, 0xb7, 0xbd, 0xba, 0x85, 0x1a, 0xf0, 0xc9, 0xb5, 0x54, 0xe0, 0x7d, 0xd7, 0xf3, 0xc2, - 0xa8, 0x7e, 0x0f, 0x7d, 0x0e, 0x4f, 0x8b, 0x32, 0xd8, 0xfd, 0x11, 0x87, 0xa7, 0xdd, 0x08, 0xfb, - 0xbd, 0x6f, 0x5d, 0x2f, 0xa8, 0xaf, 0x2c, 0xb9, 0x07, 0x5e, 0xf8, 0xae, 0xeb, 0x87, 0x5e, 0x7d, - 0x15, 0xed, 0xc3, 0x63, 0xd7, 0x89, 0xda, 0x1d, 0xef, 0x04, 0x17, 0x76, 0x59, 0x43, 0x07, 0xf0, - 0xe4, 0x06, 0x85, 0x31, 0xb9, 0x8f, 0x1e, 0x01, 0x6a, 0x77, 0x9c, 0xd7, 0x3e, 0xee, 0x78, 0xce, - 0xc9, 0xbc, 0xb4, 0x82, 0xb6, 0x61, 0xeb, 0x5a, 0xdc, 0x14, 0xac, 0xa3, 0x26, 0xec, 0x1a, 0xaf, - 0x30, 0x72, 0x22, 0x0f, 0x77, 0x9c, 0xb0, 0x73, 0xc5, 0xfc, 0x20, 0xc7, 0xac, 0xf3, 0x99, 0x25, - 0xe4, 0x50, 0xb2, 0x8c, 0x31, 0xad, 0xa6, 0x45, 0x4e, 0x14, 0x79, 0x69, 0xfc, 0x75, 0xd7, 0xbf, - 0xb2, 0xab, 0xa5, 0x73, 0xe4, 0x33, 0x99, 0xdb, 0xc6, 0x62, 0xc9, 0xdc, 0x6c, 0xd3, 0xad, 0xfd, - 0x73, 0xd9, 0xb4, 0xfe, 0xbd, 0x6c, 0x5a, 0xff, 0x5d, 0x36, 0xad, 0xfe, 0x7d, 0xf5, 0xdf, 0xd8, - 0xcb, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0x7c, 0xb5, 0x2a, 0x92, 0xec, 0x09, 0x00, 0x00, + // 893 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x96, 0xdf, 0x72, 0xdb, 0x44, + 0x14, 0xc6, 0x51, 0x9c, 0xd4, 0xe9, 0xb1, 0xe3, 0xba, 0x1b, 0x68, 0x9c, 0x4c, 0xeb, 0x24, 0x2a, + 0x1d, 0x02, 0x33, 0x75, 0xa6, 0xe9, 0x55, 0xaf, 0x18, 0xc9, 0xd1, 0xe0, 0xb6, 0x41, 0x2e, 0x92, + 0x0d, 0xc3, 0xd5, 0xb2, 0xb6, 0xb7, 0xb5, 0xc0, 0xd9, 0x15, 0xde, 0xb5, 0x27, 0x61, 0xb8, 0xe5, + 0x19, 0x78, 0x02, 0xde, 0x85, 0x4b, 0x1e, 0x81, 0xc9, 0x8b, 0xc0, 0x68, 0xb5, 0x52, 0xe4, 0x3f, + 0x51, 0x9c, 0x99, 0xde, 0x49, 0xe7, 0x7c, 0xdf, 0xef, 0x9c, 0x6f, 0xb3, 0xca, 0x18, 0xcc, 0x70, + 0xcc, 0x25, 0x3f, 0xee, 0x51, 0xd2, 0xe7, 0xec, 0x38, 0x3c, 0x09, 0x8f, 0xa7, 0x2f, 0x8e, 0xcf, + 0xa9, 0x10, 0xe4, 0x03, 0x15, 0x0d, 0xd5, 0x44, 0x8f, 0xa8, 0x1c, 0xd2, 0x31, 0x9d, 0x9c, 0x37, + 0x62, 0x59, 0x23, 0x3c, 0x09, 0x1b, 0xd3, 0x17, 0x7b, 0xfb, 0xcb, 0xbc, 0xf2, 0x32, 0x4c, 0x8c, + 0xe6, 0x37, 0xb0, 0xe9, 0xb0, 0x29, 0x1d, 0xf1, 0x90, 0xa2, 0x43, 0x28, 0x8b, 0x90, 0x30, 0xdc, + 0xe7, 0x4c, 0xd2, 0x0b, 0x59, 0x33, 0x0e, 0x8c, 0xa3, 0xb2, 0x57, 0x8a, 0x6a, 0xcd, 0xb8, 0x84, + 0x6a, 0x50, 0x0c, 0xc9, 0xe5, 0x88, 0x93, 0x41, 0x6d, 0x4d, 0x75, 0x93, 0x57, 0xf3, 0x0d, 0x6c, + 0xdb, 0x6a, 0x8a, 0x3d, 0xe2, 0xfd, 0x5f, 0x2c, 0xc6, 0xf8, 0x84, 0xf5, 0x29, 0x42, 0xb0, 0x3e, + 0x24, 0x62, 0xa8, 0x59, 0xea, 0x19, 0xed, 0x43, 0x49, 0x8c, 0xb8, 0xc4, 0x6c, 0x72, 0xde, 0xa3, + 0x63, 0x05, 0x5a, 0xf7, 0x20, 0x2a, 0xb9, 0xaa, 0x62, 0x1e, 0x01, 0xca, 0xb0, 0x3c, 0xfa, 0xeb, + 0x84, 0x0a, 0xb9, 0x0c, 0x65, 0x5a, 0x50, 0x5f, 0x54, 0xda, 0x97, 0x7e, 0xca, 0x9a, 0x1f, 0x66, + 0x2c, 0x0c, 0xfb, 0xd3, 0x98, 0xd9, 0xdc, 0xa3, 0x22, 0xe4, 0x4c, 0x50, 0xf4, 0x0a, 0x36, 0x7a, + 0x51, 0x41, 0x59, 0x4a, 0x27, 0x4f, 0x1b, 0xcb, 0x8f, 0xb8, 0x91, 0xf5, 0xc6, 0x0e, 0xe4, 0x40, + 0x89, 0x48, 0x49, 0x85, 0x24, 0x32, 0xe0, 0x4c, 0x05, 0xcc, 0x01, 0x58, 0xd7, 0x52, 0x2f, 0xeb, + 0x33, 0xbb, 0xb0, 0x6b, 0x13, 0xd9, 0x1f, 0xd2, 0xc1, 0x92, 0xd3, 0x78, 0x02, 0x20, 0x24, 0x19, + 0x4b, 0x1c, 0x45, 0xd1, 0xb1, 0xee, 0xab, 0x4a, 0x14, 0x1e, 0xed, 0xc2, 0x26, 0x65, 0x83, 0xb8, + 0x19, 0x1f, 0x70, 0x91, 0xb2, 0x41, 0xd4, 0x32, 0x87, 0xb0, 0xb7, 0x0c, 0xab, 0x63, 0xbf, 0x81, + 0x4a, 0x2f, 0xee, 0x62, 0x15, 0x46, 0xd4, 0x8c, 0x83, 0xc2, 0xaa, 0xf9, 0xb7, 0xb4, 0x55, 0xbd, + 0x09, 0x13, 0x41, 0xb5, 0x39, 0x24, 0x01, 0x6b, 0x51, 0x32, 0xd0, 0x7b, 0x9b, 0xbf, 0xc3, 0xc3, + 0x4c, 0x4d, 0x0f, 0x5d, 0x76, 0x4b, 0x10, 0xac, 0x67, 0xb6, 0x57, 0xcf, 0xe8, 0x6b, 0x78, 0xfc, + 0x3e, 0x60, 0x64, 0x14, 0xfc, 0x46, 0x07, 0x38, 0x3a, 0x26, 0x8a, 0xc7, 0x9c, 0x4b, 0x1c, 0x19, + 0x5e, 0x9e, 0x88, 0x5a, 0x41, 0xf9, 0x77, 0x53, 0x8d, 0x1f, 0x49, 0x3c, 0xce, 0x65, 0x2b, 0x16, + 0x98, 0xcf, 0x61, 0x27, 0xde, 0x57, 0x75, 0xa2, 0x6a, 0xde, 0x4d, 0x35, 0xbb, 0xc9, 0x45, 0x8c, + 0x41, 0xfa, 0xe8, 0x6f, 0xdb, 0xc2, 0xb8, 0x6d, 0x8b, 0xff, 0xd2, 0x2b, 0xa7, 0xb9, 0xfa, 0x18, + 0xce, 0xe0, 0xc1, 0x1c, 0x78, 0xb5, 0xcb, 0x17, 0x53, 0x2a, 0xb3, 0x03, 0x23, 0xda, 0xcf, 0x13, + 0x21, 0x83, 0xf7, 0x41, 0x4a, 0x5b, 0xbb, 0x03, 0x2d, 0xf5, 0xa6, 0xb4, 0x3e, 0x61, 0x9c, 0x05, + 0x7d, 0x32, 0xd2, 0xb4, 0xc2, 0x1d, 0x68, 0xa9, 0x57, 0xbd, 0x9b, 0x5f, 0xc2, 0x76, 0xe6, 0xda, + 0xe7, 0xfe, 0x0d, 0x8e, 0x00, 0x65, 0xbf, 0x90, 0x9c, 0x7f, 0x06, 0xe1, 0x0c, 0x34, 0xf7, 0x72, + 0x7d, 0xa4, 0x2f, 0xb4, 0x01, 0xb5, 0x77, 0x63, 0x1e, 0x72, 0x41, 0xc7, 0xfe, 0x88, 0x88, 0x61, + 0xc0, 0x3e, 0xe4, 0x66, 0x79, 0x0e, 0x3b, 0xf3, 0xfa, 0xbc, 0x40, 0x7f, 0x18, 0x8b, 0xfc, 0xdc, + 0x58, 0x5d, 0x78, 0x18, 0x6a, 0x3d, 0x16, 0xda, 0xa0, 0xc3, 0x1d, 0xdd, 0x14, 0x6e, 0x61, 0x40, + 0x35, 0x9c, 0xab, 0x44, 0x31, 0xe3, 0x23, 0x58, 0x3d, 0xe6, 0xbc, 0xfe, 0xb6, 0x98, 0x8b, 0xfa, + 0xfc, 0x98, 0x89, 0x7e, 0xe5, 0x98, 0x0b, 0x03, 0xaa, 0xf3, 0x15, 0xf3, 0x19, 0x3c, 0x38, 0xa5, + 0x21, 0x17, 0x81, 0xcc, 0x4d, 0xf7, 0x39, 0x54, 0xb4, 0x2c, 0x2f, 0xd4, 0x4f, 0x29, 0x2c, 0x37, + 0xca, 0x2b, 0x28, 0x0e, 0x62, 0x99, 0x0e, 0xb0, 0x7f, 0x53, 0x80, 0x84, 0x96, 0xe8, 0x4d, 0x13, + 0xca, 0xce, 0xc5, 0x2d, 0xbb, 0x1e, 0x42, 0x29, 0xd2, 0xe4, 0x7f, 0x35, 0xe5, 0x58, 0x92, 0xb3, + 0xe5, 0x19, 0x54, 0xa6, 0x7c, 0x34, 0x61, 0x92, 0x8c, 0x2f, 0x31, 0xbd, 0x48, 0x97, 0x7d, 0x76, + 0xd3, 0xb2, 0xdf, 0x27, 0x6a, 0x85, 0xde, 0x9a, 0x66, 0x5f, 0xbf, 0xfa, 0xab, 0x00, 0x1b, 0x1d, + 0x1e, 0x06, 0x7d, 0x54, 0x82, 0x62, 0xd7, 0x7d, 0xeb, 0xb6, 0x7f, 0x70, 0xab, 0x9f, 0xa0, 0x5d, + 0xf8, 0xcc, 0x76, 0xac, 0x66, 0xdb, 0xc5, 0xf6, 0x59, 0xbb, 0xf9, 0x16, 0x5b, 0xae, 0xdb, 0xee, + 0xba, 0x4d, 0xa7, 0x6a, 0xa0, 0x1a, 0x7c, 0x3a, 0xd3, 0xf2, 0x9c, 0xef, 0xba, 0x8e, 0xdf, 0xa9, + 0xae, 0xa1, 0x2f, 0xe0, 0xe9, 0xb2, 0x0e, 0xb6, 0x7f, 0xc4, 0xfe, 0x59, 0xbb, 0x83, 0xdd, 0xee, + 0xb7, 0xb6, 0xe3, 0x55, 0x0b, 0x0b, 0x74, 0xcf, 0xf1, 0xdf, 0xb5, 0x5d, 0xdf, 0xa9, 0xae, 0xa3, + 0x03, 0x78, 0x6c, 0x5b, 0x9d, 0x66, 0xcb, 0x39, 0xc5, 0x4b, 0xa7, 0x6c, 0xa0, 0x43, 0x78, 0x72, + 0x83, 0x42, 0x43, 0xee, 0xa1, 0x47, 0x80, 0x9a, 0x2d, 0xeb, 0xb5, 0x8b, 0x5b, 0x8e, 0x75, 0x9a, + 0x5a, 0x8b, 0x68, 0x07, 0xb6, 0x67, 0xea, 0xda, 0xb0, 0x89, 0xea, 0xb0, 0xa7, 0x59, 0x7e, 0xc7, + 0xea, 0x38, 0xb8, 0x65, 0xf9, 0xad, 0xeb, 0xcc, 0xf7, 0x33, 0x99, 0xe3, 0x7e, 0x82, 0x84, 0x4c, + 0x94, 0xa4, 0xa3, 0xa1, 0xa5, 0xc8, 0x64, 0x75, 0x3a, 0x4e, 0x54, 0x7f, 0xdd, 0x76, 0xaf, 0x71, + 0xe5, 0x68, 0x8f, 0x6c, 0x27, 0xa1, 0x6d, 0xcd, 0x5b, 0x52, 0x58, 0xc5, 0x2e, 0xff, 0x7d, 0x55, + 0x37, 0xfe, 0xb9, 0xaa, 0x1b, 0xff, 0x5e, 0xd5, 0x8d, 0xde, 0x3d, 0xf5, 0x83, 0xf1, 0xe5, 0xff, + 0x01, 0x00, 0x00, 0xff, 0xff, 0x1d, 0x87, 0xb8, 0xa8, 0x8f, 0x0a, 0x00, 0x00, } func (m *Envelope) Marshal() (dAtA []byte, err error) { @@ -1877,16 +1895,36 @@ func (m *BeaconStateResponse) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.BeaconState != nil { + if m.FinalizedState != nil { dAtA[i] = 0xa i++ - i = encodeVarintMessages(dAtA, i, uint64(m.BeaconState.Size())) - n3, err := m.BeaconState.MarshalTo(dAtA[i:]) + i = encodeVarintMessages(dAtA, i, uint64(m.FinalizedState.Size())) + n3, err := m.FinalizedState.MarshalTo(dAtA[i:]) if err != nil { return 0, err } i += n3 } + if m.JustifiedState != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintMessages(dAtA, i, uint64(m.JustifiedState.Size())) + n4, err := m.JustifiedState.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n4 + } + if m.CanonicalState != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintMessages(dAtA, i, uint64(m.CanonicalState.Size())) + n5, err := m.CanonicalState.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n5 + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -1972,11 +2010,11 @@ func (m *AttestationResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintMessages(dAtA, i, uint64(m.Attestation.Size())) - n4, err := m.Attestation.MarshalTo(dAtA[i:]) + n6, err := m.Attestation.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n4 + i += n6 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -2063,11 +2101,11 @@ func (m *ProposerSlashingResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintMessages(dAtA, i, uint64(m.ProposerSlashing.Size())) - n5, err := m.ProposerSlashing.MarshalTo(dAtA[i:]) + n7, err := m.ProposerSlashing.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n5 + i += n7 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -2154,11 +2192,11 @@ func (m *AttesterSlashingResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintMessages(dAtA, i, uint64(m.AttesterSlashing.Size())) - n6, err := m.AttesterSlashing.MarshalTo(dAtA[i:]) + n8, err := m.AttesterSlashing.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n6 + i += n8 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -2245,11 +2283,11 @@ func (m *DepositResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintMessages(dAtA, i, uint64(m.Deposit.Size())) - n7, err := m.Deposit.MarshalTo(dAtA[i:]) + n9, err := m.Deposit.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n7 + i += n9 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -2336,11 +2374,11 @@ func (m *ExitResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintMessages(dAtA, i, uint64(m.VoluntaryExit.Size())) - n8, err := m.VoluntaryExit.MarshalTo(dAtA[i:]) + n10, err := m.VoluntaryExit.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n8 + i += n10 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -2556,8 +2594,16 @@ func (m *BeaconStateResponse) Size() (n int) { } var l int _ = l - if m.BeaconState != nil { - l = m.BeaconState.Size() + if m.FinalizedState != nil { + l = m.FinalizedState.Size() + n += 1 + l + sovMessages(uint64(l)) + } + if m.JustifiedState != nil { + l = m.JustifiedState.Size() + n += 1 + l + sovMessages(uint64(l)) + } + if m.CanonicalState != nil { + l = m.CanonicalState.Size() n += 1 + l + sovMessages(uint64(l)) } if m.XXX_unrecognized != nil { @@ -3937,7 +3983,7 @@ func (m *BeaconStateResponse) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field BeaconState", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field FinalizedState", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -3964,10 +4010,82 @@ func (m *BeaconStateResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.BeaconState == nil { - m.BeaconState = &BeaconState{} + if m.FinalizedState == nil { + m.FinalizedState = &BeaconState{} } - if err := m.BeaconState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + if err := m.FinalizedState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field JustifiedState", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessages + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMessages + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMessages + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.JustifiedState == nil { + m.JustifiedState = &BeaconState{} + } + if err := m.JustifiedState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CanonicalState", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessages + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMessages + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMessages + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.CanonicalState == nil { + m.CanonicalState = &BeaconState{} + } + if err := m.CanonicalState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex diff --git a/proto/beacon/p2p/v1/messages.proto b/proto/beacon/p2p/v1/messages.proto index 828d42728d..67f509c191 100644 --- a/proto/beacon/p2p/v1/messages.proto +++ b/proto/beacon/p2p/v1/messages.proto @@ -59,7 +59,7 @@ message ChainHeadRequest {} message ChainHeadResponse { bytes hash = 1; uint64 slot = 2; - bytes finalized_state_root_hash32s =3; + bytes finalized_state_root_hash32s = 3; } message BeaconStateHashAnnounce { @@ -71,7 +71,9 @@ message BeaconStateRequest { } message BeaconStateResponse { - BeaconState beacon_state = 1; + BeaconState finalized_state = 1; + BeaconState justified_state = 2; + BeaconState canonical_state = 3; } message AttestationAnnounce { diff --git a/shared/params/config.go b/shared/params/config.go index 1cd892ed04..1d735f0d0a 100644 --- a/shared/params/config.go +++ b/shared/params/config.go @@ -169,9 +169,7 @@ var defaultBeaconConfig = &BeaconChainConfig{ // Prysm constants. DepositsForChainStart: 16384, RandBytes: 3, - SyncPollingInterval: 6 * 1, // Query nodes over the network every slot for sync status. BatchBlockLimit: 64 * 4, // Process blocks in batches of 4 epochs of blocks (threshold before casper penalties). - SyncEpochLimit: 4, MaxNumLog2Validators: 24, LogBlockDelay: 2, // }