Request missing blobs while processing pending queue (#13015)

* Request missing blobs while processing pending queue

* Fix build

* Lower complexity

* Fix conflict

* Clean up per James's comment

---------

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
terencechain
2023-10-11 15:07:11 -07:00
committed by GitHub
parent a536612c39
commit 99843688cd

View File

@@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/async" "github.com/prysmaticlabs/prysm/v4/async"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
@@ -184,6 +185,7 @@ func (s *Service) hasPeer() bool {
} }
// processAndBroadcastBlock validates, processes, and broadcasts a block. // processAndBroadcastBlock validates, processes, and broadcasts a block.
// part of the function is to request missing blobs from peers if the block contains kzg commitments.
func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error { func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error {
if err := s.validateBeaconBlock(ctx, b, blkRoot); err != nil { if err := s.validateBeaconBlock(ctx, b, blkRoot); err != nil {
if !errors.Is(ErrOptimisticParent, err) { if !errors.Is(ErrOptimisticParent, err) {
@@ -192,6 +194,14 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
} }
} }
peers := s.getBestPeers()
peerCount := len(peers)
if peerCount > 0 {
if err := s.requestPendingBlobs(ctx, b.Block(), blkRoot, peers[rand.NewGenerator().Int()%peerCount]); err != nil {
return err
}
}
if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil {
return err return err
} }
@@ -219,6 +229,12 @@ func (s *Service) handleBlockProcessingError(ctx context.Context, err error, b i
log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not process block") log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not process block")
} }
// getBestPeers returns the list of best peers based on finalized checkpoint epoch.
func (s *Service) getBestPeers() []core.PeerID {
_, bestPeers := s.cfg.p2p.Peers().BestFinalized(maxPeerRequest, s.cfg.chain.FinalizedCheckpt().Epoch)
return bestPeers
}
func (s *Service) checkIfBlockIsBad( func (s *Service) checkIfBlockIsBad(
ctx context.Context, ctx context.Context,
span *trace.Span, span *trace.Span,
@@ -265,8 +281,7 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
if len(roots) == 0 { if len(roots) == 0 {
return nil return nil
} }
cp := s.cfg.chain.FinalizedCheckpt() bestPeers := s.getBestPeers()
_, bestPeers := s.cfg.p2p.Peers().BestFinalized(maxPeerRequest, cp.Epoch)
if len(bestPeers) == 0 { if len(bestPeers) == 0 {
return nil return nil
} }