mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
**What type of PR is this?**
Other
**What does this PR do? Why is it needed?**
When an (potentially aggregated) attestation is received **before** the
block being voted for, Prysm queues this attestation, then processes the
queue when the block has been received.
This behavior is consistent with the [Phase0 specification
](https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id).
> [IGNORE] The block being voted for
(attestation.data.beacon_block_root) has been seen (via gossip or
non-gossip sources) (a client MAY queue attestations for processing once
block is retrieved).
Once the block being voted for is processed, previously queued
(potentially aggregated) attestations are then processed, and
broadcasted.
Processing (potentially aggregated) attestations takes some non
negligible time. For this reason, (potentially aggregated) attestations
are deduplicated before being introduced into the pending queue, to
avoid eventually processing duplicates.
Before this PR, two aggregated attestations were considered duplicated
if all of the following conditions were gathered:
1. Attestations have the same version,
2. **Attestations have the same aggregator index (aka., the same
validator aggregated them)**,
3. Attestations have the same slot,
4. Attestations have the same committee index, and
5. Attestations have the same aggregation bits
Aggregated attestations are then broadcasted.
The final purpose of aggregated attestations is to be packed into the
next block by the next proposer.
When packing attestations, the aggregator index is not used any more.
This pull request modifies the deduplication function used in the
pending aggregated attestations queue by considering that multiple
aggregated attestations only differing by the aggregator index are
equivalent (removing `2.` of the previous list.)
As a consequence, the count of aggregated attestations to be introduced
in the pending queue is reduced from 1 aggregated attestation by
aggregator to, in the best case,
[MAX_COMMITTEE_PER_SLOT=64](https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/beacon-chain.md#misc-1).
Also, only a single aggregated attestation for a given version, slot,
committee index and aggregation bits will be re-broadcasted. This is a
correct behavior, since no data to be included in a block will be lost.
(We can even say that this will reduce by a bit the total networking
volume.)
**How to test**:
1. Start a beacon node (preferably, on a slow computer) from a
checkpoint.
2. Filter logs containing `Synced new block` and `Verified and saved
pending attestations to pool`. (You can pipe logs into `grep -E "Synced
new block|Verified and saved pending attestations to pool"`.
- In `Synced new block` logs, monitor the `sinceSlotStartTime` value.
This should monotonically decrease.
- In `Verified and saved pending attestations to pool`, monitor the
`pendingAggregateAttAndProofCount` value. It should be a "honest" value.
"honest" is not really quantifiable here, since it depends on the
aggregators. But it's likely to be less than
`5*MAX_COMMITTEE_PER_SLOT=320`.
**Which issues(s) does this PR fix?**
Partially fixes:
- https://github.com/OffchainLabs/prysm/issues/16160
**Other notes for review**
Please read commit by commit, with commit messages.
The important commit is b748c04a67.
**Acknowledgements**
- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).
675 lines
20 KiB
Go
675 lines
20 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/OffchainLabs/prysm/v7/async"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain"
|
|
p2ptypes "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
|
|
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
|
|
"github.com/OffchainLabs/prysm/v7/config/params"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
|
"github.com/OffchainLabs/prysm/v7/crypto/rand"
|
|
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
|
|
"github.com/OffchainLabs/prysm/v7/encoding/ssz/equality"
|
|
"github.com/OffchainLabs/prysm/v7/monitoring/tracing"
|
|
prysmTrace "github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
|
|
"github.com/OffchainLabs/prysm/v7/time/slots"
|
|
"github.com/libp2p/go-libp2p/core"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/trailofbits/go-mutexasserts"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
var processPendingBlocksPeriod = slots.DivideSlotBy(3 /* times per slot */)
|
|
|
|
const maxPeerRequest = 50
|
|
const numOfTries = 5
|
|
const maxBlocksPerSlot = 3
|
|
|
|
// processes pending blocks queue on every processPendingBlocksPeriod
|
|
func (s *Service) processPendingBlocksQueue() {
|
|
// Prevents multiple queue processing goroutines (invoked by RunEvery) from contending for data.
|
|
locker := new(sync.Mutex)
|
|
async.RunEvery(s.ctx, processPendingBlocksPeriod, func() {
|
|
// Don't process the pending blocks if genesis time has not been set. The chain is not ready.
|
|
if !s.chainIsStarted() {
|
|
return
|
|
}
|
|
|
|
locker.Lock()
|
|
defer locker.Unlock()
|
|
|
|
if err := s.processPendingBlocks(s.ctx); err != nil {
|
|
log.WithError(err).Debug("Could not process pending blocks")
|
|
}
|
|
})
|
|
}
|
|
|
|
// processPendingBlocks validates, processes, and broadcasts pending blocks.
|
|
func (s *Service) processPendingBlocks(ctx context.Context) error {
|
|
ctx, span := prysmTrace.StartSpan(ctx, "processPendingBlocks")
|
|
defer span.End()
|
|
|
|
// Remove old blocks from our expiration cache.
|
|
s.deleteExpiredBlocksFromCache()
|
|
|
|
// Validate pending slots before processing.
|
|
if err := s.validatePendingSlots(); err != nil {
|
|
return errors.Wrap(err, "could not validate pending slots")
|
|
}
|
|
|
|
// Sort slots for ordered processing.
|
|
sortedSlots := s.sortedPendingSlots()
|
|
|
|
span.SetAttributes(prysmTrace.Int64Attribute("numSlots", int64(len(sortedSlots))), prysmTrace.Int64Attribute("numPeers", int64(len(s.cfg.p2p.Peers().Connected()))))
|
|
|
|
randGen := rand.NewGenerator()
|
|
var parentRoots [][32]byte
|
|
|
|
blkRoots := make([][32]byte, 0, len(sortedSlots)*maxBlocksPerSlot)
|
|
|
|
// Iterate through sorted slots.
|
|
for i, slot := range sortedSlots {
|
|
// Skip processing if slot is in the future.
|
|
if slot > s.cfg.clock.CurrentSlot() {
|
|
continue
|
|
}
|
|
|
|
ctx, span := startInnerSpan(ctx, slot)
|
|
|
|
// Get blocks in the pending queue for the current slot.
|
|
blocksInCache := s.getBlocksInQueue(slot)
|
|
if len(blocksInCache) == 0 {
|
|
span.End()
|
|
continue
|
|
}
|
|
|
|
// Process each block in the queue.
|
|
for _, b := range blocksInCache {
|
|
start := time.Now()
|
|
totalDuration := time.Duration(0)
|
|
|
|
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
|
continue
|
|
}
|
|
blkRoot, err := b.Block().HashTreeRoot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Skip blocks that are already being processed.
|
|
if s.cfg.chain.BlockBeingSynced(blkRoot) {
|
|
log.WithField("blockRoot", fmt.Sprintf("%#x", blkRoot)).Info("Skipping pending block already being processed")
|
|
continue
|
|
}
|
|
|
|
// Remove and skip blocks already in the database.
|
|
if s.cfg.beaconDB.HasBlock(ctx, blkRoot) {
|
|
if err := s.removeBlockFromQueue(b, blkRoot); err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
|
|
parentRoot := b.Block().ParentRoot()
|
|
inPendingQueue := s.isBlockInQueue(parentRoot)
|
|
|
|
// Check if block is bad.
|
|
keepProcessing, err := s.checkIfBlockIsBad(ctx, span, slot, b, blkRoot)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !keepProcessing {
|
|
continue
|
|
}
|
|
|
|
// Request parent block if not in the pending queue and not in the database.
|
|
isParentBlockInDB := s.cfg.beaconDB.HasBlock(ctx, parentRoot)
|
|
if !inPendingQueue && !isParentBlockInDB && s.hasPeer() {
|
|
parentRoots = append(parentRoots, parentRoot)
|
|
continue
|
|
}
|
|
if !isParentBlockInDB {
|
|
continue
|
|
}
|
|
|
|
// Calculate the deadline time by adding three slots duration to the current time
|
|
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
|
|
threeSlotDuration := 3 * time.Duration(secondsPerSlot) * time.Second
|
|
ctxWithTimeout, cancelFunction := context.WithTimeout(ctx, threeSlotDuration)
|
|
// Process and broadcast the block.
|
|
if err := s.processAndBroadcastBlock(ctxWithTimeout, b, blkRoot); err != nil {
|
|
s.handleBlockProcessingError(ctxWithTimeout, err, b, blkRoot)
|
|
cancelFunction()
|
|
continue
|
|
}
|
|
cancelFunction()
|
|
|
|
blkRoots = append(blkRoots, blkRoot)
|
|
|
|
// Remove the processed block from the queue.
|
|
if err := s.removeBlockFromQueue(b, blkRoot); err != nil {
|
|
return err
|
|
}
|
|
|
|
duration := time.Since(start)
|
|
totalDuration += duration
|
|
log.WithFields(logrus.Fields{
|
|
"slotIndex": fmt.Sprintf("%d/%d", i+1, len(sortedSlots)),
|
|
"slot": slot,
|
|
"root": fmt.Sprintf("%#x", blkRoot),
|
|
"duration": duration,
|
|
"totalDuration": totalDuration,
|
|
}).Debug("Processed pending block and cleared it in cache")
|
|
}
|
|
|
|
span.End()
|
|
}
|
|
|
|
for _, blkRoot := range blkRoots {
|
|
// Process pending attestations for this block.
|
|
if err := s.processPendingAttsForBlock(ctx, blkRoot); err != nil {
|
|
log.WithError(err).Debug("Failed to process pending attestations for block")
|
|
}
|
|
}
|
|
|
|
return s.sendBatchRootRequest(ctx, parentRoots, randGen)
|
|
}
|
|
|
|
// startInnerSpan starts a new tracing span for an inner loop and returns the new context and span.
|
|
func startInnerSpan(ctx context.Context, slot primitives.Slot) (context.Context, trace.Span) {
|
|
ctx, span := prysmTrace.StartSpan(ctx, "processPendingBlocks.InnerLoop")
|
|
span.SetAttributes(prysmTrace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing.
|
|
return ctx, span
|
|
}
|
|
|
|
// getBlocksInQueue retrieves the blocks in the pending queue for a given slot.
|
|
func (s *Service) getBlocksInQueue(slot primitives.Slot) []interfaces.ReadOnlySignedBeaconBlock {
|
|
s.pendingQueueLock.RLock()
|
|
defer s.pendingQueueLock.RUnlock()
|
|
return s.pendingBlocksInCache(slot)
|
|
}
|
|
|
|
// removeBlockFromQueue removes a block from the pending queue.
|
|
func (s *Service) removeBlockFromQueue(b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error {
|
|
s.pendingQueueLock.Lock()
|
|
defer s.pendingQueueLock.Unlock()
|
|
|
|
if err := s.deleteBlockFromPendingQueue(b.Block().Slot(), b, blkRoot); err != nil {
|
|
return errors.Wrap(err, "delete block from pending queue")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// isBlockInQueue checks if a block's parent root is in the pending queue.
|
|
func (s *Service) isBlockInQueue(parentRoot [32]byte) bool {
|
|
s.pendingQueueLock.RLock()
|
|
defer s.pendingQueueLock.RUnlock()
|
|
|
|
return s.seenPendingBlocks[parentRoot]
|
|
}
|
|
|
|
func (s *Service) hasPeer() bool {
|
|
return len(s.cfg.p2p.Peers().Connected()) > 0
|
|
}
|
|
|
|
var errNoPeersForPending = errors.New("no suitable peers to process pending block queue, delaying")
|
|
|
|
// processAndBroadcastBlock validates, processes, and broadcasts a block.
|
|
// Part of the function is to request missing sidecars from peers if the block contains kzg commitments.
|
|
func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [fieldparams.RootLength]byte) error {
|
|
if err := s.processBlock(ctx, b, blkRoot); err != nil {
|
|
return errors.Wrap(err, "process block")
|
|
}
|
|
|
|
if err := s.receiveAndBroadCastBlock(ctx, b, blkRoot, b.Block().Slot()); err != nil {
|
|
return errors.Wrap(err, "receive and broadcast block")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) processBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [fieldparams.RootLength]byte) error {
|
|
blockSlot := b.Block().Slot()
|
|
|
|
if err := s.validateBeaconBlock(ctx, b, blkRoot); err != nil {
|
|
if !errors.Is(ErrOptimisticParent, err) {
|
|
log.WithError(err).WithField("slot", blockSlot).Debug("Could not validate block")
|
|
return err
|
|
}
|
|
}
|
|
|
|
blockEpoch, denebForkEpoch, fuluForkEpoch := slots.ToEpoch(blockSlot), params.BeaconConfig().DenebForkEpoch, params.BeaconConfig().FuluForkEpoch
|
|
|
|
roBlock, err := blocks.NewROBlockWithRoot(b, blkRoot)
|
|
if err != nil {
|
|
return errors.Wrap(err, "new ro block with root")
|
|
}
|
|
|
|
if blockEpoch >= fuluForkEpoch {
|
|
if err := s.requestAndSaveMissingDataColumnSidecars([]blocks.ROBlock{roBlock}); err != nil {
|
|
return errors.Wrap(err, "request and save missing data column sidecars")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if blockEpoch >= denebForkEpoch {
|
|
request, err := s.pendingBlobsRequestForBlock(blkRoot, b)
|
|
if err != nil {
|
|
return errors.Wrap(err, "pending blobs request for block")
|
|
}
|
|
|
|
if len(request) > 0 {
|
|
peers := s.getBestPeers()
|
|
peerCount := len(peers)
|
|
|
|
if peerCount == 0 {
|
|
return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot)
|
|
}
|
|
|
|
if err := s.sendAndSaveBlobSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
|
|
return errors.Wrap(err, "send and save blob sidecars")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) receiveAndBroadCastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [fieldparams.RootLength]byte, blockSlot primitives.Slot) error {
|
|
if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot, nil); err != nil {
|
|
return errors.Wrap(err, "receive block")
|
|
}
|
|
|
|
s.setSeenBlockIndexSlot(blockSlot, b.Block().ProposerIndex())
|
|
|
|
pb, err := b.Proto()
|
|
if err != nil {
|
|
log.WithError(err).Debug("Could not get protobuf block")
|
|
return err
|
|
}
|
|
|
|
if err := s.cfg.p2p.Broadcast(ctx, pb); err != nil {
|
|
log.WithError(err).Debug("Could not broadcast block")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleBlockProcessingError handles errors during block processing.
|
|
func (s *Service) handleBlockProcessingError(ctx context.Context, err error, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) {
|
|
if blockchain.IsInvalidBlock(err) {
|
|
s.setBadBlock(ctx, blkRoot)
|
|
}
|
|
log.WithError(err).WithField("slot", b.Block().Slot()).Debug("Could not process block")
|
|
}
|
|
|
|
// getBestPeers returns the list of best peers based on finalized checkpoint epoch.
|
|
func (s *Service) getBestPeers() []core.PeerID {
|
|
_, bestPeers := s.cfg.p2p.Peers().BestFinalized(s.cfg.chain.FinalizedCheckpt().Epoch)
|
|
if len(bestPeers) > maxPeerRequest {
|
|
bestPeers = bestPeers[:maxPeerRequest]
|
|
}
|
|
return bestPeers
|
|
}
|
|
|
|
func (s *Service) checkIfBlockIsBad(
|
|
ctx context.Context,
|
|
span trace.Span,
|
|
slot primitives.Slot,
|
|
b interfaces.ReadOnlySignedBeaconBlock,
|
|
blkRoot [32]byte,
|
|
) (keepProcessing bool, err error) {
|
|
parentIsBad := s.hasBadBlock(b.Block().ParentRoot())
|
|
blockIsBad := s.hasBadBlock(blkRoot)
|
|
// Check if parent is a bad block.
|
|
if parentIsBad || blockIsBad {
|
|
// Set block as bad if its parent block is bad too.
|
|
if parentIsBad {
|
|
s.setBadBlock(ctx, blkRoot)
|
|
}
|
|
// Remove block from queue.
|
|
s.pendingQueueLock.Lock()
|
|
if err = s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
|
|
s.pendingQueueLock.Unlock()
|
|
return false, err
|
|
}
|
|
s.pendingQueueLock.Unlock()
|
|
span.End()
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, randGen *rand.Rand) error {
|
|
ctx, span := prysmTrace.StartSpan(ctx, "sendBatchRootRequest")
|
|
defer span.End()
|
|
|
|
// Exit early if there are no roots to request.
|
|
if len(roots) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Filter out roots that are already seen in pending blocks or being synced.
|
|
roots = s.filterOutPendingAndSynced(roots)
|
|
|
|
// Nothing to do, exit early.
|
|
if len(roots) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Fetch best peers to request blocks from.
|
|
bestPeers := s.getBestPeers()
|
|
|
|
// No suitable peer, exit early.
|
|
if len(bestPeers) == 0 {
|
|
log.WithField("roots", fmt.Sprintf("%#x", roots)).Debug("Send batch root request: No suitable peers")
|
|
return nil
|
|
}
|
|
|
|
// Randomly choose a peer to query from our best peers.
|
|
// If that peer cannot return all the requested blocks,
|
|
// we randomly select another peer.
|
|
randomIndex := randGen.Int() % len(bestPeers)
|
|
pid := bestPeers[randomIndex]
|
|
|
|
for range numOfTries {
|
|
req := p2ptypes.BeaconBlockByRootsReq(roots)
|
|
|
|
// Get the current epoch.
|
|
currentSlot := s.cfg.clock.CurrentSlot()
|
|
currentEpoch := slots.ToEpoch(currentSlot)
|
|
|
|
// Trim the request to the maximum number of blocks we can request if needed.
|
|
maxReqBlock := params.MaxRequestBlock(currentEpoch)
|
|
rootCount := uint64(len(roots))
|
|
if rootCount > maxReqBlock {
|
|
req = roots[:maxReqBlock]
|
|
}
|
|
|
|
if logrus.GetLevel() >= logrus.DebugLevel {
|
|
rootsStr := make([]string, 0, len(roots))
|
|
for _, req := range roots {
|
|
rootsStr = append(rootsStr, fmt.Sprintf("%#x", req))
|
|
}
|
|
|
|
log.WithFields(logrus.Fields{
|
|
"peer": pid,
|
|
"count": len(req),
|
|
"roots": rootsStr,
|
|
}).Debug("Requesting blocks by root")
|
|
}
|
|
|
|
// Send the request to the peer.
|
|
if err := s.sendBeaconBlocksRequest(ctx, &req, pid); err != nil {
|
|
tracing.AnnotateError(span, err)
|
|
log.WithError(err).Debug("Could not send recent block request")
|
|
}
|
|
|
|
// Filter out roots that are already seen in pending blocks.
|
|
newRoots := make([][32]byte, 0, rootCount)
|
|
func() {
|
|
s.pendingQueueLock.RLock()
|
|
defer s.pendingQueueLock.RUnlock()
|
|
|
|
for _, rt := range roots {
|
|
if !s.seenPendingBlocks[rt] {
|
|
newRoots = append(newRoots, rt)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Exit early if all roots have been seen.
|
|
// This is the happy path.
|
|
if len(newRoots) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// There is still some roots that have not been seen.
|
|
// Choosing a new peer with the leftover set of oots to request.
|
|
roots = newRoots
|
|
|
|
// Choose a new peer to query.
|
|
randomIndex = randGen.Int() % len(bestPeers)
|
|
pid = bestPeers[randomIndex]
|
|
}
|
|
|
|
// Some roots are still missing after all allowed tries.
|
|
// This is the unhappy path.
|
|
log.WithFields(logrus.Fields{
|
|
"roots": fmt.Sprintf("%#x", roots),
|
|
"tries": numOfTries,
|
|
}).Debug("Send batch root request: Some roots are still missing after all allowed tries")
|
|
|
|
return nil
|
|
}
|
|
|
|
// filterOutPendingAndSynced filters out roots that are already seen in pending blocks or being synced.
|
|
func (s *Service) filterOutPendingAndSynced(roots [][fieldparams.RootLength]byte) [][fieldparams.RootLength]byte {
|
|
// Remove duplicates (if any) from the list of roots.
|
|
roots = dedupRoots(roots)
|
|
|
|
// Filters out in place roots that are already seen in pending blocks or being synced.
|
|
s.pendingQueueLock.RLock()
|
|
defer s.pendingQueueLock.RUnlock()
|
|
|
|
for i := len(roots) - 1; i >= 0; i-- {
|
|
r := roots[i]
|
|
if s.seenPendingBlocks[r] || s.cfg.chain.BlockBeingSynced(r) {
|
|
roots = append(roots[:i], roots[i+1:]...)
|
|
continue
|
|
}
|
|
}
|
|
return roots
|
|
}
|
|
|
|
func (s *Service) sortedPendingSlots() []primitives.Slot {
|
|
s.pendingQueueLock.RLock()
|
|
defer s.pendingQueueLock.RUnlock()
|
|
|
|
items := s.slotToPendingBlocks.Items()
|
|
|
|
ss := make([]primitives.Slot, 0, len(items))
|
|
for k := range items {
|
|
slot := cacheKeyToSlot(k)
|
|
ss = append(ss, slot)
|
|
}
|
|
slices.Sort(ss)
|
|
return ss
|
|
}
|
|
|
|
// validatePendingSlots validates the pending blocks
|
|
// by their slot. If they are before the current finalized
|
|
// checkpoint, these blocks are removed from the queue.
|
|
func (s *Service) validatePendingSlots() error {
|
|
s.pendingQueueLock.Lock()
|
|
defer s.pendingQueueLock.Unlock()
|
|
oldBlockRoots := make(map[[32]byte]bool)
|
|
|
|
cp := s.cfg.chain.FinalizedCheckpt()
|
|
finalizedEpoch := cp.Epoch
|
|
if s.slotToPendingBlocks == nil {
|
|
return errors.New("slotToPendingBlocks cache can't be nil")
|
|
}
|
|
items := s.slotToPendingBlocks.Items()
|
|
for k := range items {
|
|
slot := cacheKeyToSlot(k)
|
|
blks := s.pendingBlocksInCache(slot)
|
|
for _, b := range blks {
|
|
epoch := slots.ToEpoch(slot)
|
|
// remove all descendant blocks of old blocks
|
|
if oldBlockRoots[b.Block().ParentRoot()] {
|
|
root, err := b.Block().HashTreeRoot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
oldBlockRoots[root] = true
|
|
if err := s.deleteBlockFromPendingQueue(slot, b, root); err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
// don't process old blocks
|
|
if finalizedEpoch > 0 && epoch <= finalizedEpoch {
|
|
blkRoot, err := b.Block().HashTreeRoot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
oldBlockRoots[blkRoot] = true
|
|
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) clearPendingSlots() {
|
|
s.pendingQueueLock.Lock()
|
|
defer s.pendingQueueLock.Unlock()
|
|
s.slotToPendingBlocks.Flush()
|
|
s.seenPendingBlocks = make(map[[32]byte]bool)
|
|
}
|
|
|
|
// Delete block from the list from the pending queue using the slot as key.
|
|
// Note: this helper is not thread safe.
|
|
func (s *Service) deleteBlockFromPendingQueue(slot primitives.Slot, b interfaces.ReadOnlySignedBeaconBlock, r [32]byte) error {
|
|
mutexasserts.AssertRWMutexLocked(&s.pendingQueueLock)
|
|
|
|
blks := s.pendingBlocksInCache(slot)
|
|
if len(blks) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Defensive check to ignore nil blocks
|
|
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
|
return err
|
|
}
|
|
|
|
newBlks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(blks))
|
|
for _, blk := range blks {
|
|
blkPb, err := blk.Proto()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bPb, err := b.Proto()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if equality.DeepEqual(blkPb, bPb) {
|
|
continue
|
|
}
|
|
newBlks = append(newBlks, blk)
|
|
}
|
|
if len(newBlks) == 0 {
|
|
s.slotToPendingBlocks.Delete(slotToCacheKey(slot))
|
|
delete(s.seenPendingBlocks, r)
|
|
return nil
|
|
}
|
|
|
|
// Decrease exp time in proportion to how many blocks are still in the cache for slot key.
|
|
d := pendingBlockExpTime / time.Duration(len(newBlks))
|
|
if err := s.slotToPendingBlocks.Replace(slotToCacheKey(slot), newBlks, d); err != nil {
|
|
return err
|
|
}
|
|
delete(s.seenPendingBlocks, r)
|
|
return nil
|
|
}
|
|
|
|
// This method manually clears our cache so that all expired
|
|
// entries are correctly removed.
|
|
func (s *Service) deleteExpiredBlocksFromCache() {
|
|
s.pendingQueueLock.Lock()
|
|
defer s.pendingQueueLock.Unlock()
|
|
|
|
s.slotToPendingBlocks.DeleteExpired()
|
|
}
|
|
|
|
// Insert block to the list in the pending queue using the slot as key.
|
|
// Note: this helper is not thread safe.
|
|
func (s *Service) insertBlockToPendingQueue(_ primitives.Slot, b interfaces.ReadOnlySignedBeaconBlock, r [32]byte) error {
|
|
mutexasserts.AssertRWMutexLocked(&s.pendingQueueLock)
|
|
|
|
if s.seenPendingBlocks[r] {
|
|
return nil
|
|
}
|
|
|
|
if err := s.addPendingBlockToCache(b); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.seenPendingBlocks[r] = true
|
|
return nil
|
|
}
|
|
|
|
// This returns signed beacon blocks given input key from slotToPendingBlocks.
|
|
func (s *Service) pendingBlocksInCache(slot primitives.Slot) []interfaces.ReadOnlySignedBeaconBlock {
|
|
k := slotToCacheKey(slot)
|
|
value, ok := s.slotToPendingBlocks.Get(k)
|
|
if !ok {
|
|
return []interfaces.ReadOnlySignedBeaconBlock{}
|
|
}
|
|
blks, ok := value.([]interfaces.ReadOnlySignedBeaconBlock)
|
|
if !ok {
|
|
return []interfaces.ReadOnlySignedBeaconBlock{}
|
|
}
|
|
return blks
|
|
}
|
|
|
|
// This adds input signed beacon block to slotToPendingBlocks cache.
|
|
func (s *Service) addPendingBlockToCache(b interfaces.ReadOnlySignedBeaconBlock) error {
|
|
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
|
return err
|
|
}
|
|
|
|
blks := s.pendingBlocksInCache(b.Block().Slot())
|
|
|
|
if len(blks) >= maxBlocksPerSlot {
|
|
return nil
|
|
}
|
|
|
|
blks = append(blks, b)
|
|
k := slotToCacheKey(b.Block().Slot())
|
|
s.slotToPendingBlocks.Set(k, blks, pendingBlockExpTime)
|
|
return nil
|
|
}
|
|
|
|
// This converts input string to slot.
|
|
func cacheKeyToSlot(s string) primitives.Slot {
|
|
b := []byte(s)
|
|
return bytesutil.BytesToSlotBigEndian(b)
|
|
}
|
|
|
|
// This converts input slot to a key to be used for slotToPendingBlocks cache.
|
|
func slotToCacheKey(s primitives.Slot) string {
|
|
b := bytesutil.SlotToBytesBigEndian(s)
|
|
return string(b)
|
|
}
|
|
|
|
func dedupRoots(roots [][32]byte) [][32]byte {
|
|
newRoots := make([][32]byte, 0, len(roots))
|
|
rootMap := make(map[[32]byte]bool, len(roots))
|
|
for i, r := range roots {
|
|
if rootMap[r] {
|
|
continue
|
|
}
|
|
rootMap[r] = true
|
|
newRoots = append(newRoots, roots[i])
|
|
}
|
|
return newRoots
|
|
}
|