Adding Prometheus Metrics and Tracing to Sync (#1858)

* adding tracing and metrics

* clean up main routine

* added tracing to initial sync

* gazelle

* fix tests

* fix naming

* comments

* consistent style in promauto counters

* rename spans

* final comment
This commit is contained in:
Nishant Das
2019-03-06 01:31:19 +08:00
committed by Raul Jordan
parent b264cdc557
commit 07a25ca710
8 changed files with 319 additions and 102 deletions

View File

@@ -940,14 +940,14 @@ func TestStatus(t *testing.T) {
testCases := map[*Web3Service]string{
// "status is ok" cases
&Web3Service{}: "",
&Web3Service{isRunning: true, blockTime: afterFiveMinutesAgo}: "",
&Web3Service{isRunning: false, blockTime: beforeFiveMinutesAgo}: "",
&Web3Service{isRunning: false, runError: errors.New("test runError")}: "",
{}: "",
{isRunning: true, blockTime: afterFiveMinutesAgo}: "",
{isRunning: false, blockTime: beforeFiveMinutesAgo}: "",
{isRunning: false, runError: errors.New("test runError")}: "",
// "status is error" cases
&Web3Service{isRunning: true, blockTime: beforeFiveMinutesAgo}: "web3 client is not syncing",
&Web3Service{isRunning: true}: "web3 client is not syncing",
&Web3Service{isRunning: true, runError: errors.New("test runError")}: "test runError",
{isRunning: true, blockTime: beforeFiveMinutesAgo}: "web3 client is not syncing",
{isRunning: true}: "web3 client is not syncing",
{isRunning: true, runError: errors.New("test runError")}: "test runError",
}
for web3ServiceState, wantedErrorText := range testCases {

View File

@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"metrics.go",
"querier.go",
"regular_sync.go",
"service.go",
@@ -19,6 +20,8 @@ go_library(
"//shared/p2p:go_default_library",
"//shared/params:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],

View File

@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["service.go"],
srcs = [
"metrics.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
@@ -14,7 +17,10 @@ go_library(
"//shared/p2p:go_default_library",
"//shared/params:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

View File

@@ -0,0 +1,38 @@
package initialsync
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// Metrics
sentBatchedBlockReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_sent_batched_block_req",
Help: "The number of sent batched block req",
})
batchedBlockReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_batched_block_req",
Help: "The number of received batch blocks responses",
})
blockReqSlot = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_block_req_by_slot",
Help: "The number of sent block requests by slot",
})
recBlock = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_received_blocks",
Help: "The number of received blocks",
})
recBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_received_block_announce",
Help: "The number of received block announce",
})
stateReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_state_req",
Help: "The number of sent state requests",
})
recState = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_received_state",
Help: "The number of received state",
})
)

View File

@@ -26,6 +26,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
var log = logrus.WithField("prefix", "initial-sync")
@@ -199,7 +200,7 @@ func (s *InitialSync) run(delayChan <-chan time.Time) {
}()
if s.reqState {
if err := s.requestStateFromPeer(s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
} else {
@@ -213,72 +214,18 @@ func (s *InitialSync) run(delayChan <-chan time.Time) {
log.Debug("Exiting goroutine")
return
case <-delayChan:
if s.reqState {
if err := s.requestStateFromPeer(s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
continue
}
if s.highestObservedSlot == s.currentSlot {
log.Info("Exiting initial sync and starting normal sync")
s.syncedFeed.Send(s.currentSlot)
s.syncService.ResumeSync()
if s.checkSyncStatus() {
return
}
// requests multiple blocks so as to save and sync quickly.
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
case msg := <-s.blockAnnounceBuf:
data := msg.Data.(*pb.BeaconBlockAnnounce)
if s.reqState {
if err := s.requestStateFromPeer(s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
continue
}
if data.SlotNumber > s.highestObservedSlot {
s.highestObservedSlot = data.SlotNumber
}
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
log.Debugf("Successfully requested the next block with slot: %d", data.SlotNumber)
s.processBlockAnnounce(msg)
case msg := <-s.blockBuf:
data := msg.Data.(*pb.BeaconBlockResponse)
s.processBlock(s.ctx, data.Block, msg.Peer)
s.processBlock(msg.Ctx, data.Block, msg.Peer)
case msg := <-s.stateBuf:
data := msg.Data.(*pb.BeaconStateResponse)
beaconState := data.BeaconState
if s.currentSlot > beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch {
continue
}
if err := s.db.SaveState(beaconState); err != nil {
log.Errorf("Unable to set beacon state for initial sync %v", err)
}
h, err := hashutil.HashProto(beaconState)
if err != nil {
log.Error(err)
continue
}
if h == s.stateRootOfHighestObservedSlot {
s.reqState = false
}
// sets the current slot to the last finalized slot of the
// beacon state to begin our sync from.
s.currentSlot = beaconState.FinalizedEpoch * params.BeaconConfig().SlotsPerEpoch
s.beaconStateSlot = beaconState.Slot
log.Debugf("Successfully saved beacon state with the last finalized slot: %d", beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch)
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
s.processState(msg)
case msg := <-s.batchedBlockBuf:
s.processBatchedBlocks(s.ctx, msg)
s.processBatchedBlocks(msg)
}
}
}
@@ -304,10 +251,54 @@ func (s *InitialSync) checkInMemoryBlocks() {
}
}
// checkSyncStatus verifies if the beacon node is correctly synced with its peers up to their
// latest canonical head. If not, then it requests batched blocks up to the highest observed slot.
func (s *InitialSync) checkSyncStatus() bool {
if s.reqState {
if err := s.requestStateFromPeer(s.ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
return false
}
if s.highestObservedSlot == s.currentSlot {
log.Info("Exiting initial sync and starting normal sync")
s.syncedFeed.Send(s.currentSlot)
s.syncService.ResumeSync()
return true
}
// requests multiple blocks so as to save and sync quickly.
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
return false
}
func (s *InitialSync) processBlockAnnounce(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce")
defer span.End()
data := msg.Data.(*pb.BeaconBlockAnnounce)
recBlockAnnounce.Inc()
if s.reqState {
if err := s.requestStateFromPeer(ctx, s.stateRootOfHighestObservedSlot[:], p2p.Peer{}); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
return
}
if data.SlotNumber > s.highestObservedSlot {
s.highestObservedSlot = data.SlotNumber
}
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
log.Debugf("Successfully requested the next block with slot: %d", data.SlotNumber)
}
// processBlock is the main method that validates each block which is received
// for initial sync. It checks if the blocks are valid and then will continue to
// process and save it into the db.
func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, peer p2p.Peer) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.processBlock")
defer span.End()
recBlock.Inc()
if block.Slot > s.highestObservedSlot {
s.highestObservedSlot = block.Slot
s.stateRootOfHighestObservedSlot = bytesutil.ToBytes32(block.StateRootHash32)
@@ -319,7 +310,7 @@ func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, p
// requesting beacon state if there is no saved state.
if s.reqState {
if err := s.requestStateFromPeer(block.StateRootHash32, peer); err != nil {
if err := s.requestStateFromPeer(s.ctx, block.StateRootHash32, peer); err != nil {
log.Errorf("Could not request beacon state from peer: %v", err)
}
return
@@ -341,7 +332,10 @@ func (s *InitialSync) processBlock(ctx context.Context, block *pb.BeaconBlock, p
// processBatchedBlocks processes all the received blocks from
// the p2p message.
func (s *InitialSync) processBatchedBlocks(ctx context.Context, msg p2p.Message) {
func (s *InitialSync) processBatchedBlocks(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBatchedBlocks")
defer span.End()
batchedBlockReq.Inc()
log.Debug("Processing batched block response")
response := msg.Data.(*pb.BatchedBeaconBlockResponse)
@@ -353,9 +347,46 @@ func (s *InitialSync) processBatchedBlocks(ctx context.Context, msg p2p.Message)
log.Debug("Finished processing batched blocks")
}
func (s *InitialSync) processState(msg p2p.Message) {
_, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processState")
defer span.End()
data := msg.Data.(*pb.BeaconStateResponse)
beaconState := data.BeaconState
recState.Inc()
if s.currentSlot > beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch {
return
}
if err := s.db.SaveState(beaconState); err != nil {
log.Errorf("Unable to set beacon state for initial sync %v", err)
}
h, err := hashutil.HashProto(beaconState)
if err != nil {
log.Error(err)
return
}
if h == s.stateRootOfHighestObservedSlot {
s.reqState = false
}
// sets the current slot to the last finalized slot of the
// beacon state to begin our sync from.
s.currentSlot = beaconState.FinalizedEpoch * params.BeaconConfig().SlotsPerEpoch
s.beaconStateSlot = beaconState.Slot
log.Debugf("Successfully saved beacon state with the last finalized slot: %d", beaconState.FinalizedEpoch*params.BeaconConfig().SlotsPerEpoch)
s.requestBatchedBlocks(s.currentSlot+1, s.highestObservedSlot)
}
// requestStateFromPeer sends a request to a peer for the corresponding state
// for a beacon block.
func (s *InitialSync) requestStateFromPeer(stateRoot []byte, peer p2p.Peer) error {
func (s *InitialSync) requestStateFromPeer(ctx context.Context, stateRoot []byte, peer p2p.Peer) error {
_, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer")
defer span.End()
stateReq.Inc()
log.Debugf("Successfully processed incoming block with state hash: %#x", stateRoot)
s.p2p.Send(&pb.BeaconStateRequest{Hash: stateRoot}, peer)
return nil
@@ -363,7 +394,10 @@ func (s *InitialSync) requestStateFromPeer(stateRoot []byte, peer p2p.Peer) erro
// requestNextBlock broadcasts a request for a block with the entered slotnumber.
func (s *InitialSync) requestNextBlockBySlot(ctx context.Context, slotNumber uint64) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestBlockBySlot")
defer span.End()
log.Debugf("Requesting block %d ", slotNumber)
blockReqSlot.Inc()
s.mutex.Lock()
defer s.mutex.Unlock()
if block, ok := s.inMemoryBlocks[slotNumber]; ok {
@@ -376,6 +410,9 @@ func (s *InitialSync) requestNextBlockBySlot(ctx context.Context, slotNumber uin
// requestBatchedBlocks sends out a request for multiple blocks till a
// specified bound slot number.
func (s *InitialSync) requestBatchedBlocks(startSlot uint64, endSlot uint64) {
_, span := trace.StartSpan(context.Background(), "beacon-chain.sync.initial-sync.requestBatchedBlocks")
defer span.End()
sentBatchedBlockReq.Inc()
blockLimit := params.BeaconConfig().BatchBlockLimit
if startSlot+blockLimit < endSlot {
endSlot = startSlot + blockLimit
@@ -390,6 +427,8 @@ func (s *InitialSync) requestBatchedBlocks(startSlot uint64, endSlot uint64) {
// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher
// routine can be added to the chain.
func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.BeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.validateAndSaveNextBlock")
defer span.End()
root, err := hashutil.HashBeaconBlock(block)
if err != nil {
return err
@@ -424,6 +463,8 @@ func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *pb.Be
}
func (s *InitialSync) checkBlockValidity(ctx context.Context, block *pb.BeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.checkBlockValidity")
defer span.End()
blockRoot, err := hashutil.HashBeaconBlock(block)
if err != nil {
return fmt.Errorf("could not tree hash received block: %v", err)

View File

@@ -152,6 +152,7 @@ func TestSavingBlock_InSync(t *testing.T) {
return p2p.Message{
Peer: p2p.Peer{},
Data: blockResponse,
Ctx: context.Background(),
}
}
@@ -167,6 +168,7 @@ func TestSavingBlock_InSync(t *testing.T) {
msg2 := p2p.Message{
Peer: p2p.Peer{},
Data: incorrectStateResponse,
Ctx: context.Background(),
}
ss.stateBuf <- msg2
@@ -262,11 +264,13 @@ func TestDelayChan_OK(t *testing.T) {
msg1 := p2p.Message{
Peer: p2p.Peer{},
Data: blockResponse,
Ctx: context.Background(),
}
msg2 := p2p.Message{
Peer: p2p.Peer{},
Data: stateResponse,
Ctx: context.Background(),
}
ss.blockBuf <- msg1
@@ -353,6 +357,7 @@ func TestRequestBlocksBySlot_OK(t *testing.T) {
return p2p.Message{
Peer: p2p.Peer{},
Data: blockResponse,
Ctx: context.Background(),
}, root
}

View File

@@ -0,0 +1,87 @@
package sync
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// Metrics
batchedBlockReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_batched_block_req",
Help: "The number of received batch block requests",
})
blockReqSlot = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_block_req_by_slot",
Help: "The number of received block requests by slot",
})
blockReqHash = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_block_req_by_hash",
Help: "The number of received block requests by hash",
})
recBlock = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_received_blocks",
Help: "The number of received blocks",
})
recBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_received_block_announce",
Help: "The number of received block announcements",
})
sentBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_sent_block_announce",
Help: "The number of sent block announcements",
})
sentBlockReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_sent_block_request",
Help: "The number of sent block request",
})
sentBlocks = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_sent_blocks",
Help: "The number of sent blocks",
})
sentBatchedBlocks = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_sent_batched_blocks",
Help: "The number of sent batched blocks",
})
stateReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_state_req",
Help: "The number of state requests",
})
sentState = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_sent_state",
Help: "The number of sent state",
})
attestationReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_attestation_req",
Help: "The number of received attestation requests",
})
unseenAttestationReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_unseen_attestation_req",
Help: "The number of received unseen attestation requests",
})
recAttestation = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_received_attestation",
Help: "The number of received attestations",
})
sentAttestation = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_sent_attestation",
Help: "The number of sent attestations",
})
recExit = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_received_exits",
Help: "The number of received exits",
})
sentExit = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_sent_exits",
Help: "The number of sent exits",
})
chainHeadReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_chain_head_req",
Help: "The number of sent attestation requests",
})
sentChainHead = promauto.NewCounter(prometheus.CounterOpts{
Name: "regsync_chain_head_sent",
Help: "The number of sent chain head responses",
})
)

View File

@@ -213,7 +213,7 @@ func (rs *RegularSync) run() {
case msg := <-rs.chainHeadReqBuf:
rs.handleChainHeadRequest(msg)
case block := <-rs.canonicalBuf:
rs.broadcastCanonicalBlock(block)
rs.broadcastCanonicalBlock(rs.ctx, block)
}
}
}
@@ -222,8 +222,9 @@ func (rs *RegularSync) run() {
// TODO(#175): New hashes are forwarded to other peers in the network, and
// the contents of the block are requested if the local chain doesn't have the block.
func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) {
ctx, receiveBlockSpan := trace.StartSpan(msg.Ctx, "RegularSync_receiveBlockRoot")
defer receiveBlockSpan.End()
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveBlockAnnounce")
defer span.End()
recBlockAnnounce.Inc()
data := msg.Data.(*pb.BeaconBlockAnnounce)
h := bytesutil.ToBytes32(data.Hash[:32])
@@ -235,15 +236,17 @@ func (rs *RegularSync) receiveBlockAnnounce(msg p2p.Message) {
log.WithField("blockRoot", fmt.Sprintf("%#x", h)).Debug("Received incoming block root, requesting full block data from sender")
// Request the full block data from peer that sent the block hash.
_, sendBlockRequestSpan := trace.StartSpan(ctx, "sendBlockRequest")
_, sendBlockRequestSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlockRequest")
rs.p2p.Send(&pb.BeaconBlockRequest{Hash: h[:]}, msg.Peer)
sentBlockReq.Inc()
sendBlockRequestSpan.End()
}
// receiveBlock processes a block from the p2p layer.
func (rs *RegularSync) receiveBlock(msg p2p.Message) {
ctx, receiveBlockSpan := trace.StartSpan(msg.Ctx, "RegularSync_receiveBlock")
defer receiveBlockSpan.End()
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveBlock")
defer span.End()
recBlock.Inc()
response := msg.Data.(*pb.BeaconBlockResponse)
block := response.Block
@@ -270,17 +273,19 @@ func (rs *RegularSync) receiveBlock(msg p2p.Message) {
return
}
_, sendBlockSpan := trace.StartSpan(ctx, "sendBlock")
_, sendBlockSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlock")
log.WithField("blockRoot", fmt.Sprintf("%#x", blockRoot)).Debug("Sending newly received block to subscribers")
rs.chainService.IncomingBlockFeed().Send(block)
sentBlocks.Inc()
sendBlockSpan.End()
}
// handleBlockRequestBySlot processes a block request from the p2p layer.
// if found, the block is sent to the requesting peer.
func (rs *RegularSync) handleBlockRequestBySlot(msg p2p.Message) {
ctx, blockRequestSpan := trace.StartSpan(msg.Ctx, "RegularSync_blockRequestBySlot")
defer blockRequestSpan.End()
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBlockRequestBySlot")
defer span.End()
blockReqSlot.Inc()
request, ok := msg.Data.(*pb.BeaconBlockRequestBySlotNumber)
if !ok {
@@ -302,12 +307,14 @@ func (rs *RegularSync) handleBlockRequestBySlot(msg p2p.Message) {
rs.p2p.Send(&pb.BeaconBlockResponse{
Block: block,
}, msg.Peer)
sentBlocks.Inc()
sendBlockSpan.End()
}
func (rs *RegularSync) handleStateRequest(msg p2p.Message) {
ctx, handleStateReqSpan := trace.StartSpan(msg.Ctx, "RegularSync_handleStateReq")
defer handleStateReqSpan.End()
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleStateRequest")
defer span.End()
stateReq.Inc()
req, ok := msg.Data.(*pb.BeaconStateRequest)
if !ok {
log.Errorf("Message is of the incorrect type")
@@ -327,13 +334,17 @@ func (rs *RegularSync) handleStateRequest(msg p2p.Message) {
log.Debugf("Requested state root is different from locally stored state root %#x", req.Hash)
return
}
_, sendStateSpan := trace.StartSpan(ctx, "sendState")
_, sendStateSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendState")
log.WithField("beaconState", fmt.Sprintf("%#x", root)).Debug("Sending beacon state to peer")
rs.p2p.Send(&pb.BeaconStateResponse{BeaconState: state}, msg.Peer)
sentState.Inc()
sendStateSpan.End()
}
func (rs *RegularSync) handleChainHeadRequest(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleChainHeadRequest")
defer span.End()
chainHeadReq.Inc()
if _, ok := msg.Data.(*pb.ChainHeadRequest); !ok {
log.Errorf("message is of the incorrect type")
return
@@ -356,16 +367,19 @@ func (rs *RegularSync) handleChainHeadRequest(msg p2p.Message) {
Hash: blockRoot[:],
Block: block,
}
_, ChainHead := trace.StartSpan(ctx, "sendChainHead")
rs.p2p.Send(req, msg.Peer)
sentChainHead.Inc()
ChainHead.End()
}
// receiveAttestation accepts an broadcasted attestation from the p2p layer,
// discard the attestation if we have gotten before, send it to attestation
// pool if we have not.
func (rs *RegularSync) receiveAttestation(msg p2p.Message) {
ctx, receiveAttestationSpan := trace.StartSpan(msg.Ctx, "RegularSync_receiveAttestation")
defer receiveAttestationSpan.End()
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveAttestation")
defer span.End()
recAttestation.Inc()
attestation := msg.Data.(*pb.Attestation)
attestationRoot, err := hashutil.HashProto(attestation)
@@ -393,9 +407,10 @@ func (rs *RegularSync) receiveAttestation(msg p2p.Message) {
return
}
_, sendAttestationSpan := trace.StartSpan(ctx, "sendAttestation")
_, sendAttestationSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendAttestation")
log.WithField("attestationHash", fmt.Sprintf("%#x", attestationRoot)).Debug("Sending newly received attestation to subscribers")
rs.operationsService.IncomingAttFeed().Send(attestation)
sentAttestation.Inc()
sendAttestationSpan.End()
}
@@ -403,6 +418,9 @@ func (rs *RegularSync) receiveAttestation(msg p2p.Message) {
// discard the exit if we have gotten before, send it to operation
// service if we have not.
func (rs *RegularSync) receiveExitRequest(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.receiveExitRequest")
defer span.End()
recExit.Inc()
exit := msg.Data.(*pb.VoluntaryExit)
h, err := hashutil.HashProto(exit)
if err != nil {
@@ -414,17 +432,21 @@ func (rs *RegularSync) receiveExitRequest(msg p2p.Message) {
log.Debugf("Received, skipping exit request #%x", h)
return
}
_, sendExitReqSpan := trace.StartSpan(ctx, "sendExitRequest")
log.WithField("exitReqHash", fmt.Sprintf("%#x", h)).
Debug("Forwarding validator exit request to subscribed services")
rs.operationsService.IncomingExitFeed().Send(exit)
sentExit.Inc()
sendExitReqSpan.End()
}
func (rs *RegularSync) handleBlockRequestByHash(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBlockRequestByHash")
defer span.End()
blockReqHash.Inc()
data := msg.Data.(*pb.BeaconBlockRequest)
root := bytesutil.ToBytes32(data.Hash)
block, err := rs.db.Block(root)
if err != nil {
log.Error(err)
@@ -435,14 +457,20 @@ func (rs *RegularSync) handleBlockRequestByHash(msg p2p.Message) {
return
}
_, sendBlockSpan := trace.StartSpan(ctx, "sendBlock")
rs.p2p.Send(&pb.BeaconBlockResponse{
Block: block,
}, msg.Peer)
sentBlocks.Inc()
sendBlockSpan.End()
}
// handleBatchedBlockRequest receives p2p messages which consist of requests for batched blocks
// which are bounded by a start slot and end slot.
func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBatchedBlockRequest")
defer span.End()
batchedBlockReq.Inc()
data := msg.Data.(*pb.BatchedBeaconBlockRequest)
startSlot, endSlot := data.StartSlot, data.EndSlot
@@ -459,7 +487,6 @@ func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) {
}
currentSlot := block.Slot
if currentSlot < startSlot || finalizedSlot > endSlot {
log.Debugf(
"invalid batch request: current slot < start slot || finalized slot > end slot."+
@@ -474,35 +501,35 @@ func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) {
}
response := make([]*pb.BeaconBlock, 0, blockRange)
for i := startSlot; i <= endSlot; i++ {
retBlock, err := rs.db.BlockBySlot(i)
if err != nil {
log.Errorf("Unable to retrieve block from db %v", err)
continue
}
if retBlock == nil {
log.Debug("Block does not exist in db")
continue
}
response = append(response, retBlock)
}
_, sendBatchedBlockSpan := trace.StartSpan(ctx, "sendBatchedBlocks")
log.Debugf("Sending response for batch blocks to peer %v", msg.Peer)
rs.p2p.Send(&pb.BatchedBeaconBlockResponse{
BatchedBlocks: response,
}, msg.Peer)
sentBatchedBlocks.Inc()
sendBatchedBlockSpan.End()
}
func (rs *RegularSync) handleAttestationRequestByHash(msg p2p.Message) {
ctx, respondAttestationSpan := trace.StartSpan(msg.Ctx, "RegularSync_respondAttestation")
defer respondAttestationSpan.End()
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleAttestationRequestByHash")
defer span.End()
attestationReq.Inc()
req := msg.Data.(*pb.AttestationRequest)
root := bytesutil.ToBytes32(req.Hash)
att, err := rs.db.Attestation(root)
if err != nil {
log.Error(err)
@@ -518,12 +545,14 @@ func (rs *RegularSync) handleAttestationRequestByHash(msg p2p.Message) {
rs.p2p.Send(&pb.AttestationResponse{
Attestation: att,
}, msg.Peer)
sentAttestation.Inc()
sendAttestationSpan.End()
}
func (rs *RegularSync) handleUnseenAttestationsRequest(msg p2p.Message) {
ctx, respondAttestationxSpan := trace.StartSpan(msg.Ctx, "RegularSync_respondUnseenAttestations")
defer respondAttestationxSpan.End()
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleUnseenAttestationsRequest")
defer span.End()
unseenAttestationReq.Inc()
if _, ok := msg.Data.(*pb.UnseenAttestationsRequest); !ok {
log.Errorf("message is of the incorrect type")
return
@@ -540,23 +569,31 @@ func (rs *RegularSync) handleUnseenAttestationsRequest(msg p2p.Message) {
return
}
_, sendAttestationsSpan := trace.StartSpan(ctx, "sendAttestation")
_, sendAttestationsSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendAttestation")
log.Debugf("Sending response for batched unseen attestations to peer %v", msg.Peer)
rs.p2p.Send(&pb.UnseenAttestationResponse{
Attestations: atts,
}, msg.Peer)
sentAttestation.Inc()
sendAttestationsSpan.End()
}
func (rs *RegularSync) broadcastCanonicalBlock(blk *pb.BeaconBlock) {
func (rs *RegularSync) broadcastCanonicalBlock(ctx context.Context, blk *pb.BeaconBlock) {
_, span := trace.StartSpan(ctx, "beacon-chain.sync.broadcastCanonicalBlock")
defer span.End()
h, err := hashutil.HashProto(blk)
if err != nil {
log.Errorf("Could not tree hash block %v", err)
return
}
_, sendBlockAnnounceSpan := trace.StartSpan(ctx, "beacon-chain.sync.sendBlockAnnounce")
log.Debugf("Announcing canonical block %#x", h)
rs.p2p.Broadcast(&pb.BeaconBlockAnnounce{
Hash: h[:],
SlotNumber: blk.Slot,
})
sentBlockAnnounce.Inc()
sendBlockAnnounceSpan.End()
}