mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Use Stage 1 For Non Finalized Sync (#7012)
* checkpoint * dont cancel * remove * sync mode * fixes build * cap max retries when no finalized peers are found * use max peers * change it * fixes TestService_roundRobinSync/Multiple_peers_with_different_finalized_epoch * fixes blocks fetcher tests * fixes blocks queue tests * fixes TestService_blockProviderScoring test * gofmt * Merge branch 'master' into useStage1 * Update round_robin.go preston's review * Preston's suggestions * Merge branch 'useStage1' of github.com:prysmaticlabs/prysm into useStage1 * fixes test
This commit is contained in:
@@ -55,8 +55,10 @@ var (
|
||||
// blocksFetcherConfig is a config to setup the block fetcher.
|
||||
type blocksFetcherConfig struct {
|
||||
headFetcher blockchain.HeadFetcher
|
||||
finalizationFetcher blockchain.FinalizationFetcher
|
||||
p2p p2p.P2P
|
||||
peerFilterCapacityWeight float64
|
||||
mode syncMode
|
||||
}
|
||||
|
||||
// blocksFetcher is a service to fetch chain data from peers.
|
||||
@@ -64,18 +66,20 @@ type blocksFetcherConfig struct {
|
||||
// among available peers (for fair network load distribution).
|
||||
type blocksFetcher struct {
|
||||
sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
rand *rand.Rand
|
||||
headFetcher blockchain.HeadFetcher
|
||||
p2p p2p.P2P
|
||||
blocksPerSecond uint64
|
||||
rateLimiter *leakybucket.Collector
|
||||
peerLocks map[peer.ID]*peerLock
|
||||
fetchRequests chan *fetchRequestParams
|
||||
fetchResponses chan *fetchRequestResponse
|
||||
capacityWeight float64 // how remaining capacity affects peer selection
|
||||
quit chan struct{} // termination notifier
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
rand *rand.Rand
|
||||
headFetcher blockchain.HeadFetcher
|
||||
finalizationFetcher blockchain.FinalizationFetcher
|
||||
p2p p2p.P2P
|
||||
blocksPerSecond uint64
|
||||
rateLimiter *leakybucket.Collector
|
||||
peerLocks map[peer.ID]*peerLock
|
||||
fetchRequests chan *fetchRequestParams
|
||||
fetchResponses chan *fetchRequestResponse
|
||||
capacityWeight float64 // how remaining capacity affects peer selection
|
||||
mode syncMode // allows to use fetcher in different sync scenarios
|
||||
quit chan struct{} // termination notifier
|
||||
}
|
||||
|
||||
// peerLock restricts fetcher actions on per peer basis. Currently, used for rate limiting.
|
||||
@@ -116,18 +120,19 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &blocksFetcher{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
rand: rand.NewGenerator(),
|
||||
headFetcher: cfg.headFetcher,
|
||||
p2p: cfg.p2p,
|
||||
blocksPerSecond: uint64(blocksPerSecond),
|
||||
rateLimiter: rateLimiter,
|
||||
peerLocks: make(map[peer.ID]*peerLock),
|
||||
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
|
||||
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
|
||||
capacityWeight: capacityWeight,
|
||||
quit: make(chan struct{}),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
rand: rand.NewGenerator(),
|
||||
headFetcher: cfg.headFetcher,
|
||||
finalizationFetcher: cfg.finalizationFetcher,
|
||||
p2p: cfg.p2p,
|
||||
blocksPerSecond: uint64(blocksPerSecond),
|
||||
rateLimiter: rateLimiter,
|
||||
peerLocks: make(map[peer.ID]*peerLock),
|
||||
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
|
||||
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
|
||||
capacityWeight: capacityWeight,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -244,7 +249,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
|
||||
return response
|
||||
}
|
||||
|
||||
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
||||
headEpoch := f.finalizationFetcher.FinalizedCheckpt().Epoch
|
||||
finalizedEpoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
||||
if len(peers) == 0 {
|
||||
response.err = errNoPeersAvailable
|
||||
@@ -252,11 +257,13 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
|
||||
}
|
||||
|
||||
// Short circuit start far exceeding the highest finalized epoch in some infinite loop.
|
||||
highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
|
||||
if start > highestFinalizedSlot {
|
||||
response.err = fmt.Errorf("%v, slot: %d, higest finilized slot: %d",
|
||||
errSlotIsTooHigh, start, highestFinalizedSlot)
|
||||
return response
|
||||
if f.mode == modeStopOnFinalizedEpoch {
|
||||
highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
|
||||
if start > highestFinalizedSlot {
|
||||
response.err = fmt.Errorf("%v, slot: %d, highest finalized slot: %d",
|
||||
errSlotIsTooHigh, start, highestFinalizedSlot)
|
||||
return response
|
||||
}
|
||||
}
|
||||
|
||||
response.blocks, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
|
||||
scorers "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
|
||||
"github.com/prysmaticlabs/prysm/shared/mathutil"
|
||||
@@ -72,8 +71,7 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
||||
_, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
||||
_, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, f.finalizationFetcher.FinalizedCheckpt().Epoch)
|
||||
if len(peers) >= required {
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
@@ -39,8 +39,9 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) {
|
||||
fetcher := newBlocksFetcher(
|
||||
ctx,
|
||||
&blocksFetcherConfig{
|
||||
headFetcher: mc,
|
||||
p2p: p2p,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
p2p: p2p,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -63,8 +64,9 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) {
|
||||
fetcher := newBlocksFetcher(
|
||||
context.Background(),
|
||||
&blocksFetcherConfig{
|
||||
headFetcher: mc,
|
||||
p2p: p2p,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
p2p: p2p,
|
||||
})
|
||||
require.NoError(t, fetcher.start())
|
||||
fetcher.stop()
|
||||
@@ -76,8 +78,9 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) {
|
||||
fetcher := newBlocksFetcher(
|
||||
ctx,
|
||||
&blocksFetcherConfig{
|
||||
headFetcher: mc,
|
||||
p2p: p2p,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
p2p: p2p,
|
||||
})
|
||||
require.NoError(t, fetcher.start())
|
||||
cancel()
|
||||
@@ -263,10 +266,17 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
|
||||
State: st,
|
||||
Root: genesisRoot[:],
|
||||
DB: beaconDB,
|
||||
FinalizedCheckPoint: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{headFetcher: mc, p2p: p})
|
||||
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
p2p: p,
|
||||
})
|
||||
require.NoError(t, fetcher.start())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
@@ -384,8 +394,9 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
|
||||
t.Run("context cancellation", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
|
||||
headFetcher: mc,
|
||||
p2p: p2p,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
p2p: p2p,
|
||||
})
|
||||
|
||||
cancel()
|
||||
@@ -397,8 +408,9 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
|
||||
headFetcher: mc,
|
||||
p2p: p2p,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
p2p: p2p,
|
||||
})
|
||||
|
||||
requestCtx, reqCancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
|
||||
@@ -117,7 +117,6 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u
|
||||
|
||||
// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
|
||||
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
|
||||
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
|
||||
finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
|
||||
finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, f.finalizationFetcher.FinalizedCheckpt().Epoch)
|
||||
return helpers.StartSlot(finalizedEpoch)
|
||||
}
|
||||
|
||||
@@ -23,6 +23,8 @@ const (
|
||||
// lookaheadSteps is a limit on how many forward steps are loaded into queue.
|
||||
// Each step is managed by assigned finite state machine.
|
||||
lookaheadSteps = 8
|
||||
// noFinalizedPeersErrMaxRetries defines number of retries when no finalized peers are found.
|
||||
noFinalizedPeersErrMaxRetries = 100
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -33,13 +35,23 @@ var (
|
||||
errNoPeersWithFinalizedBlocks = errors.New("no peers with finalized blocks are found")
|
||||
)
|
||||
|
||||
const (
|
||||
modeStopOnFinalizedEpoch syncMode = iota
|
||||
modeNonConstrained
|
||||
)
|
||||
|
||||
// syncMode specifies sync mod type.
|
||||
type syncMode uint8
|
||||
|
||||
// blocksQueueConfig is a config to setup block queue service.
|
||||
type blocksQueueConfig struct {
|
||||
blocksFetcher *blocksFetcher
|
||||
headFetcher blockchain.HeadFetcher
|
||||
finalizationFetcher blockchain.FinalizationFetcher
|
||||
startSlot uint64
|
||||
highestExpectedSlot uint64
|
||||
p2p p2p.P2P
|
||||
mode syncMode
|
||||
}
|
||||
|
||||
// blocksQueue is a priority queue that serves as a intermediary between block fetchers (producers)
|
||||
@@ -51,8 +63,12 @@ type blocksQueue struct {
|
||||
blocksFetcher *blocksFetcher
|
||||
headFetcher blockchain.HeadFetcher
|
||||
highestExpectedSlot uint64
|
||||
fetchedData chan *blocksQueueFetchedData // output channel for ready blocks
|
||||
quit chan struct{} // termination notifier
|
||||
mode syncMode
|
||||
exitConditions struct {
|
||||
noFinalizedPeersErrRetries int
|
||||
}
|
||||
fetchedData chan *blocksQueueFetchedData // output channel for ready blocks
|
||||
quit chan struct{} // termination notifier
|
||||
}
|
||||
|
||||
// blocksQueueFetchedData is a data container that is returned from a queue on each step.
|
||||
@@ -68,8 +84,9 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
|
||||
blocksFetcher := cfg.blocksFetcher
|
||||
if blocksFetcher == nil {
|
||||
blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{
|
||||
headFetcher: cfg.headFetcher,
|
||||
p2p: cfg.p2p,
|
||||
headFetcher: cfg.headFetcher,
|
||||
finalizationFetcher: cfg.finalizationFetcher,
|
||||
p2p: cfg.p2p,
|
||||
})
|
||||
}
|
||||
highestExpectedSlot := cfg.highestExpectedSlot
|
||||
@@ -77,12 +94,16 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
|
||||
highestExpectedSlot = blocksFetcher.bestFinalizedSlot()
|
||||
}
|
||||
|
||||
// Override fetcher's sync mode.
|
||||
blocksFetcher.mode = cfg.mode
|
||||
|
||||
queue := &blocksQueue{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
highestExpectedSlot: highestExpectedSlot,
|
||||
blocksFetcher: blocksFetcher,
|
||||
headFetcher: cfg.headFetcher,
|
||||
mode: cfg.mode,
|
||||
fetchedData: make(chan *blocksQueueFetchedData, 1),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
@@ -159,14 +180,19 @@ func (q *blocksQueue) loop() {
|
||||
fsm := q.smm.machines[key]
|
||||
if err := fsm.trigger(eventTick, nil); err != nil {
|
||||
log.WithFields(logrus.Fields{
|
||||
"highestExpectedSlot": q.highestExpectedSlot,
|
||||
"event": eventTick,
|
||||
"epoch": helpers.SlotToEpoch(fsm.start),
|
||||
"start": fsm.start,
|
||||
"error": err.Error(),
|
||||
"highestExpectedSlot": q.highestExpectedSlot,
|
||||
"noFinalizedPeersErrRetries": q.exitConditions.noFinalizedPeersErrRetries,
|
||||
"event": eventTick,
|
||||
"epoch": helpers.SlotToEpoch(fsm.start),
|
||||
"start": fsm.start,
|
||||
"error": err.Error(),
|
||||
}).Debug("Can not trigger event")
|
||||
if err == errNoPeersWithFinalizedBlocks {
|
||||
q.cancel()
|
||||
forceExit := q.exitConditions.noFinalizedPeersErrRetries > noFinalizedPeersErrMaxRetries
|
||||
if q.mode == modeStopOnFinalizedEpoch || forceExit {
|
||||
q.cancel()
|
||||
}
|
||||
q.exitConditions.noFinalizedPeersErrRetries++
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,8 +20,9 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
|
||||
headFetcher: mc,
|
||||
p2p: p2p,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
p2p: p2p,
|
||||
})
|
||||
|
||||
t.Run("stop without start", func(t *testing.T) {
|
||||
@@ -29,6 +30,7 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
|
||||
defer cancel()
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
highestExpectedSlot: blockBatchLimit,
|
||||
})
|
||||
assert.ErrorContains(t, errQueueTakesTooLongToStop.Error(), queue.stop())
|
||||
@@ -39,6 +41,7 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
|
||||
defer cancel()
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
highestExpectedSlot: blockBatchLimit,
|
||||
})
|
||||
assert.NoError(t, queue.start())
|
||||
@@ -49,6 +52,7 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
|
||||
defer cancel()
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
highestExpectedSlot: blockBatchLimit,
|
||||
})
|
||||
assert.NoError(t, queue.start())
|
||||
@@ -61,6 +65,7 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
blocksFetcher: fetcher,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
highestExpectedSlot: blockBatchLimit,
|
||||
})
|
||||
|
||||
@@ -85,6 +90,7 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
blocksFetcher: fetcher,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
highestExpectedSlot: blockBatchLimit,
|
||||
})
|
||||
assert.NoError(t, queue.start())
|
||||
@@ -98,6 +104,7 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
blocksFetcher: fetcher,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
highestExpectedSlot: blockBatchLimit,
|
||||
})
|
||||
assert.NoError(t, queue.start())
|
||||
@@ -110,6 +117,7 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
blocksFetcher: fetcher,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
highestExpectedSlot: blockBatchLimit,
|
||||
})
|
||||
assert.NoError(t, queue.start())
|
||||
@@ -230,12 +238,14 @@ func TestBlocksQueueLoop(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
|
||||
headFetcher: mc,
|
||||
p2p: p2p,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
p2p: p2p,
|
||||
})
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
blocksFetcher: fetcher,
|
||||
headFetcher: mc,
|
||||
finalizationFetcher: mc,
|
||||
highestExpectedSlot: tt.highestExpectedSlot,
|
||||
})
|
||||
assert.NoError(t, queue.start())
|
||||
|
||||
@@ -92,6 +92,9 @@ func initializeTestServices(t *testing.T, blocks []uint64, peers []*peerData) (*
|
||||
State: st,
|
||||
Root: genesisRoot[:],
|
||||
DB: beaconDB,
|
||||
FinalizedCheckPoint: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
},
|
||||
}, p, beaconDB
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@@ -15,11 +14,8 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
|
||||
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/featureconfig"
|
||||
"github.com/prysmaticlabs/prysm/shared/mathutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -58,7 +54,9 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
p2p: s.p2p,
|
||||
headFetcher: s.chain,
|
||||
finalizationFetcher: s.chain,
|
||||
highestExpectedSlot: highestFinalizedSlot,
|
||||
mode: modeStopOnFinalizedEpoch,
|
||||
})
|
||||
if err := queue.start(); err != nil {
|
||||
return err
|
||||
@@ -87,72 +85,25 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
// mitigation. We are already convinced that we are on the correct finalized chain. Any blocks
|
||||
// we receive there after must build on the finalized chain or be considered invalid during
|
||||
// fork choice resolution / block processing.
|
||||
blocksFetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
|
||||
p2p: s.p2p,
|
||||
headFetcher: s.chain,
|
||||
queue = newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
p2p: s.p2p,
|
||||
headFetcher: s.chain,
|
||||
finalizationFetcher: s.chain,
|
||||
highestExpectedSlot: helpers.SlotsSince(genesis),
|
||||
mode: modeNonConstrained,
|
||||
})
|
||||
|
||||
// Select a new peer in the event of failure.
|
||||
nextBestPeer := func(prevPeer peer.ID) peer.ID {
|
||||
var pids []peer.ID
|
||||
for {
|
||||
_, pids = s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync /* maxPeers */, s.highestFinalizedEpoch())
|
||||
if len(pids) == 0 {
|
||||
log.Info("Waiting for a suitable peer before syncing to the head of the chain")
|
||||
time.Sleep(refreshTime)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
// Return new peer in the event of failure
|
||||
for _, id := range pids {
|
||||
if prevPeer != id {
|
||||
return id
|
||||
}
|
||||
}
|
||||
return prevPeer
|
||||
if err := queue.start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, pids := s.p2p.Peers().BestFinalized(1 /* maxPeers */, s.highestFinalizedEpoch())
|
||||
for len(pids) == 0 {
|
||||
log.Info("Waiting for a suitable peer before syncing to the head of the chain")
|
||||
time.Sleep(refreshTime)
|
||||
_, pids = s.p2p.Peers().BestFinalized(1 /* maxPeers */, s.highestFinalizedEpoch())
|
||||
for data := range queue.fetchedData {
|
||||
s.processFetchedDataRegSync(ctx, genesis, s.chain.HeadSlot(), data)
|
||||
}
|
||||
best := pids[0]
|
||||
|
||||
for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; {
|
||||
count := mathutil.Min(
|
||||
helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, blocksFetcher.blocksPerSecond)
|
||||
req := &p2ppb.BeaconBlocksByRangeRequest{
|
||||
StartSlot: s.chain.HeadSlot() + 1,
|
||||
Count: count,
|
||||
Step: 1,
|
||||
}
|
||||
log.WithFields(logrus.Fields{
|
||||
"req": req,
|
||||
"peer": best.Pretty(),
|
||||
}).Debug("Sending batch block request")
|
||||
resp, err := blocksFetcher.requestBlocks(ctx, req, best)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to receive blocks")
|
||||
best = nextBestPeer(best)
|
||||
continue
|
||||
}
|
||||
for _, blk := range resp {
|
||||
err := s.processBlock(ctx, genesis, blk, s.chain.ReceiveBlock)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), errBlockAlreadyProcessed.Error()) {
|
||||
continue
|
||||
}
|
||||
log.WithError(err).Error("Failed to process block")
|
||||
best = nextBestPeer(best)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(resp) == 0 {
|
||||
best = nextBestPeer(best)
|
||||
}
|
||||
log.WithFields(logrus.Fields{
|
||||
"syncedSlot": s.chain.HeadSlot(),
|
||||
"headSlot": helpers.SlotsSince(genesis),
|
||||
}).Debug("Synced to head of chain")
|
||||
if err := queue.stop(); err != nil {
|
||||
log.WithError(err).Debug("Error stopping queue")
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -161,15 +112,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
// processFetchedData processes data received from queue.
|
||||
func (s *Service) processFetchedData(
|
||||
ctx context.Context, genesis time.Time, startSlot uint64, data *blocksQueueFetchedData) {
|
||||
defer func() {
|
||||
if !featureconfig.Get().EnablePeerScorer || data.pid == "" {
|
||||
return
|
||||
}
|
||||
scorer := s.p2p.Peers().Scorers().BlockProviderScorer()
|
||||
if diff := s.chain.HeadSlot() - startSlot; diff > 0 {
|
||||
scorer.IncrementProcessedBlocks(data.pid, diff)
|
||||
}
|
||||
}()
|
||||
defer s.updatePeerScorerStats(data.pid, startSlot)
|
||||
|
||||
blockReceiver := s.chain.ReceiveBlockInitialSync
|
||||
batchReceiver := s.chain.ReceiveBlockBatch
|
||||
@@ -189,6 +132,21 @@ func (s *Service) processFetchedData(
|
||||
}
|
||||
}
|
||||
|
||||
// processFetchedData processes data received from queue.
|
||||
func (s *Service) processFetchedDataRegSync(
|
||||
ctx context.Context, genesis time.Time, startSlot uint64, data *blocksQueueFetchedData) {
|
||||
defer s.updatePeerScorerStats(data.pid, startSlot)
|
||||
|
||||
blockReceiver := s.chain.ReceiveBlock
|
||||
|
||||
for _, blk := range data.blocks {
|
||||
if err := s.processBlock(ctx, genesis, blk, blockReceiver); err != nil {
|
||||
log.WithError(err).Debug("Block is not processed")
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// highestFinalizedEpoch returns the absolute highest finalized epoch of all connected peers.
|
||||
// Note this can be lower than our finalized epoch if we have no peers or peers that are all behind us.
|
||||
func (s *Service) highestFinalizedEpoch() uint64 {
|
||||
@@ -312,3 +270,18 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
|
||||
s.lastProcessedSlot = lastBlk.Block.Slot
|
||||
return nil
|
||||
}
|
||||
|
||||
// updatePeerScorerStats adjusts monitored metrics for a peer.
|
||||
func (s *Service) updatePeerScorerStats(pid peer.ID, startSlot uint64) {
|
||||
if !featureconfig.Get().EnablePeerScorer || pid == "" {
|
||||
return
|
||||
}
|
||||
headSlot := s.chain.HeadSlot()
|
||||
if startSlot >= headSlot {
|
||||
return
|
||||
}
|
||||
if diff := s.chain.HeadSlot() - startSlot; diff > 0 {
|
||||
scorer := s.p2p.Peers().Scorers().BlockProviderScorer()
|
||||
scorer.IncrementProcessedBlocks(pid, diff)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,17 +201,17 @@ func TestService_roundRobinSync(t *testing.T) {
|
||||
peers: []*peerData{
|
||||
{
|
||||
blocks: makeSequence(1, 384),
|
||||
finalizedEpoch: 4,
|
||||
finalizedEpoch: 10,
|
||||
headSlot: 320,
|
||||
},
|
||||
{
|
||||
blocks: makeSequence(1, 384),
|
||||
finalizedEpoch: 10,
|
||||
headSlot: 320,
|
||||
},
|
||||
{
|
||||
blocks: makeSequence(1, 256),
|
||||
finalizedEpoch: 3,
|
||||
headSlot: 256,
|
||||
},
|
||||
{
|
||||
blocks: makeSequence(1, 256),
|
||||
finalizedEpoch: 3,
|
||||
finalizedEpoch: 5,
|
||||
headSlot: 256,
|
||||
},
|
||||
{
|
||||
@@ -296,6 +296,9 @@ func TestService_roundRobinSync(t *testing.T) {
|
||||
State: st,
|
||||
Root: genesisRoot[:],
|
||||
DB: beaconDB,
|
||||
FinalizedCheckPoint: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
},
|
||||
} // no-op mock
|
||||
s := &Service{
|
||||
chain: mc,
|
||||
@@ -339,6 +342,9 @@ func TestService_processBlock(t *testing.T) {
|
||||
State: st,
|
||||
Root: genesisBlkRoot[:],
|
||||
DB: beaconDB,
|
||||
FinalizedCheckPoint: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
},
|
||||
},
|
||||
})
|
||||
ctx := context.Background()
|
||||
@@ -404,6 +410,9 @@ func TestService_processBlockBatch(t *testing.T) {
|
||||
State: st,
|
||||
Root: genesisBlkRoot[:],
|
||||
DB: beaconDB,
|
||||
FinalizedCheckPoint: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
},
|
||||
},
|
||||
})
|
||||
ctx := context.Background()
|
||||
@@ -533,6 +542,9 @@ func TestService_blockProviderScoring(t *testing.T) {
|
||||
State: st,
|
||||
Root: genesisRoot[:],
|
||||
DB: beaconDB,
|
||||
FinalizedCheckPoint: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
},
|
||||
} // no-op mock
|
||||
s := &Service{
|
||||
chain: mc,
|
||||
@@ -572,5 +584,5 @@ func TestService_blockProviderScoring(t *testing.T) {
|
||||
score3 := scorer.Score(peer3)
|
||||
assert.Equal(t, true, score1 < score3, "Incorrect score (%v) for peer: %v (must be lower than %v)", score1, peer1, score3)
|
||||
assert.Equal(t, true, score2 < score3, "Incorrect score (%v) for peer: %v (must be lower than %v)", score2, peer2, score3)
|
||||
assert.Equal(t, true, scorer.ProcessedBlocks(peer3) > 100, "Not enough blocks returned by healthy peer")
|
||||
assert.Equal(t, true, scorer.ProcessedBlocks(peer3) > 100, "Not enough blocks returned by healthy peer: %d", scorer.ProcessedBlocks(peer3))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user