Add a span to block-by-slot (#2184)

* Add a span to block-by-slot

* /goroutinez
This commit is contained in:
Preston Van Loon
2019-04-05 19:19:27 -05:00
committed by GitHub
parent 9a6a275d3d
commit c925632cc9
11 changed files with 49 additions and 34 deletions

View File

@@ -347,7 +347,7 @@ func TestReceiveBlock_OnChainSplit(t *testing.T) {
}
// Common ancestor is block at slot 3
commonAncestor, err := db.BlockBySlot(genesisSlot + 3)
commonAncestor, err := db.BlockBySlot(ctx, genesisSlot+3)
if err != nil {
t.Fatal(err)
}

View File

@@ -41,7 +41,7 @@ func (c *ChainService) updateFFGCheckPts(ctx context.Context, state *pb.BeaconSt
// the slot of justified block saved in DB.
if lastJustifiedSlot > savedJustifiedBlock.Slot {
// Retrieve the new justified block from DB using the new justified slot and save it.
newJustifiedBlock, err := c.beaconDB.BlockBySlot(lastJustifiedSlot)
newJustifiedBlock, err := c.beaconDB.BlockBySlot(ctx, lastJustifiedSlot)
if err != nil {
return err
}
@@ -52,7 +52,7 @@ func (c *ChainService) updateFFGCheckPts(ctx context.Context, state *pb.BeaconSt
log.Debugf("Saving new justified block, no block with slot %d in db, trying slot %d",
lastAvailBlkSlot, lastAvailBlkSlot-1)
lastAvailBlkSlot--
newJustifiedBlock, err = c.beaconDB.BlockBySlot(lastAvailBlkSlot)
newJustifiedBlock, err = c.beaconDB.BlockBySlot(ctx, lastAvailBlkSlot)
if err != nil {
return err
}
@@ -80,7 +80,7 @@ func (c *ChainService) updateFFGCheckPts(ctx context.Context, state *pb.BeaconSt
}
if lastFinalizedSlot > savedFinalizedBlock.Slot {
// Retrieve the new finalized block from DB using the new finalized slot and save it.
newFinalizedBlock, err := c.beaconDB.BlockBySlot(lastFinalizedSlot)
newFinalizedBlock, err := c.beaconDB.BlockBySlot(ctx, lastFinalizedSlot)
if err != nil {
return err
}
@@ -91,7 +91,7 @@ func (c *ChainService) updateFFGCheckPts(ctx context.Context, state *pb.BeaconSt
log.Debugf("Saving new finalized block, no block with slot %d in db, trying slot %d",
lastAvailBlkSlot, lastAvailBlkSlot-1)
lastAvailBlkSlot--
newFinalizedBlock, err = c.beaconDB.BlockBySlot(lastAvailBlkSlot)
newFinalizedBlock, err = c.beaconDB.BlockBySlot(ctx, lastAvailBlkSlot)
if err != nil {
return err
}
@@ -138,7 +138,7 @@ func (c *ChainService) ApplyForkChoiceRule(
if err != nil {
return fmt.Errorf("could not retrieve justified head: %v", err)
}
head, err := c.lmdGhost(justifiedHead, justifiedState, attestationTargets)
head, err := c.lmdGhost(ctx, justifiedHead, justifiedState, attestationTargets)
if err != nil {
return fmt.Errorf("could not run fork choice: %v", err)
}
@@ -201,6 +201,7 @@ func (c *ChainService) ApplyForkChoiceRule(
// return head
// head = max(children, key=get_vote_count)
func (c *ChainService) lmdGhost(
ctx context.Context,
startBlock *pb.BeaconBlock,
startState *pb.BeaconState,
voteTargets map[uint64]*pb.BeaconBlock,
@@ -209,7 +210,7 @@ func (c *ChainService) lmdGhost(
head := startBlock
for {
children, err := c.blockChildren(head, highestSlot)
children, err := c.blockChildren(ctx, head, highestSlot)
if err != nil {
return nil, fmt.Errorf("could not fetch block children: %v", err)
}
@@ -247,7 +248,7 @@ func (c *ChainService) lmdGhost(
// Spec pseudocode definition:
// get_children(store: Store, block: BeaconBlock) -> List[BeaconBlock]
// returns the child blocks of the given block.
func (c *ChainService) blockChildren(block *pb.BeaconBlock, highestSlot uint64) ([]*pb.BeaconBlock, error) {
func (c *ChainService) blockChildren(ctx context.Context, block *pb.BeaconBlock, highestSlot uint64) ([]*pb.BeaconBlock, error) {
var children []*pb.BeaconBlock
currentRoot, err := hashutil.HashBeaconBlock(block)
@@ -256,7 +257,7 @@ func (c *ChainService) blockChildren(block *pb.BeaconBlock, highestSlot uint64)
}
startSlot := block.Slot + 1
for i := startSlot; i <= highestSlot; i++ {
block, err := c.beaconDB.BlockBySlot(i)
block, err := c.beaconDB.BlockBySlot(ctx, i)
if err != nil {
return nil, fmt.Errorf("could not get block by slot: %v", err)
}

View File

@@ -289,7 +289,7 @@ func TestBlockChildren_2InARow(t *testing.T) {
t.Fatalf("Could update chain head: %v", err)
}
childrenBlock, err := chainService.blockChildren(block1, state.Slot)
childrenBlock, err := chainService.blockChildren(ctx, block1, state.Slot)
if err != nil {
t.Fatalf("Could not get block children: %v", err)
}
@@ -364,7 +364,7 @@ func TestBlockChildren_ChainSplits(t *testing.T) {
t.Fatalf("Could update chain head: %v", err)
}
childrenBlock, err := chainService.blockChildren(block1, state.Slot)
childrenBlock, err := chainService.blockChildren(ctx, block1, state.Slot)
if err != nil {
t.Fatalf("Could not get block children: %v", err)
}
@@ -430,7 +430,7 @@ func TestBlockChildren_SkipSlots(t *testing.T) {
t.Fatalf("Could update chain head: %v", err)
}
childrenBlock, err := chainService.blockChildren(block1, state.Slot)
childrenBlock, err := chainService.blockChildren(ctx, block1, state.Slot)
if err != nil {
t.Fatalf("Could not get block children: %v", err)
}
@@ -488,7 +488,7 @@ func TestLMDGhost_TrivialHeadUpdate(t *testing.T) {
voteTargets[0] = block2
// LMDGhost should pick block 2.
head, err := chainService.lmdGhost(block1, state, voteTargets)
head, err := chainService.lmdGhost(ctx, block1, state, voteTargets)
if err != nil {
t.Fatalf("Could not run LMD GHOST: %v", err)
}
@@ -573,7 +573,7 @@ func TestLMDGhost_3WayChainSplitsSameHeight(t *testing.T) {
voteTargets[2] = block4
voteTargets[3] = block4
// LMDGhost should pick block 4.
head, err := chainService.lmdGhost(block1, state, voteTargets)
head, err := chainService.lmdGhost(ctx, block1, state, voteTargets)
if err != nil {
t.Fatalf("Could not run LMD GHOST: %v", err)
}
@@ -690,7 +690,7 @@ func TestLMDGhost_2WayChainSplitsDiffHeight(t *testing.T) {
voteTargets[1] = block5
voteTargets[2] = block5
// LMDGhost should pick block 5.
head, err := chainService.lmdGhost(block1, state, voteTargets)
head, err := chainService.lmdGhost(ctx, block1, state, voteTargets)
if err != nil {
t.Fatalf("Could not run LMD GHOST: %v", err)
}
@@ -761,7 +761,7 @@ func BenchmarkLMDGhost_8Slots_8Validators(b *testing.B) {
}
for i := 0; i < b.N; i++ {
_, err := chainService.lmdGhost(genesis, state, voteTargets)
_, err := chainService.lmdGhost(ctx, genesis, state, voteTargets)
if err != nil {
b.Fatalf("Could not run LMD GHOST: %v", err)
}
@@ -832,7 +832,7 @@ func BenchmarkLMDGhost_32Slots_8Validators(b *testing.B) {
}
for i := 0; i < b.N; i++ {
_, err := chainService.lmdGhost(genesis, state, voteTargets)
_, err := chainService.lmdGhost(ctx, genesis, state, voteTargets)
if err != nil {
b.Fatalf("Could not run LMD GHOST: %v", err)
}
@@ -901,7 +901,7 @@ func BenchmarkLMDGhost_32Slots_64Validators(b *testing.B) {
}
for i := 0; i < b.N; i++ {
_, err := chainService.lmdGhost(genesis, state, voteTargets)
_, err := chainService.lmdGhost(ctx, genesis, state, voteTargets)
if err != nil {
b.Fatalf("Could not run LMD GHOST: %v", err)
}
@@ -970,7 +970,7 @@ func BenchmarkLMDGhost_64Slots_16384Validators(b *testing.B) {
}
for i := 0; i < b.N; i++ {
_, err := chainService.lmdGhost(genesis, state, voteTargets)
_, err := chainService.lmdGhost(ctx, genesis, state, voteTargets)
if err != nil {
b.Fatalf("Could not run LMD GHOST: %v", err)
}

View File

@@ -54,7 +54,7 @@ func GenerateStateFromBlock(ctx context.Context, db *db.BeaconDB, slot uint64) (
}
// from input slot, retrieve its corresponding block and call that the most recent block.
mostRecentBlock, err := db.BlockBySlot(slot)
mostRecentBlock, err := db.BlockBySlot(ctx, slot)
if err != nil {
return nil, err
}
@@ -66,7 +66,7 @@ func GenerateStateFromBlock(ctx context.Context, db *db.BeaconDB, slot uint64) (
lastSlot := slot
for mostRecentBlock == nil {
lastSlot--
mostRecentBlock, err = db.BlockBySlot(lastSlot)
mostRecentBlock, err = db.BlockBySlot(ctx, lastSlot)
if err != nil {
return nil, err
}

View File

@@ -433,7 +433,7 @@ func ProcessBlockAttestations(
}
for idx, attestation := range atts {
if err := verifyAttestation(beaconState, attestation, verifySignatures, beaconDB); err != nil {
if err := verifyAttestation(ctx, beaconState, attestation, verifySignatures, beaconDB); err != nil {
return nil, fmt.Errorf("could not verify attestation at index %d in block: %v", idx, err)
}
beaconState.LatestAttestations = append(beaconState.LatestAttestations, &pb.PendingAttestation{
@@ -447,7 +447,7 @@ func ProcessBlockAttestations(
return beaconState, nil
}
func verifyAttestation(beaconState *pb.BeaconState, att *pb.Attestation, verifySignatures bool, beaconDB *db.BeaconDB) error {
func verifyAttestation(ctx context.Context, beaconState *pb.BeaconState, att *pb.Attestation, verifySignatures bool, beaconDB *db.BeaconDB) error {
if att.Data.Slot < params.BeaconConfig().GenesisSlot {
return fmt.Errorf(
"attestation slot (slot %d) less than genesis slot (%d)",
@@ -499,7 +499,7 @@ func verifyAttestation(beaconState *pb.BeaconState, att *pb.Attestation, verifyS
var justifiedBlock *pb.BeaconBlock
var err error
for i := uint64(0); justifiedBlock == nil && i < params.BeaconConfig().SlotsPerEpoch; i++ {
justifiedBlock, err = beaconDB.BlockBySlot(justifiedSlot - i)
justifiedBlock, err = beaconDB.BlockBySlot(ctx, justifiedSlot-i)
if err != nil {
return fmt.Errorf("could not get justified block: %v", err)
}

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
"github.com/boltdb/bolt"
@@ -215,7 +216,11 @@ func (db *BeaconDB) UpdateChainHead(ctx context.Context, block *pb.BeaconBlock,
// BlockBySlot accepts a slot number and returns the corresponding block in the main chain.
// Returns nil if a block was not recorded for the given slot.
func (db *BeaconDB) BlockBySlot(slot uint64) (*pb.BeaconBlock, error) {
func (db *BeaconDB) BlockBySlot(ctx context.Context, slot uint64) (*pb.BeaconBlock, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.BlockBySlot")
defer span.End()
span.AddAttributes(trace.Int64Attribute("slot", int64(slot-params.BeaconConfig().GenesisSlot)))
var block *pb.BeaconBlock
slotEnc := encodeSlotNumber(slot)

View File

@@ -77,8 +77,9 @@ func TestSaveBlock_OK(t *testing.T) {
func TestBlockBySlotEmptyChain_OK(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
ctx := context.Background()
block, _ := db.BlockBySlot(0)
block, _ := db.BlockBySlot(ctx, 0)
if block != nil {
t.Error("BlockBySlot should return nil for an empty chain")
}
@@ -118,7 +119,7 @@ func TestUpdateChainHead_OK(t *testing.T) {
t.Fatalf("failed to initialize state: %v", err)
}
block, err := db.BlockBySlot(0)
block, err := db.BlockBySlot(ctx, 0)
if err != nil {
t.Fatalf("failed to get genesis block: %v", err)
}
@@ -147,7 +148,7 @@ func TestUpdateChainHead_OK(t *testing.T) {
t.Fatalf("failed to record the new head of the main chain: %v", err)
}
b2Prime, err := db.BlockBySlot(1)
b2Prime, err := db.BlockBySlot(ctx, 1)
if err != nil {
t.Fatalf("failed to retrieve slot 1: %v", err)
}

View File

@@ -93,7 +93,7 @@ func (as *AttesterServer) AttestationDataAtSlot(ctx context.Context, req *pb.Att
if lastJustifiedSlot != headState.Slot {
var justifiedBlock *pbp2p.BeaconBlock
for i := uint64(0); justifiedBlock == nil && i < params.BeaconConfig().SlotsPerEpoch; i++ {
justifiedBlock, err = as.beaconDB.BlockBySlot(lastJustifiedSlot - i)
justifiedBlock, err = as.beaconDB.BlockBySlot(ctx, lastJustifiedSlot-i)
if err != nil {
return nil, fmt.Errorf("could not get justified block: %v", err)
}

View File

@@ -289,6 +289,7 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
setUpGenesisStateAndBlock(db, t)
ctx := context.Background()
cfg := &Config{
P2P: &mockP2P{},
@@ -301,7 +302,7 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) {
batchSize := 20
expectedSlot := params.BeaconConfig().GenesisSlot + uint64(batchSize)
ss.highestObservedSlot = expectedSlot
blk, err := ss.db.BlockBySlot(params.BeaconConfig().GenesisSlot)
blk, err := ss.db.BlockBySlot(ctx, params.BeaconConfig().GenesisSlot)
if err != nil {
t.Fatalf("Unable to get genesis block %v", err)
}

View File

@@ -296,9 +296,7 @@ func (rs *RegularSync) handleBlockRequestBySlot(msg p2p.Message) error {
return errors.New("incoming message is not type *pb.BeaconBlockRequestBySlotNumber")
}
ctx, getBlockSpan := trace.StartSpan(ctx, "getBlockBySlot")
block, err := rs.db.BlockBySlot(request.SlotNumber)
getBlockSpan.End()
block, err := rs.db.BlockBySlot(ctx, request.SlotNumber)
if err != nil || block == nil {
if block == nil {
log.Debugf("Block with slot %d does not exist", request.SlotNumber)
@@ -560,7 +558,7 @@ func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) error {
response := make([]*pb.BeaconBlock, 0, blockRange)
for i := startSlot; i <= endSlot; i++ {
retBlock, err := rs.db.BlockBySlot(i)
retBlock, err := rs.db.BlockBySlot(ctx, i)
if err != nil {
log.Errorf("Unable to retrieve block from db %v", err)
continue

View File

@@ -5,6 +5,8 @@ import (
"context"
"fmt"
"net/http"
"runtime/debug"
"runtime/pprof"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -30,6 +32,7 @@ func NewPrometheusService(addr string, svcRegistry *shared.ServiceRegistry) *Ser
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/healthz", s.healthzHandler)
mux.HandleFunc("/goroutinez", s.goroutinezHandler)
s.server = &http.Server{Addr: addr, Handler: mux}
@@ -71,6 +74,12 @@ func (s *Service) healthzHandler(w http.ResponseWriter, _ *http.Request) {
}
}
func (s *Service) goroutinezHandler(w http.ResponseWriter, r *http.Request) {
stack := debug.Stack()
w.Write(stack)
pprof.Lookup("goroutine").WriteTo(w, 2)
}
// Start the prometheus service.
func (s *Service) Start() {
log.WithField("endpoint", s.server.Addr).Info("Starting service")