mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Annotate errors / spans in block processing queue (#3751)
* annotation error in span * added more annotations and spans to process pending blocks * use diff * workspace dep
This commit is contained in:
@@ -10,7 +10,10 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/sync/peerstatus"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/exp/rand"
|
||||
)
|
||||
|
||||
var processPendingBlocksPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/3) * time.Second
|
||||
@@ -32,10 +35,21 @@ func (r *RegularSync) processPendingBlocksQueue() {
|
||||
|
||||
// processes the block tree inside the queue
|
||||
func (r *RegularSync) processPendingBlocks(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "processPendingBlocks")
|
||||
defer span.End()
|
||||
|
||||
pids := peerstatus.Keys()
|
||||
slots := r.sortedPendingSlots()
|
||||
|
||||
span.AddAttributes(
|
||||
trace.Int64Attribute("numSlots", int64(len(slots))),
|
||||
trace.Int64Attribute("numPeers", int64(len(pids))),
|
||||
)
|
||||
|
||||
for _, s := range slots {
|
||||
ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop")
|
||||
span.AddAttributes(trace.Int64Attribute("slot", int64(s)))
|
||||
|
||||
r.pendingQueueLock.RLock()
|
||||
b := r.slotToPendingBlocks[uint64(s)]
|
||||
inPendingQueue := r.seenPendingBlocks[bytesutil.ToBytes32(b.ParentRoot)]
|
||||
@@ -52,25 +66,30 @@ func (r *RegularSync) processPendingBlocks(ctx context.Context) error {
|
||||
"parentRoot": hex.EncodeToString(b.ParentRoot),
|
||||
}).Info("Requesting parent block")
|
||||
req := [][32]byte{bytesutil.ToBytes32(b.ParentRoot)}
|
||||
// TODO(3450): Use round robin sync API to rotate peers for sending recent block request
|
||||
if err := r.sendRecentBeaconBlocksRequest(ctx, req, pids[0]); err != nil {
|
||||
if err := r.sendRecentBeaconBlocksRequest(ctx, req, pids[rand.Int()%len(pids)]); err != nil {
|
||||
traceutil.AnnotateError(span, err)
|
||||
log.Errorf("Could not send recent block request: %v", err)
|
||||
}
|
||||
span.End()
|
||||
continue
|
||||
}
|
||||
|
||||
if !inDB {
|
||||
span.End()
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.chain.ReceiveBlockNoPubsub(ctx, b); err != nil {
|
||||
log.Errorf("Could not process block from slot %d: %v", b.Slot, err)
|
||||
traceutil.AnnotateError(span, err)
|
||||
}
|
||||
|
||||
r.pendingQueueLock.Lock()
|
||||
delete(r.slotToPendingBlocks, uint64(s))
|
||||
blkRoot, err := ssz.SigningRoot(b)
|
||||
if err != nil {
|
||||
traceutil.AnnotateError(span, err)
|
||||
span.End()
|
||||
return err
|
||||
}
|
||||
delete(r.seenPendingBlocks, blkRoot)
|
||||
|
||||
Reference in New Issue
Block a user