mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Renames receiver in sync service (#6348)
* updates coinciding vars * renames receiver in sync service * Merge branch 'master' into sync-rename-receiver
This commit is contained in:
@@ -22,12 +22,12 @@ import (
|
||||
var processPendingBlocksPeriod = slotutil.DivideSlotBy(3 /* times per slot */)
|
||||
|
||||
// processes pending blocks queue on every processPendingBlocksPeriod
|
||||
func (r *Service) processPendingBlocksQueue() {
|
||||
func (s *Service) processPendingBlocksQueue() {
|
||||
ctx := context.Background()
|
||||
locker := new(sync.Mutex)
|
||||
runutil.RunEvery(r.ctx, processPendingBlocksPeriod, func() {
|
||||
runutil.RunEvery(s.ctx, processPendingBlocksPeriod, func() {
|
||||
locker.Lock()
|
||||
if err := r.processPendingBlocks(ctx); err != nil {
|
||||
if err := s.processPendingBlocks(ctx); err != nil {
|
||||
log.WithError(err).Error("Failed to process pending blocks")
|
||||
}
|
||||
locker.Unlock()
|
||||
@@ -35,37 +35,37 @@ func (r *Service) processPendingBlocksQueue() {
|
||||
}
|
||||
|
||||
// processes the block tree inside the queue
|
||||
func (r *Service) processPendingBlocks(ctx context.Context) error {
|
||||
func (s *Service) processPendingBlocks(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "processPendingBlocks")
|
||||
defer span.End()
|
||||
|
||||
pids := r.p2p.Peers().Connected()
|
||||
if err := r.validatePendingSlots(); err != nil {
|
||||
pids := s.p2p.Peers().Connected()
|
||||
if err := s.validatePendingSlots(); err != nil {
|
||||
return errors.Wrap(err, "could not validate pending slots")
|
||||
}
|
||||
slots := r.sortedPendingSlots()
|
||||
slots := s.sortedPendingSlots()
|
||||
|
||||
span.AddAttributes(
|
||||
trace.Int64Attribute("numSlots", int64(len(slots))),
|
||||
trace.Int64Attribute("numPeers", int64(len(pids))),
|
||||
)
|
||||
|
||||
for _, s := range slots {
|
||||
for _, slot := range slots {
|
||||
ctx, span := trace.StartSpan(ctx, "processPendingBlocks.InnerLoop")
|
||||
span.AddAttributes(trace.Int64Attribute("slot", int64(s)))
|
||||
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))
|
||||
|
||||
r.pendingQueueLock.RLock()
|
||||
b := r.slotToPendingBlocks[s]
|
||||
s.pendingQueueLock.RLock()
|
||||
b := s.slotToPendingBlocks[slot]
|
||||
// Skip if block does not exist.
|
||||
if b == nil || b.Block == nil {
|
||||
r.pendingQueueLock.RUnlock()
|
||||
s.pendingQueueLock.RUnlock()
|
||||
span.End()
|
||||
continue
|
||||
}
|
||||
r.pendingQueueLock.RUnlock()
|
||||
inPendingQueue := r.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)]
|
||||
s.pendingQueueLock.RUnlock()
|
||||
inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block.ParentRoot)]
|
||||
|
||||
inDB := r.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot))
|
||||
inDB := s.db.HasBlock(ctx, bytesutil.ToBytes32(b.Block.ParentRoot))
|
||||
hasPeer := len(pids) != 0
|
||||
|
||||
// Only request for missing parent block if it's not in DB, not in pending cache
|
||||
@@ -81,17 +81,17 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
|
||||
// have a head slot newer than the block slot we are requesting.
|
||||
pid := pids[rand.Int()%len(pids)]
|
||||
for _, p := range pids {
|
||||
cs, err := r.p2p.Peers().ChainState(p)
|
||||
cs, err := s.p2p.Peers().ChainState(p)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to read chain state for peer")
|
||||
}
|
||||
if cs != nil && cs.HeadSlot >= s {
|
||||
if cs != nil && cs.HeadSlot >= slot {
|
||||
pid = p
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err := r.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
|
||||
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
|
||||
traceutil.AnnotateError(span, err)
|
||||
log.Errorf("Could not send recent block request: %v", err)
|
||||
}
|
||||
@@ -111,23 +111,23 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.chain.ReceiveBlockNoPubsub(ctx, b, blkRoot); err != nil {
|
||||
if err := s.chain.ReceiveBlockNoPubsub(ctx, b, blkRoot); err != nil {
|
||||
log.Errorf("Could not process block from slot %d: %v", b.Block.Slot, err)
|
||||
traceutil.AnnotateError(span, err)
|
||||
}
|
||||
|
||||
// Broadcasting the block again once a node is able to process it.
|
||||
if err := r.p2p.Broadcast(ctx, b); err != nil {
|
||||
if err := s.p2p.Broadcast(ctx, b); err != nil {
|
||||
log.WithError(err).Error("Failed to broadcast block")
|
||||
}
|
||||
|
||||
r.pendingQueueLock.Lock()
|
||||
delete(r.slotToPendingBlocks, s)
|
||||
delete(r.seenPendingBlocks, blkRoot)
|
||||
r.pendingQueueLock.Unlock()
|
||||
s.pendingQueueLock.Lock()
|
||||
delete(s.slotToPendingBlocks, slot)
|
||||
delete(s.seenPendingBlocks, blkRoot)
|
||||
s.pendingQueueLock.Unlock()
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": s,
|
||||
"slot": slot,
|
||||
"blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])),
|
||||
}).Debug("Processed pending block and cleared it in cache")
|
||||
|
||||
@@ -137,13 +137,13 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Service) sortedPendingSlots() []uint64 {
|
||||
r.pendingQueueLock.RLock()
|
||||
defer r.pendingQueueLock.RUnlock()
|
||||
func (s *Service) sortedPendingSlots() []uint64 {
|
||||
s.pendingQueueLock.RLock()
|
||||
defer s.pendingQueueLock.RUnlock()
|
||||
|
||||
slots := make([]uint64, 0, len(r.slotToPendingBlocks))
|
||||
for s := range r.slotToPendingBlocks {
|
||||
slots = append(slots, s)
|
||||
slots := make([]uint64, 0, len(s.slotToPendingBlocks))
|
||||
for slot := range s.slotToPendingBlocks {
|
||||
slots = append(slots, slot)
|
||||
}
|
||||
sort.Slice(slots, func(i, j int) bool {
|
||||
return slots[i] < slots[j]
|
||||
@@ -154,14 +154,14 @@ func (r *Service) sortedPendingSlots() []uint64 {
|
||||
// validatePendingSlots validates the pending blocks
|
||||
// by their slot. If they are before the current finalized
|
||||
// checkpoint, these blocks are removed from the queue.
|
||||
func (r *Service) validatePendingSlots() error {
|
||||
r.pendingQueueLock.Lock()
|
||||
defer r.pendingQueueLock.Unlock()
|
||||
func (s *Service) validatePendingSlots() error {
|
||||
s.pendingQueueLock.Lock()
|
||||
defer s.pendingQueueLock.Unlock()
|
||||
oldBlockRoots := make(map[[32]byte]bool)
|
||||
|
||||
finalizedEpoch := r.chain.FinalizedCheckpt().Epoch
|
||||
for s, b := range r.slotToPendingBlocks {
|
||||
epoch := helpers.SlotToEpoch(s)
|
||||
finalizedEpoch := s.chain.FinalizedCheckpt().Epoch
|
||||
for slot, b := range s.slotToPendingBlocks {
|
||||
epoch := helpers.SlotToEpoch(slot)
|
||||
// remove all descendant blocks of old blocks
|
||||
if oldBlockRoots[bytesutil.ToBytes32(b.Block.ParentRoot)] {
|
||||
root, err := stateutil.BlockRoot(b.Block)
|
||||
@@ -169,8 +169,8 @@ func (r *Service) validatePendingSlots() error {
|
||||
return err
|
||||
}
|
||||
oldBlockRoots[root] = true
|
||||
delete(r.slotToPendingBlocks, s)
|
||||
delete(r.seenPendingBlocks, root)
|
||||
delete(s.slotToPendingBlocks, slot)
|
||||
delete(s.seenPendingBlocks, root)
|
||||
continue
|
||||
}
|
||||
// don't process old blocks
|
||||
@@ -180,16 +180,16 @@ func (r *Service) validatePendingSlots() error {
|
||||
return err
|
||||
}
|
||||
oldBlockRoots[blkRoot] = true
|
||||
delete(r.slotToPendingBlocks, s)
|
||||
delete(r.seenPendingBlocks, blkRoot)
|
||||
delete(s.slotToPendingBlocks, slot)
|
||||
delete(s.seenPendingBlocks, blkRoot)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Service) clearPendingSlots() {
|
||||
r.pendingQueueLock.Lock()
|
||||
defer r.pendingQueueLock.Unlock()
|
||||
r.slotToPendingBlocks = make(map[uint64]*ethpb.SignedBeaconBlock)
|
||||
r.seenPendingBlocks = make(map[[32]byte]bool)
|
||||
func (s *Service) clearPendingSlots() {
|
||||
s.pendingQueueLock.Lock()
|
||||
defer s.pendingQueueLock.Unlock()
|
||||
s.slotToPendingBlocks = make(map[uint64]*ethpb.SignedBeaconBlock)
|
||||
s.seenPendingBlocks = make(map[[32]byte]bool)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user