diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 6e1ffac88b..721a3550f6 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -5,6 +5,8 @@ go_library( name = "go_default_library", srcs = [ "blocks_fetcher.go", + "blocks_fetcher_peers.go", + "blocks_fetcher_utils.go", "blocks_queue.go", "fsm.go", "log.go", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 607d6d8efd..16c505ab7e 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "io" - "math" - "sort" "sync" "time" @@ -21,10 +19,8 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/p2p" prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" - "github.com/prysmaticlabs/prysm/shared/mathutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/rand" - "github.com/prysmaticlabs/prysm/shared/roughtime" "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) @@ -347,211 +343,3 @@ func (f *blocksFetcher) requestBlocks( return resp, nil } - -// getPeerLock returns peer lock for a given peer. If lock is not found, it is created. -func (f *blocksFetcher) getPeerLock(pid peer.ID) *peerLock { - f.Lock() - defer f.Unlock() - if lock, ok := f.peerLocks[pid]; ok { - lock.accessed = roughtime.Now() - return lock - } - f.peerLocks[pid] = &peerLock{ - Mutex: sync.Mutex{}, - accessed: roughtime.Now(), - } - return f.peerLocks[pid] -} - -// removeStalePeerLocks is a cleanup procedure which removes stale locks. -func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) { - f.Lock() - defer f.Unlock() - for peerID, lock := range f.peerLocks { - if time.Since(lock.accessed) >= age { - lock.Lock() - delete(f.peerLocks, peerID) - lock.Unlock() - } - } -} - -// selectFailOverPeer randomly selects fail over peer from the list of available peers. -func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) { - if len(peers) == 0 { - return "", errNoPeersAvailable - } - if len(peers) == 1 && peers[0] == excludedPID { - return "", errNoPeersAvailable - } - - ind := f.rand.Int() % len(peers) - if peers[ind] == excludedPID { - return f.selectFailOverPeer(excludedPID, append(peers[:ind], peers[ind+1:]...)) - } - return peers[ind], nil -} - -// waitForMinimumPeers spins and waits up until enough peers are available. -func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) { - required := params.BeaconConfig().MaxPeersToSync - if flags.Get().MinimumSyncPeers < required { - required = flags.Get().MinimumSyncPeers - } - for { - if ctx.Err() != nil { - return nil, ctx.Err() - } - headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) - _, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) - if len(peers) >= required { - return peers, nil - } - log.WithFields(logrus.Fields{ - "suitable": len(peers), - "required": required}).Info("Waiting for enough suitable peers before syncing") - time.Sleep(handshakePollingInterval) - } -} - -// filterPeers returns transformed list of peers, -// weight ordered or randomized, constrained if necessary. 
-func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) { - if len(peers) == 0 { - return peers, nil - } - - // Shuffle peers to prevent a bad peer from - // stalling sync with invalid blocks. - f.rand.Shuffle(len(peers), func(i, j int) { - peers[i], peers[j] = peers[j], peers[i] - }) - - // Select sub-sample from peers (honoring min-max invariants). - required := params.BeaconConfig().MaxPeersToSync - if flags.Get().MinimumSyncPeers < required { - required = flags.Get().MinimumSyncPeers - } - limit := uint64(math.Round(float64(len(peers)) * peersPercentage)) - limit = mathutil.Max(limit, uint64(required)) - limit = mathutil.Min(limit, uint64(len(peers))) - peers = peers[:limit] - - // Order peers by remaining capacity, effectively turning in-order - // round robin peer processing into a weighted one (peers with higher - // remaining capacity are preferred). Peers with the same capacity - // are selected at random, since we have already shuffled peers - // at this point. - sort.SliceStable(peers, func(i, j int) bool { - cap1 := f.rateLimiter.Remaining(peers[i].String()) - cap2 := f.rateLimiter.Remaining(peers[j].String()) - return cap1 > cap2 - }) - - return peers, nil -} - -// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot. -// For efficiency only one random slot is checked per epoch, so returned slot might not be the first -// non-skipped slot. This shouldn't be a problem, as in case of adversary peer, we might get incorrect -// data anyway, so code that relies on this function must be robust enough to re-request, if no progress -// is possible with a returned value. -func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) { - ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter") - defer span.End() - - headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) - finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) - log.WithFields(logrus.Fields{ - "start": slot, - "headEpoch": headEpoch, - "finalizedEpoch": finalizedEpoch, - }).Debug("Searching for non-skipped slot") - // Exit early, if no peers with high enough finalized epoch are found. - if finalizedEpoch <= headEpoch { - return 0, errSlotIsTooHigh - } - var err error - peers, err = f.filterPeers(peers, peersPercentagePerRequest) - if err != nil { - return 0, err - } - if len(peers) == 0 { - return 0, errNoPeersAvailable - } - - slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch - pidInd := 0 - - fetch := func(pid peer.ID, start, count, step uint64) (uint64, error) { - req := &p2ppb.BeaconBlocksByRangeRequest{ - StartSlot: start, - Count: count, - Step: step, - } - blocks, err := f.requestBlocks(ctx, req, pid) - if err != nil { - return 0, err - } - if len(blocks) > 0 { - for _, block := range blocks { - if block.Block.Slot > slot { - return block.Block.Slot, nil - } - } - } - return 0, nil - } - - // Start by checking several epochs fully, w/o resorting to random sampling. - start := slot + 1 - end := start + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch - for ind := start; ind < end; ind += slotsPerEpoch { - nextSlot, err := fetch(peers[pidInd%len(peers)], ind, slotsPerEpoch, 1) - if err != nil { - return 0, err - } - if nextSlot > slot { - return nextSlot, nil - } - pidInd++ - } - - // Quickly find the close enough epoch where a non-empty slot definitely exists. 
- // Only single random slot per epoch is checked - allowing to move forward relatively quickly. - slot = slot + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch - upperBoundSlot := helpers.StartSlot(finalizedEpoch + 1) - for ind := slot + 1; ind < upperBoundSlot; ind += (slotsPerEpoch * slotsPerEpoch) / 2 { - start := ind + uint64(f.rand.Intn(int(slotsPerEpoch))) - nextSlot, err := fetch(peers[pidInd%len(peers)], start, slotsPerEpoch/2, slotsPerEpoch) - if err != nil { - return 0, err - } - pidInd++ - if nextSlot > slot && upperBoundSlot >= nextSlot { - upperBoundSlot = nextSlot - break - } - } - - // Epoch with non-empty slot is located. Check all slots within two nearby epochs. - if upperBoundSlot > slotsPerEpoch { - upperBoundSlot -= slotsPerEpoch - } - upperBoundSlot = helpers.StartSlot(helpers.SlotToEpoch(upperBoundSlot)) - nextSlot, err := fetch(peers[pidInd%len(peers)], upperBoundSlot, slotsPerEpoch*2, 1) - if err != nil { - return 0, err - } - if nextSlot < slot || helpers.StartSlot(finalizedEpoch+1) < nextSlot { - return 0, errors.New("invalid range for non-skipped slot") - } - return nextSlot, nil -} - -// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers. -func (f *blocksFetcher) bestFinalizedSlot() uint64 { - headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) - finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) - return helpers.StartSlot(finalizedEpoch) -} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go b/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go new file mode 100644 index 0000000000..b39af39547 --- /dev/null +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go @@ -0,0 +1,120 @@ +package initialsync + +import ( + "context" + "math" + "sort" + "sync" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/shared/mathutil" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/roughtime" + "github.com/sirupsen/logrus" +) + +// getPeerLock returns peer lock for a given peer. If lock is not found, it is created. +func (f *blocksFetcher) getPeerLock(pid peer.ID) *peerLock { + f.Lock() + defer f.Unlock() + if lock, ok := f.peerLocks[pid]; ok { + lock.accessed = roughtime.Now() + return lock + } + f.peerLocks[pid] = &peerLock{ + Mutex: sync.Mutex{}, + accessed: roughtime.Now(), + } + return f.peerLocks[pid] +} + +// removeStalePeerLocks is a cleanup procedure which removes stale locks. +func (f *blocksFetcher) removeStalePeerLocks(age time.Duration) { + f.Lock() + defer f.Unlock() + for peerID, lock := range f.peerLocks { + if time.Since(lock.accessed) >= age { + lock.Lock() + delete(f.peerLocks, peerID) + lock.Unlock() + } + } +} + +// selectFailOverPeer randomly selects fail over peer from the list of available peers. +func (f *blocksFetcher) selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) { + if len(peers) == 0 { + return "", errNoPeersAvailable + } + if len(peers) == 1 && peers[0] == excludedPID { + return "", errNoPeersAvailable + } + + ind := f.rand.Int() % len(peers) + if peers[ind] == excludedPID { + return f.selectFailOverPeer(excludedPID, append(peers[:ind], peers[ind+1:]...)) + } + return peers[ind], nil +} + +// waitForMinimumPeers spins and waits up until enough peers are available. 
+func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) { + required := params.BeaconConfig().MaxPeersToSync + if flags.Get().MinimumSyncPeers < required { + required = flags.Get().MinimumSyncPeers + } + for { + if ctx.Err() != nil { + return nil, ctx.Err() + } + headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) + _, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) + if len(peers) >= required { + return peers, nil + } + log.WithFields(logrus.Fields{ + "suitable": len(peers), + "required": required}).Info("Waiting for enough suitable peers before syncing") + time.Sleep(handshakePollingInterval) + } +} + +// filterPeers returns transformed list of peers, +// weight ordered or randomized, constrained if necessary. +func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) { + if len(peers) == 0 { + return peers, nil + } + + // Shuffle peers to prevent a bad peer from + // stalling sync with invalid blocks. + f.rand.Shuffle(len(peers), func(i, j int) { + peers[i], peers[j] = peers[j], peers[i] + }) + + // Select sub-sample from peers (honoring min-max invariants). + required := params.BeaconConfig().MaxPeersToSync + if flags.Get().MinimumSyncPeers < required { + required = flags.Get().MinimumSyncPeers + } + limit := uint64(math.Round(float64(len(peers)) * peersPercentage)) + limit = mathutil.Max(limit, uint64(required)) + limit = mathutil.Min(limit, uint64(len(peers))) + peers = peers[:limit] + + // Order peers by remaining capacity, effectively turning in-order + // round robin peer processing into a weighted one (peers with higher + // remaining capacity are preferred). Peers with the same capacity + // are selected at random, since we have already shuffled peers + // at this point. + sort.SliceStable(peers, func(i, j int) bool { + cap1 := f.rateLimiter.Remaining(peers[i].String()) + cap2 := f.rateLimiter.Remaining(peers[j].String()) + return cap1 > cap2 + }) + + return peers, nil +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go new file mode 100644 index 0000000000..0bd0bf4e2d --- /dev/null +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -0,0 +1,118 @@ +package initialsync + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/sirupsen/logrus" + "go.opencensus.io/trace" +) + +// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot. +// For efficiency only one random slot is checked per epoch, so returned slot might not be the first +// non-skipped slot. This shouldn't be a problem, as in case of adversary peer, we might get incorrect +// data anyway, so code that relies on this function must be robust enough to re-request, if no progress +// is possible with a returned value. 
+func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) { + ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter") + defer span.End() + + headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) + finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) + log.WithFields(logrus.Fields{ + "start": slot, + "headEpoch": headEpoch, + "finalizedEpoch": finalizedEpoch, + }).Debug("Searching for non-skipped slot") + // Exit early, if no peers with high enough finalized epoch are found. + if finalizedEpoch <= headEpoch { + return 0, errSlotIsTooHigh + } + var err error + peers, err = f.filterPeers(peers, peersPercentagePerRequest) + if err != nil { + return 0, err + } + if len(peers) == 0 { + return 0, errNoPeersAvailable + } + + slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch + pidInd := 0 + + fetch := func(pid peer.ID, start, count, step uint64) (uint64, error) { + req := &p2ppb.BeaconBlocksByRangeRequest{ + StartSlot: start, + Count: count, + Step: step, + } + blocks, err := f.requestBlocks(ctx, req, pid) + if err != nil { + return 0, err + } + if len(blocks) > 0 { + for _, block := range blocks { + if block.Block.Slot > slot { + return block.Block.Slot, nil + } + } + } + return 0, nil + } + + // Start by checking several epochs fully, w/o resorting to random sampling. + start := slot + 1 + end := start + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch + for ind := start; ind < end; ind += slotsPerEpoch { + nextSlot, err := fetch(peers[pidInd%len(peers)], ind, slotsPerEpoch, 1) + if err != nil { + return 0, err + } + if nextSlot > slot { + return nextSlot, nil + } + pidInd++ + } + + // Quickly find the close enough epoch where a non-empty slot definitely exists. + // Only single random slot per epoch is checked - allowing to move forward relatively quickly. + slot = slot + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch + upperBoundSlot := helpers.StartSlot(finalizedEpoch + 1) + for ind := slot + 1; ind < upperBoundSlot; ind += (slotsPerEpoch * slotsPerEpoch) / 2 { + start := ind + uint64(f.rand.Intn(int(slotsPerEpoch))) + nextSlot, err := fetch(peers[pidInd%len(peers)], start, slotsPerEpoch/2, slotsPerEpoch) + if err != nil { + return 0, err + } + pidInd++ + if nextSlot > slot && upperBoundSlot >= nextSlot { + upperBoundSlot = nextSlot + break + } + } + + // Epoch with non-empty slot is located. Check all slots within two nearby epochs. + if upperBoundSlot > slotsPerEpoch { + upperBoundSlot -= slotsPerEpoch + } + upperBoundSlot = helpers.StartSlot(helpers.SlotToEpoch(upperBoundSlot)) + nextSlot, err := fetch(peers[pidInd%len(peers)], upperBoundSlot, slotsPerEpoch*2, 1) + if err != nil { + return 0, err + } + if nextSlot < slot || helpers.StartSlot(finalizedEpoch+1) < nextSlot { + return 0, errors.New("invalid range for non-skipped slot") + } + return nextSlot, nil +} + +// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers. +func (f *blocksFetcher) bestFinalizedSlot() uint64 { + headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) + finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) + return helpers.StartSlot(finalizedEpoch) +}
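
The comment on filterPeers, now moved to blocks_fetcher_peers.go, only hints at how the weighted peer ordering works. The standalone Go sketch below (separate from the patch above) illustrates the same shuffle / trim / capacity-sort sequence under simplified assumptions; peerInfo, filterPeersSketch and the literal capacity values are hypothetical stand-ins for peer.ID and the fetcher's rate limiter, not code introduced by this change.

package main

import (
	"fmt"
	"math"
	"math/rand"
	"sort"
)

// peerInfo is a hypothetical stand-in for a libp2p peer.ID together with the
// remaining capacity its rate limiter would report.
type peerInfo struct {
	id        string
	remaining float64
}

// filterPeersSketch mirrors the shape of blocksFetcher.filterPeers: shuffle to
// break any fixed ordering, trim to a percentage of the list (never below the
// required minimum, never above the list length), then stable-sort by
// remaining capacity so higher-capacity peers are asked first. The stable sort
// preserves the shuffled (random) order among peers with equal capacity.
func filterPeersSketch(rnd *rand.Rand, peers []peerInfo, pct float64, required int) []peerInfo {
	if len(peers) == 0 {
		return peers
	}
	rnd.Shuffle(len(peers), func(i, j int) {
		peers[i], peers[j] = peers[j], peers[i]
	})

	limit := int(math.Round(float64(len(peers)) * pct))
	if limit < required {
		limit = required
	}
	if limit > len(peers) {
		limit = len(peers)
	}
	peers = peers[:limit]

	sort.SliceStable(peers, func(i, j int) bool {
		return peers[i].remaining > peers[j].remaining
	})
	return peers
}

func main() {
	rnd := rand.New(rand.NewSource(42))
	peers := []peerInfo{{"a", 10}, {"b", 64}, {"c", 64}, {"d", 3}, {"e", 40}}
	for _, p := range filterPeersSketch(rnd, peers, 0.75, 2) {
		fmt.Printf("%s remaining=%.0f\n", p.id, p.remaining)
	}
}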
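
nonSkippedSlotAfter, moved to blocks_fetcher_utils.go, runs a three-phase search that is easier to follow against an in-memory model than over the real requestBlocks RPC. The sketch below reproduces only the stride arithmetic of those phases; fetchSketch, fullSearchEpochs, the hard-coded slotsPerEpoch and the toy slot values are hypothetical stand-ins for requestBlocks, nonSkippedSlotsFullSearchEpochs and the BeaconConfig values, and error handling, tracing and peer rotation are omitted.

package main

import (
	"fmt"
	"math/rand"
)

const (
	slotsPerEpoch    = 32 // stand-in for params.BeaconConfig().SlotsPerEpoch
	fullSearchEpochs = 10 // stand-in for nonSkippedSlotsFullSearchEpochs
)

// fetchSketch models requestBlocks: it walks the requested range
// (start, start+step, ...) and returns the first non-empty slot strictly
// greater than `after`, or 0 when nothing is found.
func fetchSketch(nonEmpty map[uint64]bool, after, start, count, step uint64) uint64 {
	for i := uint64(0); i < count; i++ {
		if s := start + i*step; nonEmpty[s] && s > after {
			return s
		}
	}
	return 0
}

// nonSkippedSlotAfterSketch mirrors the three phases of the real function:
// 1) scan the next fullSearchEpochs epochs slot by slot;
// 2) jump forward in (slotsPerEpoch^2)/2 strides, probing one random slot per
//    epoch, to find a rough upper bound below the finalized slot;
// 3) back up one epoch, align to an epoch boundary, and scan two full epochs.
func nonSkippedSlotAfterSketch(rnd *rand.Rand, nonEmpty map[uint64]bool, slot, finalizedSlot uint64) uint64 {
	start := slot + 1
	end := start + fullSearchEpochs*slotsPerEpoch
	for ind := start; ind < end; ind += slotsPerEpoch {
		if next := fetchSketch(nonEmpty, slot, ind, slotsPerEpoch, 1); next > slot {
			return next
		}
	}

	slot += fullSearchEpochs * slotsPerEpoch
	upper := finalizedSlot
	for ind := slot + 1; ind < upper; ind += (slotsPerEpoch * slotsPerEpoch) / 2 {
		probe := ind + uint64(rnd.Intn(slotsPerEpoch))
		if next := fetchSketch(nonEmpty, slot, probe, slotsPerEpoch/2, slotsPerEpoch); next > slot && next <= upper {
			upper = next
			break
		}
	}

	if upper > slotsPerEpoch {
		upper -= slotsPerEpoch
	}
	upper = (upper / slotsPerEpoch) * slotsPerEpoch // helpers.StartSlot(helpers.SlotToEpoch(upper))
	return fetchSketch(nonEmpty, slot, upper, slotsPerEpoch*2, 1)
}

func main() {
	rnd := rand.New(rand.NewSource(1))
	// A dense, two-epoch-wide run of blocks well past the current head.
	nonEmpty := make(map[uint64]bool)
	for s := uint64(1600); s < 1664; s++ {
		nonEmpty[s] = true
	}
	fmt.Println(nonSkippedSlotAfterSketch(rnd, nonEmpty, 100, 64*slotsPerEpoch)) // prints 1600
}

If phase 2 never narrows the upper bound, the final two-epoch scan can come back empty; this is why, as the function's doc comment notes, callers must be prepared to re-request when no progress is possible with the returned value.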