Init-sync fetcher: backtracking (#7704)

* fetcher: backtracking

* gazelle

* amend comments

* cleanup

* cleanup

* minor fixes

* simplify

* remove irrelevant test

* add TestBlocksFetcher_findAncestor

* more tests

* Address Nishant's feedback

* optimize backtracking

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Victor Farazdagi
2020-11-05 10:27:46 +03:00
committed by GitHub
parent bafc7479b0
commit f6cbfd5e27
5 changed files with 629 additions and 0 deletions

View File

@@ -2,17 +2,30 @@ package initialsync
import (
"context"
"fmt"
"sort"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// forkData describes an alternative chain path served by a given peer.
// Blocks are stored in ascending slot order. The first block is guaranteed to have its parent
// either in the DB or in the initial sync cache.
type forkData struct {
// peer is the ID of the peer the blocks were obtained from.
peer peer.ID
// blocks holds the fetched blocks that make up the alternative path.
blocks []*eth.SignedBeaconBlock
}
// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot.
// For efficiency only one random slot is checked per epoch, so returned slot might not be the first
// non-skipped slot. This shouldn't be a problem, as in case of adversary peer, we might get incorrect
@@ -36,6 +49,15 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u
// Transform peer list to avoid eclipsing (filter, shuffle, trim).
peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
return f.nonSkippedSlotAfterWithPeersTarget(ctx, slot, peers, targetEpoch)
}
// nonSkippedSlotAfterWithPeersTarget traverses peers (supporting a given target epoch), in an attempt
// to find a non-skipped slot among the returned blocks.
func (f *blocksFetcher) nonSkippedSlotAfterWithPeersTarget(
ctx context.Context, slot uint64, peers []peer.ID, targetEpoch uint64,
) (uint64, error) {
// Exit early if no peers are ready.
if len(peers) == 0 {
return 0, errNoPeersAvailable
}
@@ -119,6 +141,149 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u
return nextSlot, nil
}
// findFork looks for a peer that serves blocks from an alternative branch.
//
// Candidate peers are obtained via BestNonFinalized for the epoch following the one that
// contains the given slot, shuffled, and then probed one by one with findForkWithPeer.
// The first peer for which a fork is found wins: its forkData (peer + blocks) is returned.
//
// An error is returned when:
//   - slot is lower than two epochs worth of slots (the previous epoch is needed for probing),
//   - the slot's epoch is not after the finalized epoch,
//   - the epoch start slot cannot be computed,
//   - there are no candidate peers (errNoPeersAvailable),
//   - none of the candidates yielded alternative blocks (errNoPeersWithAltBlocks).
func (f *blocksFetcher) findFork(ctx context.Context, slot uint64) (*forkData, error) {
	ctx, span := trace.StartSpan(ctx, "initialsync.findFork")
	defer span.End()

	// Probing starts one epoch back, so at least two epochs must be available.
	epochLen := params.BeaconConfig().SlotsPerEpoch
	if minSlot := epochLen * 2; slot < minSlot {
		return nil, fmt.Errorf("slot is too low to backtrack, min. expected %d", minSlot)
	}

	// Backtracking only makes sense past the finalized epoch.
	finalized := f.chain.FinalizedCheckpt().Epoch
	currentEpoch := helpers.SlotToEpoch(slot)
	if currentEpoch <= finalized {
		return nil, errors.New("slot is not after the finalized epoch, no backtracking is necessary")
	}

	// From here on, work with the first slot of the current epoch.
	epochStart, err := helpers.StartSlot(currentEpoch)
	if err != nil {
		return nil, err
	}

	// Collect candidate peers for the next epoch; bail out if there are none.
	_, candidates := f.p2p.Peers().BestNonFinalized(1, currentEpoch+1)
	total := len(candidates)
	if total == 0 {
		return nil, errNoPeersAvailable
	}
	f.rand.Shuffle(total, func(a, b int) {
		candidates[a], candidates[b] = candidates[b], candidates[a]
	})

	// Probe candidates in (shuffled) order, returning on the first success.
	for idx, candidate := range candidates {
		log.WithFields(logrus.Fields{
			"peer": candidate,
			"step": fmt.Sprintf("%d/%d", idx+1, total),
		}).Debug("Searching for alternative blocks")

		fork, err := f.findForkWithPeer(ctx, candidate, epochStart)
		if err == nil {
			return fork, nil
		}
		// A failing peer is not fatal, move on to the next one.
		log.WithFields(logrus.Fields{
			"peer":  candidate,
			"error": err.Error(),
		}).Debug("No alternative blocks found for peer")
	}

	return nil, errNoPeersWithAltBlocks
}
// findForkWithPeer fetches a range of blocks from a single peer and scans them for a block
// whose parent is known neither to the DB nor to the init-sync cache.
//
// The scan starts at a non-skipped slot located from one epoch before the given slot, and
// covers two epochs worth of slots. When the very first fetched block has an unknown parent,
// findAncestor is used to walk back to a common ancestor; when a later block diverges, the
// fetched blocks are returned as is, since earlier blocks in the batch already link to known ones.
//
// An error is returned if slot is lower than two epochs worth of slots, if the peer's chain
// state, the non-skipped slot, or the blocks cannot be obtained, if backtracking fails, or if
// every fetched block has a known parent.
func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot uint64) (*forkData, error) {
	// One epoch is subtracted from slot below, guard against too low values.
	epochLen := params.BeaconConfig().SlotsPerEpoch
	if minSlot := epochLen * 2; slot < minSlot {
		return nil, fmt.Errorf("slot is too low to backtrack, min. expected %d", minSlot)
	}

	// The peer's head epoch is the target when searching for a non-skipped slot.
	peerStatus, err := f.p2p.Peers().ChainState(pid)
	if err != nil {
		return nil, fmt.Errorf("cannot obtain peer's status: %w", err)
	}
	peerHeadEpoch := helpers.SlotToEpoch(peerStatus.HeadSlot)

	// Start looking an epoch earlier: roots for those blocks should already be known,
	// which makes a diverging block easier to spot. Stumbling on an unknown block of the
	// current fork is fine too, as any progress is welcome when syncing is stuck.
	startSlot, err := f.nonSkippedSlotAfterWithPeersTarget(ctx, slot-epochLen, []peer.ID{pid}, peerHeadEpoch)
	if err != nil {
		return nil, fmt.Errorf("cannot locate non-empty slot for a peer: %w", err)
	}

	// Pull two epochs worth of consecutive slots, beginning at the located slot.
	rangeReq := &p2ppb.BeaconBlocksByRangeRequest{
		StartSlot: startSlot,
		Count:     epochLen * 2,
		Step:      1,
	}
	blocks, err := f.requestBlocks(ctx, rangeReq, pid)
	if err != nil {
		return nil, fmt.Errorf("cannot fetch blocks: %w", err)
	}

	// Look for the first block whose parent we do not have.
	for idx, blk := range blocks {
		parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
		if f.db.HasBlock(ctx, parentRoot) || f.chain.HasInitSyncBlock(parentRoot) {
			continue
		}
		log.WithFields(logrus.Fields{
			"peer": pid,
			"slot": blk.Block.Slot,
			"root": fmt.Sprintf("%#x", parentRoot),
		}).Debug("Block with unknown parent root has been found")

		// Divergence past the first block: the batch already starts at a known parent.
		if idx != 0 {
			return &forkData{peer: pid, blocks: blocks}, nil
		}

		// The first block itself diverges, so walk back to find where the chains meet.
		fork, err := f.findAncestor(ctx, pid, blk)
		if err != nil {
			return nil, fmt.Errorf("failed to find common ancestor: %w", err)
		}
		return fork, nil
	}

	return nil, errors.New("no alternative blocks exist within scanned range")
}
// findAncestor walks back from the given block, parent by parent, until it reaches a block
// whose parent is present in the DB or in the init-sync cache.
//
// Parents are requested by root from the given peer, for at most backtrackingMaxHops
// iterations. On success the collected blocks (the given block included) are returned in
// ascending slot order, together with the peer. An error is returned if a request fails, if
// the peer returns no block for a requested root, or if the hop limit is exhausted.
func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, block *eth.SignedBeaconBlock) (*forkData, error) {
	// Blocks are accumulated child-first: the last element is always the oldest one seen.
	path := []*eth.SignedBeaconBlock{block}

	for hop := uint64(0); hop < backtrackingMaxHops; hop++ {
		oldest := path[len(path)-1]
		parentRoot := bytesutil.ToBytes32(oldest.Block.ParentRoot)

		known := f.db.HasBlock(ctx, parentRoot) || f.chain.HasInitSyncBlock(parentRoot)
		if known {
			// Common ancestor reached, hand the blocks over in ascending slot order.
			sort.Slice(path, func(a, b int) bool {
				return path[a].Block.Slot < path[b].Block.Slot
			})
			return &forkData{peer: pid, blocks: path}, nil
		}

		// Parent is unknown, ask the peer for it.
		rootsReq := &p2pTypes.BeaconBlockByRootsReq{parentRoot}
		parents, err := f.requestBlocksByRoot(ctx, rootsReq, pid)
		if err != nil {
			return nil, err
		}
		if len(parents) == 0 {
			// Peer cannot provide the parent, no point in going further.
			break
		}
		path = append(path, parents[0])
	}

	return nil, errors.New("no common ancestor found")
}
// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
finalizedEpoch, _ := f.p2p.Peers().BestFinalized(