diff --git a/beacon-chain/powchain/service_test.go b/beacon-chain/powchain/service_test.go index 6a1e256cf1..9104804ad6 100644 --- a/beacon-chain/powchain/service_test.go +++ b/beacon-chain/powchain/service_test.go @@ -940,14 +940,14 @@ func TestStatus(t *testing.T) { testCases := map[*Web3Service]string{ // "status is ok" cases - &Web3Service{}: "", - &Web3Service{isRunning: true, blockTime: afterFiveMinutesAgo}: "", - &Web3Service{isRunning: false, blockTime: beforeFiveMinutesAgo}: "", - &Web3Service{isRunning: false, runError: errors.New("test runError")}: "", + {}: "", + {isRunning: true, blockTime: afterFiveMinutesAgo}: "", + {isRunning: false, blockTime: beforeFiveMinutesAgo}: "", + {isRunning: false, runError: errors.New("test runError")}: "", // "status is error" cases - &Web3Service{isRunning: true, blockTime: beforeFiveMinutesAgo}: "web3 client is not syncing", - &Web3Service{isRunning: true}: "web3 client is not syncing", - &Web3Service{isRunning: true, runError: errors.New("test runError")}: "test runError", + {isRunning: true, blockTime: beforeFiveMinutesAgo}: "web3 client is not syncing", + {isRunning: true}: "web3 client is not syncing", + {isRunning: true, runError: errors.New("test runError")}: "test runError", } for web3ServiceState, wantedErrorText := range testCases { diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 73dfad33e1..c8c83091ee 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "metrics.go", "querier.go", "regular_sync.go", "service.go", @@ -19,6 +20,8 @@ go_library( "//shared/p2p:go_default_library", "//shared/params:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_prometheus_client_golang//prometheus:go_default_library", + "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@io_opencensus_go//trace:go_default_library", ], diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index cbefc38a77..7f5b1d4a8d 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["service.go"], + srcs = [ + "metrics.go", + "service.go", + ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync", visibility = ["//beacon-chain:__subpackages__"], deps = [ @@ -14,7 +17,10 @@ go_library( "//shared/p2p:go_default_library", "//shared/params:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_prometheus_client_golang//prometheus:go_default_library", + "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", + "@io_opencensus_go//trace:go_default_library", ], ) diff --git a/beacon-chain/sync/initial-sync/metrics.go b/beacon-chain/sync/initial-sync/metrics.go new file mode 100644 index 0000000000..de143f8b01 --- /dev/null +++ b/beacon-chain/sync/initial-sync/metrics.go @@ -0,0 +1,38 @@ +package initialsync + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + // Metrics + sentBatchedBlockReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "initsync_sent_batched_block_req", + Help: "The number of sent batched block req", + }) + batchedBlockReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "initsync_batched_block_req", + Help: "The number of received batch blocks responses", + }) + blockReqSlot = promauto.NewCounter(prometheus.CounterOpts{ + Name: "initsync_block_req_by_slot", + Help: "The number of sent block requests by slot", + }) + recBlock = promauto.NewCounter(prometheus.CounterOpts{ + Name: "initsync_received_blocks", + Help: "The number of received blocks", + }) + recBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{ + Name: "initsync_received_block_announce", + Help: "The number of received block announce", + }) + stateReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "initsync_state_req", + Help: "The number of sent state requests", + }) + recState = promauto.NewCounter(prometheus.CounterOpts{ + Name: "initsync_received_state", + Help: "The number of received state", + }) +) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 862c138fc7..aeafbd4e71 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -26,6 +26,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/p2p" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" + "go.opencensus.io/trace" ) var log = logrus.WithField("prefix", "initial-sync") @@ -199,7 +200,7 @@ func (s *InitialSync) run(delayChan <-chan time.Time) { }() if s.reqState { - if err := s.requestStateFromPeer(s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil { + if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil { log.Errorf("Could not request state from peer %v", err) } } else { @@ -213,72 +214,18 @@ func (s *InitialSync) run(delayChan <-chan time.Time) { log.Debug("Exiting goroutine") return case <-delayChan: - if s.reqState { - if err := s.requestStateFromPeer(s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil { - log.Errorf("Could not request state from peer %v", err) - } - continue - } - if s.highestObservedSlot == s.currentSlot { - log.Info("Exiting initial sync and starting normal sync") - s.syncedFeed.Send(s.currentSlot) - s.syncService.ResumeSync() + if s.checkSyncStatus() { return } - - // requests multiple blocks so as to save and sync quickly. - s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) case msg := <-s.blockAnnounceBuf: - data := msg.Data.(*pb.BeaconBlockAnnounce) - - if s.reqState { - if err := s.requestStateFromPeer(s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil { - log.Errorf("Could not request state from peer %v", err) - } - continue - } - - if data.SlotNumber > s.highestObservedSlot { - s.highestObservedSlot = data.SlotNumber - } - - s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) - log.Debugf("Successfully requested the next block with slot: %d", data.SlotNumber) + s.processBlockAnnounce(msg) case msg := <-s.blockBuf: data := msg.Data.(*pb.BeaconBlockResponse) - s.processBlock(s.ctx, data.Block, msg.Peer) + s.processBlock(msg.Ctx, data.Block, msg.Peer) case msg := <-s.stateBuf: - data := msg.Data.(*pb.BeaconStateResponse) - beaconState := data.BeaconState - - if s.currentSlot > beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch { - continue - } - - if err := s.db.SaveState(beaconState); err != nil { - log.Errorf("Unable to set beacon state for initial sync %v", err) - } - - h, err := hashutil.HashProto(beaconState) - if err != nil { - log.Error(err) - continue - } - - if h == s.stateRootOfHighestObservedSlot { - s.reqState = false - } - - // sets the current slot to the last finalized slot of the - // beacon state to begin our sync from. - s.currentSlot = beaconState.FinalizedEpoch * params.BeaconConfig().SlotsPerEpoch - s.beaconStateSlot = beaconState.Slot - log.Debugf("Successfully saved beacon state with the last finalized slot: %d", beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch) - - s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) - + s.processState(msg) case msg := <-s.batchedBlockBuf: - s.processBatchedBlocks(s.ctx, msg) + s.processBatchedBlocks(msg) } } } @@ -304,10 +251,54 @@ func (s *InitialSync) checkInMemoryBlocks() { } } +// checkSyncStatus verifies if the beacon node is correctly synced with its peers up to their +// latest canonical head. If not, then it requests batched blocks up to the highest observed slot. +func (s *InitialSync) checkSyncStatus() bool { + if s.reqState { + if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil { + log.Errorf("Could not request state from peer %v", err) + } + return false + } + if s.highestObservedSlot == s.currentSlot { + log.Info("Exiting initial sync and starting normal sync") + s.syncedFeed.Send(s.currentSlot) + s.syncService.ResumeSync() + return true + } + // requests multiple blocks so as to save and sync quickly. + s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) + return false +} + +func (s *InitialSync) processBlockAnnounce(msg p2p.Message) { + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce") + defer span.End() + data := msg.Data.(*pb.BeaconBlockAnnounce) + recBlockAnnounce.Inc() + + if s.reqState { + if err := s.requestStateFromPeer(ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil { + log.Errorf("Could not request state from peer %v", err) + } + return + } + + if data.SlotNumber > s.highestObservedSlot { + s.highestObservedSlot = data.SlotNumber + } + + s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) + log.Debugf("Successfully requested the next block with slot: %d", data.SlotNumber) +} + // processBlock is the main method that validates each block which is received // for initial sync. It checks if the blocks are valid and then will continue to // process and save it into the db. func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, peer p2p.Peer) { + ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.processBlock") + defer span.End() + recBlock.Inc() if block.Slot > s.highestObservedSlot { s.highestObservedSlot = block.Slot s.stateRootOfHighestObservedSlot = bytesutil.ToBytes32(block.StateRootHash32) @@ -319,7 +310,7 @@ func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, p // requesting beacon state if there is no saved state. if s.reqState { - if err := s.requestStateFromPeer(block.StateRootHash32, peer); err != nil { + if err := s.requestStateFromPeer(s.ctx, block.StateRootHash32, peer); err != nil { log.Errorf("Could not request beacon state from peer: %v", err) } return @@ -341,7 +332,10 @@ func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, p // processBatchedBlocks processes all the received blocks from // the p2p message. -func (s *InitialSync) processBatchedBlocks(ctx context.Context, msg p2p.Message) { +func (s *InitialSync) processBatchedBlocks(msg p2p.Message) { + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBatchedBlocks") + defer span.End() + batchedBlockReq.Inc() log.Debug("Processing batched block response") response := msg.Data.(*pb.BatchedBeaconBlockResponse) @@ -353,9 +347,46 @@ func (s *InitialSync) processBatchedBlocks(ctx context.Context, msg p2p.Message) log.Debug("Finished processing batched blocks") } +func (s *InitialSync) processState(msg p2p.Message) { + _, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processState") + defer span.End() + data := msg.Data.(*pb.BeaconStateResponse) + beaconState := data.BeaconState + recState.Inc() + + if s.currentSlot > beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch { + return + } + + if err := s.db.SaveState(beaconState); err != nil { + log.Errorf("Unable to set beacon state for initial sync %v", err) + } + + h, err := hashutil.HashProto(beaconState) + if err != nil { + log.Error(err) + return + } + + if h == s.stateRootOfHighestObservedSlot { + s.reqState = false + } + + // sets the current slot to the last finalized slot of the + // beacon state to begin our sync from. + s.currentSlot = beaconState.FinalizedEpoch * params.BeaconConfig().SlotsPerEpoch + s.beaconStateSlot = beaconState.Slot + log.Debugf("Successfully saved beacon state with the last finalized slot: %d", beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch) + + s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot) +} + // requestStateFromPeer sends a request to a peer for the corresponding state // for a beacon block. -func (s *InitialSync) requestStateFromPeer(stateRoot []byte, peer p2p.Peer) error { +func (s *InitialSync) requestStateFromPeer(ctx context.Context, stateRoot []byte, peer p2p.Peer) error { + _, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer") + defer span.End() + stateReq.Inc() log.Debugf("Successfully processed incoming block with state hash: %#x", stateRoot) s.p2p.Send(&pb.BeaconStateRequest{Hash: stateRoot}, peer) return nil @@ -363,7 +394,10 @@ func (s *InitialSync) requestStateFromPeer(stateRoot []byte, peer p2p.Peer) erro // requestNextBlock broadcasts a request for a block with the entered slotnumber. func (s *InitialSync) requestNextBlockBySlot(ctx context.Context, slotNumber uint64) { + ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestBlockBySlot") + defer span.End() log.Debugf("Requesting block %d ", slotNumber) + blockReqSlot.Inc() s.mutex.Lock() defer s.mutex.Unlock() if block, ok := s.inMemoryBlocks[slotNumber]; ok { @@ -376,6 +410,9 @@ func (s *InitialSync) requestNextBlockBySlot(ctx context.Context, slotNumber uin // requestBatchedBlocks sends out a request for multiple blocks till a // specified bound slot number. func (s *InitialSync) requestBatchedBlocks(startSlot uint64, endSlot uint64) { + _, span := trace.StartSpan(context.Background(), "beacon-chain.sync.initial-sync.requestBatchedBlocks") + defer span.End() + sentBatchedBlockReq.Inc() blockLimit := params.BeaconConfig().BatchBlockLimit if startSlot+blockLimit < endSlot { endSlot = startSlot + blockLimit @@ -390,6 +427,8 @@ func (s *InitialSync) requestBatchedBlocks(startSlot uint64, endSlot uint64) { // validateAndSaveNextBlock will validate whether blocks received from the blockfetcher // routine can be added to the chain. func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.BeaconBlock) error { + ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.validateAndSaveNextBlock") + defer span.End() root, err := hashutil.HashBeaconBlock(block) if err != nil { return err @@ -424,6 +463,8 @@ func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.Be } func (s *InitialSync) checkBlockValidity(ctx context.Context, block *pb.BeaconBlock) error { + ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.checkBlockValidity") + defer span.End() blockRoot, err := hashutil.HashBeaconBlock(block) if err != nil { return fmt.Errorf("could not tree hash received block: %v", err) diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index df66c49455..30387b24ab 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -152,6 +152,7 @@ func TestSavingBlock_InSync(t *testing.T) { return p2p.Message{ Peer: p2p.Peer{}, Data: blockResponse, + Ctx: context.Background(), } } @@ -167,6 +168,7 @@ func TestSavingBlock_InSync(t *testing.T) { msg2 := p2p.Message{ Peer: p2p.Peer{}, Data: incorrectStateResponse, + Ctx: context.Background(), } ss.stateBuf <- msg2 @@ -262,11 +264,13 @@ func TestDelayChan_OK(t *testing.T) { msg1 := p2p.Message{ Peer: p2p.Peer{}, Data: blockResponse, + Ctx: context.Background(), } msg2 := p2p.Message{ Peer: p2p.Peer{}, Data: stateResponse, + Ctx: context.Background(), } ss.blockBuf <- msg1 @@ -353,6 +357,7 @@ func TestRequestBlocksBySlot_OK(t *testing.T) { return p2p.Message{ Peer: p2p.Peer{}, Data: blockResponse, + Ctx: context.Background(), }, root } diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go new file mode 100644 index 0000000000..340ce01a6f --- /dev/null +++ b/beacon-chain/sync/metrics.go @@ -0,0 +1,87 @@ +package sync + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + + // Metrics + batchedBlockReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_batched_block_req", + Help: "The number of received batch block requests", + }) + blockReqSlot = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_block_req_by_slot", + Help: "The number of received block requests by slot", + }) + blockReqHash = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_block_req_by_hash", + Help: "The number of received block requests by hash", + }) + recBlock = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_received_blocks", + Help: "The number of received blocks", + }) + recBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_received_block_announce", + Help: "The number of received block announcements", + }) + sentBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_sent_block_announce", + Help: "The number of sent block announcements", + }) + sentBlockReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_sent_block_request", + Help: "The number of sent block request", + }) + sentBlocks = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_sent_blocks", + Help: "The number of sent blocks", + }) + sentBatchedBlocks = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_sent_batched_blocks", + Help: "The number of sent batched blocks", + }) + stateReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_state_req", + Help: "The number of state requests", + }) + sentState = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_sent_state", + Help: "The number of sent state", + }) + attestationReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_attestation_req", + Help: "The number of received attestation requests", + }) + unseenAttestationReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_unseen_attestation_req", + Help: "The number of received unseen attestation requests", + }) + recAttestation = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_received_attestation", + Help: "The number of received attestations", + }) + sentAttestation = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_sent_attestation", + Help: "The number of sent attestations", + }) + recExit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_received_exits", + Help: "The number of received exits", + }) + sentExit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_sent_exits", + Help: "The number of sent exits", + }) + chainHeadReq = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_chain_head_req", + Help: "The number of sent attestation requests", + }) + sentChainHead = promauto.NewCounter(prometheus.CounterOpts{ + Name: "regsync_chain_head_sent", + Help: "The number of sent chain head responses", + }) +) diff --git a/beacon-chain/sync/regular_sync.go b/beacon-chain/sync/regular_sync.go index 41bde9c708..cc4fc4b078 100644 --- a/beacon-chain/sync/regular_sync.go +++ b/beacon-chain/sync/regular_sync.go @@ -213,7 +213,7 @@ func (rs *RegularSync) run() { case msg := <-rs.chainHeadReqBuf: rs.handleChainHeadRequest(msg) case block := <-rs.canonicalBuf: - rs.broadcastCanonicalBlock(block) + rs.broadcastCanonicalBlock(rs.ctx, block) } } } @@ -222,8 +222,9 @@ func (rs *RegularSync) run() { // TODO(#175): New hashes are forwarded to other peers in the network, and // the contents of the block are requested if the local chain doesn't have the block. func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) { - ctx, receiveBlockSpan := trace.StartSpan(msg.Ctx, "RegularSync_receiveBlockRoot") - defer receiveBlockSpan.End() + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveBlockAnnounce") + defer span.End() + recBlockAnnounce.Inc() data := msg.Data.(*pb.BeaconBlockAnnounce) h := bytesutil.ToBytes32(data.Hash[:32]) @@ -235,15 +236,17 @@ func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) { log.WithField("blockRoot", fmt.Sprintf("%#x", h)).Debug("Received incoming block root, requesting full block data from sender") // Request the full block data from peer that sent the block hash. - _, sendBlockRequestSpan := trace.StartSpan(ctx, "sendBlockRequest") + _, sendBlockRequestSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlockRequest") rs.p2p.Send(&pb.BeaconBlockRequest{Hash: h[:]}, msg.Peer) + sentBlockReq.Inc() sendBlockRequestSpan.End() } // receiveBlock processes a block from the p2p layer. func (rs *RegularSync) receiveBlock(msg p2p.Message) { - ctx, receiveBlockSpan := trace.StartSpan(msg.Ctx, "RegularSync_receiveBlock") - defer receiveBlockSpan.End() + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveBlock") + defer span.End() + recBlock.Inc() response := msg.Data.(*pb.BeaconBlockResponse) block := response.Block @@ -270,17 +273,19 @@ func (rs *RegularSync) receiveBlock(msg p2p.Message) { return } - _, sendBlockSpan := trace.StartSpan(ctx, "sendBlock") + _, sendBlockSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlock") log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Sending newly received block to subscribers") rs.chainService.IncomingBlockFeed().Send(block) + sentBlocks.Inc() sendBlockSpan.End() } // handleBlockRequestBySlot processes a block request from the p2p layer. // if found, the block is sent to the requesting peer. func (rs *RegularSync) handleBlockRequestBySlot(msg p2p.Message) { - ctx, blockRequestSpan := trace.StartSpan(msg.Ctx, "RegularSync_blockRequestBySlot") - defer blockRequestSpan.End() + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBlockRequestBySlot") + defer span.End() + blockReqSlot.Inc() request, ok := msg.Data.(*pb.BeaconBlockRequestBySlotNumber) if !ok { @@ -302,12 +307,14 @@ func (rs *RegularSync) handleBlockRequestBySlot(msg p2p.Message) { rs.p2p.Send(&pb.BeaconBlockResponse{ Block: block, }, msg.Peer) + sentBlocks.Inc() sendBlockSpan.End() } func (rs *RegularSync) handleStateRequest(msg p2p.Message) { - ctx, handleStateReqSpan := trace.StartSpan(msg.Ctx, "RegularSync_handleStateReq") - defer handleStateReqSpan.End() + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleStateRequest") + defer span.End() + stateReq.Inc() req, ok := msg.Data.(*pb.BeaconStateRequest) if !ok { log.Errorf("Message is of the incorrect type") @@ -327,13 +334,17 @@ func (rs *RegularSync) handleStateRequest(msg p2p.Message) { log.Debugf("Requested state root is different from locally stored state root %#x", req.Hash) return } - _, sendStateSpan := trace.StartSpan(ctx, "sendState") + _, sendStateSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendState") log.WithField("beaconState", fmt.Sprintf("%#x", root)).Debug("Sending beacon state to peer") rs.p2p.Send(&pb.BeaconStateResponse{BeaconState: state}, msg.Peer) + sentState.Inc() sendStateSpan.End() } func (rs *RegularSync) handleChainHeadRequest(msg p2p.Message) { + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleChainHeadRequest") + defer span.End() + chainHeadReq.Inc() if _, ok := msg.Data.(*pb.ChainHeadRequest); !ok { log.Errorf("message is of the incorrect type") return @@ -356,16 +367,19 @@ func (rs *RegularSync) handleChainHeadRequest(msg p2p.Message) { Hash: blockRoot[:], Block: block, } - + _, ChainHead := trace.StartSpan(ctx, "sendChainHead") rs.p2p.Send(req, msg.Peer) + sentChainHead.Inc() + ChainHead.End() } // receiveAttestation accepts an broadcasted attestation from the p2p layer, // discard the attestation if we have gotten before, send it to attestation // pool if we have not. func (rs *RegularSync) receiveAttestation(msg p2p.Message) { - ctx, receiveAttestationSpan := trace.StartSpan(msg.Ctx, "RegularSync_receiveAttestation") - defer receiveAttestationSpan.End() + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveAttestation") + defer span.End() + recAttestation.Inc() attestation := msg.Data.(*pb.Attestation) attestationRoot, err := hashutil.HashProto(attestation) @@ -393,9 +407,10 @@ func (rs *RegularSync) receiveAttestation(msg p2p.Message) { return } - _, sendAttestationSpan := trace.StartSpan(ctx, "sendAttestation") + _, sendAttestationSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendAttestation") log.WithField("attestationHash", fmt.Sprintf("%#x", attestationRoot)).Debug("Sending newly received attestation to subscribers") rs.operationsService.IncomingAttFeed().Send(attestation) + sentAttestation.Inc() sendAttestationSpan.End() } @@ -403,6 +418,9 @@ func (rs *RegularSync) receiveAttestation(msg p2p.Message) { // discard the exit if we have gotten before, send it to operation // service if we have not. func (rs *RegularSync) receiveExitRequest(msg p2p.Message) { + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveExitRequest") + defer span.End() + recExit.Inc() exit := msg.Data.(*pb.VoluntaryExit) h, err := hashutil.HashProto(exit) if err != nil { @@ -414,17 +432,21 @@ func (rs *RegularSync) receiveExitRequest(msg p2p.Message) { log.Debugf("Received, skipping exit request #%x", h) return } - + _, sendExitReqSpan := trace.StartSpan(ctx, "sendExitRequest") log.WithField("exitReqHash", fmt.Sprintf("%#x", h)). Debug("Forwarding validator exit request to subscribed services") rs.operationsService.IncomingExitFeed().Send(exit) + sentExit.Inc() + sendExitReqSpan.End() } func (rs *RegularSync) handleBlockRequestByHash(msg p2p.Message) { + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBlockRequestByHash") + defer span.End() + blockReqHash.Inc() + data := msg.Data.(*pb.BeaconBlockRequest) - root := bytesutil.ToBytes32(data.Hash) - block, err := rs.db.Block(root) if err != nil { log.Error(err) @@ -435,14 +457,20 @@ func (rs *RegularSync) handleBlockRequestByHash(msg p2p.Message) { return } + _, sendBlockSpan := trace.StartSpan(ctx, "sendBlock") rs.p2p.Send(&pb.BeaconBlockResponse{ Block: block, }, msg.Peer) + sentBlocks.Inc() + sendBlockSpan.End() } // handleBatchedBlockRequest receives p2p messages which consist of requests for batched blocks // which are bounded by a start slot and end slot. func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) { + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBatchedBlockRequest") + defer span.End() + batchedBlockReq.Inc() data := msg.Data.(*pb.BatchedBeaconBlockRequest) startSlot, endSlot := data.StartSlot, data.EndSlot @@ -459,7 +487,6 @@ func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) { } currentSlot := block.Slot - if currentSlot < startSlot || finalizedSlot > endSlot { log.Debugf( "invalid batch request: current slot < start slot || finalized slot > end slot."+ @@ -474,35 +501,35 @@ func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) { } response := make([]*pb.BeaconBlock, 0, blockRange) - for i := startSlot; i <= endSlot; i++ { retBlock, err := rs.db.BlockBySlot(i) if err != nil { log.Errorf("Unable to retrieve block from db %v", err) continue } - if retBlock == nil { log.Debug("Block does not exist in db") continue } - response = append(response, retBlock) } + _, sendBatchedBlockSpan := trace.StartSpan(ctx, "sendBatchedBlocks") log.Debugf("Sending response for batch blocks to peer %v", msg.Peer) rs.p2p.Send(&pb.BatchedBeaconBlockResponse{ BatchedBlocks: response, }, msg.Peer) + sentBatchedBlocks.Inc() + sendBatchedBlockSpan.End() } func (rs *RegularSync) handleAttestationRequestByHash(msg p2p.Message) { - ctx, respondAttestationSpan := trace.StartSpan(msg.Ctx, "RegularSync_respondAttestation") - defer respondAttestationSpan.End() + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleAttestationRequestByHash") + defer span.End() + attestationReq.Inc() + req := msg.Data.(*pb.AttestationRequest) - root := bytesutil.ToBytes32(req.Hash) - att, err := rs.db.Attestation(root) if err != nil { log.Error(err) @@ -518,12 +545,14 @@ func (rs *RegularSync) handleAttestationRequestByHash(msg p2p.Message) { rs.p2p.Send(&pb.AttestationResponse{ Attestation: att, }, msg.Peer) + sentAttestation.Inc() sendAttestationSpan.End() } func (rs *RegularSync) handleUnseenAttestationsRequest(msg p2p.Message) { - ctx, respondAttestationxSpan := trace.StartSpan(msg.Ctx, "RegularSync_respondUnseenAttestations") - defer respondAttestationxSpan.End() + ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleUnseenAttestationsRequest") + defer span.End() + unseenAttestationReq.Inc() if _, ok := msg.Data.(*pb.UnseenAttestationsRequest); !ok { log.Errorf("message is of the incorrect type") return @@ -540,23 +569,31 @@ func (rs *RegularSync) handleUnseenAttestationsRequest(msg p2p.Message) { return } - _, sendAttestationsSpan := trace.StartSpan(ctx, "sendAttestation") + _, sendAttestationsSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendAttestation") log.Debugf("Sending response for batched unseen attestations to peer %v", msg.Peer) rs.p2p.Send(&pb.UnseenAttestationResponse{ Attestations: atts, }, msg.Peer) + sentAttestation.Inc() sendAttestationsSpan.End() } -func (rs *RegularSync) broadcastCanonicalBlock(blk *pb.BeaconBlock) { +func (rs *RegularSync) broadcastCanonicalBlock(ctx context.Context, blk *pb.BeaconBlock) { + _, span := trace.StartSpan(ctx, "beacon-chain.sync.broadcastCanonicalBlock") + defer span.End() + h, err := hashutil.HashProto(blk) if err != nil { log.Errorf("Could not tree hash block %v", err) return } + + _, sendBlockAnnounceSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlockAnnounce") log.Debugf("Announcing canonical block %#x", h) rs.p2p.Broadcast(&pb.BeaconBlockAnnounce{ Hash: h[:], SlotNumber: blk.Slot, }) + sentBlockAnnounce.Inc() + sendBlockAnnounceSpan.End() }