refactors blocks fetcher: repackage methods (#6962)

Victor Farazdagi
2020-08-11 18:15:18 +03:00
committed by GitHub
parent a279f374bc
commit c24666e02e
4 changed files with 240 additions and 212 deletions

beacon-chain/sync/initial-sync/BUILD.bazel (View File)

@@ -5,6 +5,8 @@ go_library(
    name = "go_default_library",
    srcs = [
        "blocks_fetcher.go",
        "blocks_fetcher_peers.go",
        "blocks_fetcher_utils.go",
        "blocks_queue.go",
        "fsm.go",
        "log.go",

beacon-chain/sync/initial-sync/blocks_fetcher.go (View File)

@@ -4,8 +4,6 @@ import (
	"context"
	"fmt"
	"io"
	"math"
	"sort"
	"sync"
	"time"
@@ -21,10 +19,8 @@ import (
	"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
	prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
	p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
	"github.com/prysmaticlabs/prysm/shared/mathutil"
	"github.com/prysmaticlabs/prysm/shared/params"
	"github.com/prysmaticlabs/prysm/shared/rand"
	"github.com/prysmaticlabs/prysm/shared/roughtime"
	"github.com/sirupsen/logrus"
	"go.opencensus.io/trace"
)
@@ -347,211 +343,3 @@ func (f *blocksFetcher) requestBlocks(
	return resp, nil
}

// getPeerLock returns peer lock for a given peer. If lock is not found, it is created.
func (f *blocksFetcher) getPeerLock(pid peer.ID) *peerLock {
	f.Lock()
	defer f.Unlock()
	if lock, ok := f.peerLocks[pid]; ok {
		lock.accessed = roughtime.Now()
		return lock
	}
	f.peerLocks[pid] = &peerLock{
		Mutex:    sync.Mutex{},
		accessed: roughtime.Now(),
	}
	return f.peerLocks[pid]
}

// removeStalePeerLocks is a cleanup procedure which removes stale locks.
func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) {
	f.Lock()
	defer f.Unlock()
	for peerID, lock := range f.peerLocks {
		if time.Since(lock.accessed) >= age {
			lock.Lock()
			delete(f.peerLocks, peerID)
			lock.Unlock()
		}
	}
}

// selectFailOverPeer randomly selects fail over peer from the list of available peers.
func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) {
	if len(peers) == 0 {
		return "", errNoPeersAvailable
	}
	if len(peers) == 1 && peers[0] == excludedPID {
		return "", errNoPeersAvailable
	}
	ind := f.rand.Int() % len(peers)
	if peers[ind] == excludedPID {
		return f.selectFailOverPeer(excludedPID, append(peers[:ind], peers[ind+1:]...))
	}
	return peers[ind], nil
}

// waitForMinimumPeers spins and waits up until enough peers are available.
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
	required := params.BeaconConfig().MaxPeersToSync
	if flags.Get().MinimumSyncPeers < required {
		required = flags.Get().MinimumSyncPeers
	}
	for {
		if ctx.Err() != nil {
			return nil, ctx.Err()
		}
		headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
		_, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
		if len(peers) >= required {
			return peers, nil
		}
		log.WithFields(logrus.Fields{
			"suitable": len(peers),
			"required": required}).Info("Waiting for enough suitable peers before syncing")
		time.Sleep(handshakePollingInterval)
	}
}

// filterPeers returns transformed list of peers,
// weight ordered or randomized, constrained if necessary.
func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
	if len(peers) == 0 {
		return peers, nil
	}
	// Shuffle peers to prevent a bad peer from
	// stalling sync with invalid blocks.
	f.rand.Shuffle(len(peers), func(i, j int) {
		peers[i], peers[j] = peers[j], peers[i]
	})
	// Select sub-sample from peers (honoring min-max invariants).
	required := params.BeaconConfig().MaxPeersToSync
	if flags.Get().MinimumSyncPeers < required {
		required = flags.Get().MinimumSyncPeers
	}
	limit := uint64(math.Round(float64(len(peers)) * peersPercentage))
	limit = mathutil.Max(limit, uint64(required))
	limit = mathutil.Min(limit, uint64(len(peers)))
	peers = peers[:limit]
	// Order peers by remaining capacity, effectively turning in-order
	// round robin peer processing into a weighted one (peers with higher
	// remaining capacity are preferred). Peers with the same capacity
	// are selected at random, since we have already shuffled peers
	// at this point.
	sort.SliceStable(peers, func(i, j int) bool {
		cap1 := f.rateLimiter.Remaining(peers[i].String())
		cap2 := f.rateLimiter.Remaining(peers[j].String())
		return cap1 > cap2
	})
	return peers, nil
}

// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot.
// For efficiency only one random slot is checked per epoch, so returned slot might not be the first
// non-skipped slot. This shouldn't be a problem, as in case of adversary peer, we might get incorrect
// data anyway, so code that relies on this function must be robust enough to re-request, if no progress
// is possible with a returned value.
func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) {
	ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter")
	defer span.End()
	headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
	finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
	log.WithFields(logrus.Fields{
		"start":          slot,
		"headEpoch":      headEpoch,
		"finalizedEpoch": finalizedEpoch,
	}).Debug("Searching for non-skipped slot")
	// Exit early, if no peers with high enough finalized epoch are found.
	if finalizedEpoch <= headEpoch {
		return 0, errSlotIsTooHigh
	}
	var err error
	peers, err = f.filterPeers(peers, peersPercentagePerRequest)
	if err != nil {
		return 0, err
	}
	if len(peers) == 0 {
		return 0, errNoPeersAvailable
	}
	slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
	pidInd := 0
	fetch := func(pid peer.ID, start, count, step uint64) (uint64, error) {
		req := &p2ppb.BeaconBlocksByRangeRequest{
			StartSlot: start,
			Count:     count,
			Step:      step,
		}
		blocks, err := f.requestBlocks(ctx, req, pid)
		if err != nil {
			return 0, err
		}
		if len(blocks) > 0 {
			for _, block := range blocks {
				if block.Block.Slot > slot {
					return block.Block.Slot, nil
				}
			}
		}
		return 0, nil
	}
	// Start by checking several epochs fully, w/o resorting to random sampling.
	start := slot + 1
	end := start + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch
	for ind := start; ind < end; ind += slotsPerEpoch {
		nextSlot, err := fetch(peers[pidInd%len(peers)], ind, slotsPerEpoch, 1)
		if err != nil {
			return 0, err
		}
		if nextSlot > slot {
			return nextSlot, nil
		}
		pidInd++
	}
	// Quickly find the close enough epoch where a non-empty slot definitely exists.
	// Only single random slot per epoch is checked - allowing to move forward relatively quickly.
	slot = slot + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch
	upperBoundSlot := helpers.StartSlot(finalizedEpoch + 1)
	for ind := slot + 1; ind < upperBoundSlot; ind += (slotsPerEpoch * slotsPerEpoch) / 2 {
		start := ind + uint64(f.rand.Intn(int(slotsPerEpoch)))
		nextSlot, err := fetch(peers[pidInd%len(peers)], start, slotsPerEpoch/2, slotsPerEpoch)
		if err != nil {
			return 0, err
		}
		pidInd++
		if nextSlot > slot && upperBoundSlot >= nextSlot {
			upperBoundSlot = nextSlot
			break
		}
	}
	// Epoch with non-empty slot is located. Check all slots within two nearby epochs.
	if upperBoundSlot > slotsPerEpoch {
		upperBoundSlot -= slotsPerEpoch
	}
	upperBoundSlot = helpers.StartSlot(helpers.SlotToEpoch(upperBoundSlot))
	nextSlot, err := fetch(peers[pidInd%len(peers)], upperBoundSlot, slotsPerEpoch*2, 1)
	if err != nil {
		return 0, err
	}
	if nextSlot < slot || helpers.StartSlot(finalizedEpoch+1) < nextSlot {
		return 0, errors.New("invalid range for non-skipped slot")
	}
	return nextSlot, nil
}

// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
	headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
	finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
	return helpers.StartSlot(finalizedEpoch)
}

beacon-chain/sync/initial-sync/blocks_fetcher_peers.go (View File)

@@ -0,0 +1,120 @@
package initialsync

import (
	"context"
	"math"
	"sort"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
	"github.com/prysmaticlabs/prysm/beacon-chain/flags"
	"github.com/prysmaticlabs/prysm/shared/mathutil"
	"github.com/prysmaticlabs/prysm/shared/params"
	"github.com/prysmaticlabs/prysm/shared/roughtime"
	"github.com/sirupsen/logrus"
)

// getPeerLock returns peer lock for a given peer. If lock is not found, it is created.
func (f *blocksFetcher) getPeerLock(pid peer.ID) *peerLock {
	f.Lock()
	defer f.Unlock()
	if lock, ok := f.peerLocks[pid]; ok {
		lock.accessed = roughtime.Now()
		return lock
	}
	f.peerLocks[pid] = &peerLock{
		Mutex:    sync.Mutex{},
		accessed: roughtime.Now(),
	}
	return f.peerLocks[pid]
}
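
// NOTE (illustrative sketch, not part of this commit): getPeerLock lets callers
// serialize requests to a single peer. A caller could wrap a request as shown
// below; the withPeerLock helper and the doRequest callback are hypothetical
// and only demonstrate intended usage.
func (f *blocksFetcher) withPeerLock(pid peer.ID, doRequest func() error) error {
	l := f.getPeerLock(pid) // the lock is created on first use and timestamped on every access
	l.Lock()
	defer l.Unlock()
	return doRequest()
}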

// removeStalePeerLocks is a cleanup procedure which removes stale locks.
func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) {
	f.Lock()
	defer f.Unlock()
	for peerID, lock := range f.peerLocks {
		if time.Since(lock.accessed) >= age {
			lock.Lock()
			delete(f.peerLocks, peerID)
			lock.Unlock()
		}
	}
}
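
// NOTE (illustrative sketch, not part of this commit): stale locks could be
// purged from a periodic maintenance loop. The interval, the age threshold
// and the loop below are assumptions made purely for illustration.
func (f *blocksFetcher) peerLocksCleanupLoop(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Drop locks that have not been accessed for an hour or more.
			f.removeStalePeerLocks(1 * time.Hour)
		}
	}
}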

// selectFailOverPeer randomly selects fail over peer from the list of available peers.
func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) {
	if len(peers) == 0 {
		return "", errNoPeersAvailable
	}
	if len(peers) == 1 && peers[0] == excludedPID {
		return "", errNoPeersAvailable
	}
	ind := f.rand.Int() % len(peers)
	if peers[ind] == excludedPID {
		return f.selectFailOverPeer(excludedPID, append(peers[:ind], peers[ind+1:]...))
	}
	return peers[ind], nil
}
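
// NOTE (illustrative sketch, not part of this commit): expected behavior of
// selectFailOverPeer. The peer IDs below are placeholders.
func exampleSelectFailOver(f *blocksFetcher) {
	// With a single candidate that equals the excluded peer, there is
	// nothing to fail over to: errNoPeersAvailable is returned.
	if _, err := f.selectFailOverPeer("peer-a", []peer.ID{"peer-a"}); err != nil {
		_ = err // err == errNoPeersAvailable
	}
	// With several candidates, the excluded peer is dropped and one of the
	// remaining peers is returned at random.
	if pid, err := f.selectFailOverPeer("peer-a", []peer.ID{"peer-a", "peer-b", "peer-c"}); err == nil {
		_ = pid // "peer-b" or "peer-c", never "peer-a"
	}
}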

// waitForMinimumPeers spins and waits up until enough peers are available.
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
	required := params.BeaconConfig().MaxPeersToSync
	if flags.Get().MinimumSyncPeers < required {
		required = flags.Get().MinimumSyncPeers
	}
	for {
		if ctx.Err() != nil {
			return nil, ctx.Err()
		}
		headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
		_, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
		if len(peers) >= required {
			return peers, nil
		}
		log.WithFields(logrus.Fields{
			"suitable": len(peers),
			"required": required}).Info("Waiting for enough suitable peers before syncing")
		time.Sleep(handshakePollingInterval)
	}
}
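
// NOTE (illustrative sketch, not part of this commit): a sync round could
// first block on waitForMinimumPeers and then narrow the result with
// filterPeers (defined below). The readyPeers wrapper is hypothetical;
// peersPercentagePerRequest is defined elsewhere in this package.
func (f *blocksFetcher) readyPeers(ctx context.Context) ([]peer.ID, error) {
	peers, err := f.waitForMinimumPeers(ctx)
	if err != nil {
		return nil, err // typically a cancelled context
	}
	return f.filterPeers(peers, peersPercentagePerRequest)
}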

// filterPeers returns transformed list of peers,
// weight ordered or randomized, constrained if necessary.
func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
	if len(peers) == 0 {
		return peers, nil
	}
	// Shuffle peers to prevent a bad peer from
	// stalling sync with invalid blocks.
	f.rand.Shuffle(len(peers), func(i, j int) {
		peers[i], peers[j] = peers[j], peers[i]
	})
	// Select sub-sample from peers (honoring min-max invariants).
	required := params.BeaconConfig().MaxPeersToSync
	if flags.Get().MinimumSyncPeers < required {
		required = flags.Get().MinimumSyncPeers
	}
	limit := uint64(math.Round(float64(len(peers)) * peersPercentage))
	limit = mathutil.Max(limit, uint64(required))
	limit = mathutil.Min(limit, uint64(len(peers)))
	peers = peers[:limit]
	// Order peers by remaining capacity, effectively turning in-order
	// round robin peer processing into a weighted one (peers with higher
	// remaining capacity are preferred). Peers with the same capacity
	// are selected at random, since we have already shuffled peers
	// at this point.
	sort.SliceStable(peers, func(i, j int) bool {
		cap1 := f.rateLimiter.Remaining(peers[i].String())
		cap2 := f.rateLimiter.Remaining(peers[j].String())
		return cap1 > cap2
	})
	return peers, nil
}
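
// NOTE (illustrative worked example, not part of this commit): the clamping
// applied to the sub-sample size in filterPeers above. The concrete numbers
// are placeholders.
func exampleFilterLimit() uint64 {
	peersCount := 40
	peersPercentage := 0.75
	required := uint64(3) // the smaller of MaxPeersToSync and MinimumSyncPeers
	limit := uint64(math.Round(float64(peersCount) * peersPercentage)) // round(40*0.75) = 30
	limit = mathutil.Max(limit, required)                              // still 30, already above the minimum
	limit = mathutil.Min(limit, uint64(peersCount))                    // still 30, below the pool size
	return limit
}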

beacon-chain/sync/initial-sync/blocks_fetcher_utils.go (View File)

@@ -0,0 +1,118 @@
package initialsync

import (
	"context"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
	p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
	"github.com/prysmaticlabs/prysm/shared/params"
	"github.com/sirupsen/logrus"
	"go.opencensus.io/trace"
)

// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot.
// For efficiency only one random slot is checked per epoch, so returned slot might not be the first
// non-skipped slot. This shouldn't be a problem, as in case of adversary peer, we might get incorrect
// data anyway, so code that relies on this function must be robust enough to re-request, if no progress
// is possible with a returned value.
func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) {
	ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter")
	defer span.End()
	headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
	finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
	log.WithFields(logrus.Fields{
		"start":          slot,
		"headEpoch":      headEpoch,
		"finalizedEpoch": finalizedEpoch,
	}).Debug("Searching for non-skipped slot")
	// Exit early, if no peers with high enough finalized epoch are found.
	if finalizedEpoch <= headEpoch {
		return 0, errSlotIsTooHigh
	}
	var err error
	peers, err = f.filterPeers(peers, peersPercentagePerRequest)
	if err != nil {
		return 0, err
	}
	if len(peers) == 0 {
		return 0, errNoPeersAvailable
	}
	slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
	pidInd := 0
	fetch := func(pid peer.ID, start, count, step uint64) (uint64, error) {
		req := &p2ppb.BeaconBlocksByRangeRequest{
			StartSlot: start,
			Count:     count,
			Step:      step,
		}
		blocks, err := f.requestBlocks(ctx, req, pid)
		if err != nil {
			return 0, err
		}
		if len(blocks) > 0 {
			for _, block := range blocks {
				if block.Block.Slot > slot {
					return block.Block.Slot, nil
				}
			}
		}
		return 0, nil
	}
	// Start by checking several epochs fully, w/o resorting to random sampling.
	start := slot + 1
	end := start + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch
	for ind := start; ind < end; ind += slotsPerEpoch {
		nextSlot, err := fetch(peers[pidInd%len(peers)], ind, slotsPerEpoch, 1)
		if err != nil {
			return 0, err
		}
		if nextSlot > slot {
			return nextSlot, nil
		}
		pidInd++
	}
	// Quickly find the close enough epoch where a non-empty slot definitely exists.
	// Only single random slot per epoch is checked - allowing to move forward relatively quickly.
	slot = slot + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch
	upperBoundSlot := helpers.StartSlot(finalizedEpoch + 1)
	for ind := slot + 1; ind < upperBoundSlot; ind += (slotsPerEpoch * slotsPerEpoch) / 2 {
		start := ind + uint64(f.rand.Intn(int(slotsPerEpoch)))
		nextSlot, err := fetch(peers[pidInd%len(peers)], start, slotsPerEpoch/2, slotsPerEpoch)
		if err != nil {
			return 0, err
		}
		pidInd++
		if nextSlot > slot && upperBoundSlot >= nextSlot {
			upperBoundSlot = nextSlot
			break
		}
	}
	// Epoch with non-empty slot is located. Check all slots within two nearby epochs.
	if upperBoundSlot > slotsPerEpoch {
		upperBoundSlot -= slotsPerEpoch
	}
	upperBoundSlot = helpers.StartSlot(helpers.SlotToEpoch(upperBoundSlot))
	nextSlot, err := fetch(peers[pidInd%len(peers)], upperBoundSlot, slotsPerEpoch*2, 1)
	if err != nil {
		return 0, err
	}
	if nextSlot < slot || helpers.StartSlot(finalizedEpoch+1) < nextSlot {
		return 0, errors.New("invalid range for non-skipped slot")
	}
	return nextSlot, nil
}
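
// NOTE (illustrative worked example, not part of this commit): parameters of
// the coarse sampling phase above, assuming the mainnet value of 32 slots per
// epoch. Each probe requests count=16 blocks with step=32, i.e. roughly one
// slot sampled from each of 16 consecutive epochs, and the outer loop advances
// by (32*32)/2 = 512 slots between probes.
func exampleProbeParams() (count, step, stride uint64) {
	slotsPerEpoch := uint64(32)                  // mainnet value, for illustration only
	count = slotsPerEpoch / 2                    // 16 sampled slots per probe
	step = slotsPerEpoch                         // one sample per epoch
	stride = (slotsPerEpoch * slotsPerEpoch) / 2 // 512-slot jump between probes
	return count, step, stride
}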

// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
	headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
	finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
	return helpers.StartSlot(finalizedEpoch)
}
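
// NOTE (illustrative sketch, not part of this commit): one plausible use of
// bestFinalizedSlot is as an upper bound when scheduling fetch batches. The
// helper below is hypothetical and only demonstrates that intent.
func (f *blocksFetcher) caughtUpToPeers(currentSlot uint64) bool {
	// Stop scheduling once we reach the slot finalized by a majority of
	// connected peers.
	return currentSlot >= f.bestFinalizedSlot()
}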