From 926d3b9b34417690effae1d98e2cfebe8bd849c0 Mon Sep 17 00:00:00 2001 From: Victor Farazdagi Date: Sun, 1 Nov 2020 00:33:57 +0300 Subject: [PATCH] Init-sync: more tests + minor refactoring (#7692) * cherry-pick commits * re-arrange calls --- .../sync/initial-sync/blocks_fetcher.go | 80 +++---- .../sync/initial-sync/blocks_fetcher_peers.go | 21 +- .../initial-sync/blocks_fetcher_peers_test.go | 13 +- .../sync/initial-sync/blocks_fetcher_test.go | 68 +++--- .../sync/initial-sync/blocks_fetcher_utils.go | 43 ++-- .../initial-sync/blocks_fetcher_utils_test.go | 160 +++++++++++++- .../sync/initial-sync/blocks_queue.go | 30 +-- .../sync/initial-sync/blocks_queue_test.go | 203 +++++++----------- beacon-chain/sync/initial-sync/round_robin.go | 8 +- 9 files changed, 371 insertions(+), 255 deletions(-) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 6f71eb137d..d5387a5b61 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -13,8 +13,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/pkg/errors" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" - "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" @@ -56,9 +55,9 @@ var ( // blocksFetcherConfig is a config to setup the block fetcher. type blocksFetcherConfig struct { - headFetcher blockchain.HeadFetcher - finalizationFetcher blockchain.FinalizationFetcher + chain blockchainService p2p p2p.P2P + db db.ReadOnlyDatabase peerFilterCapacityWeight float64 mode syncMode } @@ -68,20 +67,20 @@ type blocksFetcherConfig struct { // among available peers (for fair network load distribution). type blocksFetcher struct { sync.Mutex - ctx context.Context - cancel context.CancelFunc - rand *rand.Rand - headFetcher blockchain.HeadFetcher - finalizationFetcher blockchain.FinalizationFetcher - p2p p2p.P2P - blocksPerSecond uint64 - rateLimiter *leakybucket.Collector - peerLocks map[peer.ID]*peerLock - fetchRequests chan *fetchRequestParams - fetchResponses chan *fetchRequestResponse - capacityWeight float64 // how remaining capacity affects peer selection - mode syncMode // allows to use fetcher in different sync scenarios - quit chan struct{} // termination notifier + ctx context.Context + cancel context.CancelFunc + rand *rand.Rand + chain blockchainService + p2p p2p.P2P + db db.ReadOnlyDatabase + blocksPerSecond uint64 + rateLimiter *leakybucket.Collector + peerLocks map[peer.ID]*peerLock + fetchRequests chan *fetchRequestParams + fetchResponses chan *fetchRequestResponse + capacityWeight float64 // how remaining capacity affects peer selection + mode syncMode // allows to use fetcher in different sync scenarios + quit chan struct{} // termination notifier } // peerLock restricts fetcher actions on per peer basis. Currently, used for rate limiting. @@ -122,20 +121,20 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc ctx, cancel := context.WithCancel(ctx) return &blocksFetcher{ - ctx: ctx, - cancel: cancel, - rand: rand.NewGenerator(), - headFetcher: cfg.headFetcher, - finalizationFetcher: cfg.finalizationFetcher, - p2p: cfg.p2p, - blocksPerSecond: uint64(blocksPerSecond), - rateLimiter: rateLimiter, - peerLocks: make(map[peer.ID]*peerLock), - fetchRequests: make(chan *fetchRequestParams, maxPendingRequests), - fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests), - capacityWeight: capacityWeight, - mode: cfg.mode, - quit: make(chan struct{}), + ctx: ctx, + cancel: cancel, + rand: rand.NewGenerator(), + chain: cfg.chain, + p2p: cfg.p2p, + db: cfg.db, + blocksPerSecond: uint64(blocksPerSecond), + rateLimiter: rateLimiter, + peerLocks: make(map[peer.ID]*peerLock), + fetchRequests: make(chan *fetchRequestParams, maxPendingRequests), + fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests), + capacityWeight: capacityWeight, + mode: cfg.mode, + quit: make(chan struct{}), } } @@ -252,15 +251,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64) return response } - var targetEpoch uint64 - var peers []peer.ID - if f.mode == modeStopOnFinalizedEpoch { - headEpoch := f.finalizationFetcher.FinalizedCheckpt().Epoch - targetEpoch, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) - } else { - headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) - targetEpoch, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch) - } + _, targetEpoch, peers := f.calculateHeadAndTargetEpochs() if len(peers) == 0 { response.err = errNoPeersAvailable return response @@ -290,12 +281,7 @@ func (f *blocksFetcher) fetchBlocksFromPeer( defer span.End() var blocks []*eth.SignedBeaconBlock - var err error - if featureconfig.Get().EnablePeerScorer { - peers, err = f.filterScoredPeers(ctx, peers, peersPercentagePerRequest) - } else { - peers, err = f.filterPeers(peers, peersPercentagePerRequest) - } + peers, err := f.filterPeers(ctx, peers, peersPercentagePerRequest) if err != nil { return blocks, "", err } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go b/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go index 73fc2df613..6596c568c1 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_peers.go @@ -11,6 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/mathutil" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/timeutils" @@ -74,10 +75,10 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err } var peers []peer.ID if f.mode == modeStopOnFinalizedEpoch { - headEpoch := f.finalizationFetcher.FinalizedCheckpt().Epoch + headEpoch := f.chain.FinalizedCheckpt().Epoch _, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) } else { - headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) + headEpoch := helpers.SlotToEpoch(f.chain.HeadSlot()) _, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch) } if len(peers) >= required { @@ -90,9 +91,14 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err } } -// filterPeers returns transformed list of peers, -// weight ordered or randomized, constrained if necessary (only percentage of peers returned). -func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) { +// filterPeers returns transformed list of peers, weight ordered or randomized, constrained +// if necessary (when only percentage of peers returned). +// When peer scorer is enabled, fallbacks filterScoredPeers. +func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error) { + if featureconfig.Get().EnablePeerScorer { + return f.filterScoredPeers(ctx, peers, peersPercentagePerRequest) + } + if len(peers) == 0 { return peers, nil } @@ -120,9 +126,8 @@ func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([ return peers, nil } -// filterScoredPeers returns transformed list of peers, -// weight sorted by scores and capacity remaining. List can be constrained using peersPercentage, -// where only percentage of peers are returned. +// filterScoredPeers returns transformed list of peers, weight sorted by scores and capacity remaining. +// List can be further constrained using peersPercentage, where only percentage of peers are returned. func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error) { ctx, span := trace.StartSpan(ctx, "initialsync.filterScoredPeers") defer span.End() diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_peers_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_peers_test.go index a39caf7a0d..90ec019b77 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_peers_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_peers_test.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" "github.com/prysmaticlabs/prysm/shared/timeutils" @@ -107,6 +108,11 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) { } func TestBlocksFetcher_filterPeers(t *testing.T) { + resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{ + EnablePeerScorer: false, + }) + defer resetCfg() + type weightedPeer struct { peer.ID usedCapacity int64 @@ -175,7 +181,7 @@ func TestBlocksFetcher_filterPeers(t *testing.T) { pids = append(pids, pid.ID) fetcher.rateLimiter.Add(pid.ID.String(), pid.usedCapacity) } - got, err := fetcher.filterPeers(pids, tt.args.peersPercentage) + got, err := fetcher.filterPeers(context.Background(), pids, tt.args.peersPercentage) require.NoError(t, err) // Re-arrange peers with the same remaining capacity, deterministically . // They are deliberately shuffled - so that on the same capacity any of @@ -318,8 +324,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mc, p2p, _ := initializeTestServices(t, []uint64{}, []*peerData{}) fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{ - finalizationFetcher: mc, - headFetcher: mc, + chain: mc, p2p: p2p, peerFilterCapacityWeight: tt.args.capacityWeight, }) @@ -340,7 +345,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) { var filteredPIDs []peer.ID var err error for i := 0; i < 1000; i++ { - filteredPIDs, err = fetcher.filterScoredPeers(context.Background(), peerIDs, tt.args.peersPercentage) + filteredPIDs, err = fetcher.filterPeers(context.Background(), peerIDs, tt.args.peersPercentage) if len(filteredPIDs) <= 1 { break } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 0328c26a84..aa336e0ea1 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -35,9 +35,8 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) { fetcher := newBlocksFetcher( ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }, ) @@ -60,9 +59,8 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) { fetcher := newBlocksFetcher( context.Background(), &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) require.NoError(t, fetcher.start()) fetcher.stop() @@ -74,14 +72,27 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) { fetcher := newBlocksFetcher( ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) require.NoError(t, fetcher.start()) cancel() fetcher.stop() }) + + t.Run("peer filter capacity weight", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fetcher := newBlocksFetcher( + ctx, + &blocksFetcherConfig{ + chain: mc, + p2p: p2p, + peerFilterCapacityWeight: 2, + }) + require.NoError(t, fetcher.start()) + assert.Equal(t, peerFilterCapacityWeight, fetcher.capacityWeight) + }) } func TestBlocksFetcher_RoundRobin(t *testing.T) { @@ -268,9 +279,8 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p, + chain: mc, + p2p: p, }) require.NoError(t, fetcher.start()) @@ -358,13 +368,24 @@ func TestBlocksFetcher_scheduleRequest(t *testing.T) { blockBatchLimit := uint64(flags.Get().BlockBatchLimit) t.Run("context cancellation", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: nil, - p2p: nil, - }) + fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{}) cancel() assert.ErrorContains(t, "context canceled", fetcher.scheduleRequest(ctx, 1, blockBatchLimit)) }) + + t.Run("unblock on context cancellation", func(t *testing.T) { + fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{}) + for i := 0; i < maxPendingRequests; i++ { + assert.NoError(t, fetcher.scheduleRequest(context.Background(), 1, blockBatchLimit)) + } + + // Will block on next request (and wait until requests are either processed or context is closed). + go func() { + fetcher.cancel() + }() + assert.ErrorContains(t, errFetcherCtxIsDone.Error(), + fetcher.scheduleRequest(context.Background(), 1, blockBatchLimit)) + }) } func TestBlocksFetcher_handleRequest(t *testing.T) { blockBatchLimit := uint64(flags.Get().BlockBatchLimit) @@ -392,9 +413,8 @@ func TestBlocksFetcher_handleRequest(t *testing.T) { t.Run("context cancellation", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) cancel() @@ -406,9 +426,8 @@ func TestBlocksFetcher_handleRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) requestCtx, reqCancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -477,9 +496,8 @@ func TestBlocksFetcher_requestBeaconBlocksByRange(t *testing.T) { fetcher := newBlocksFetcher( ctx, &blocksFetcherConfig{ - finalizationFetcher: mc, - headFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) _, peerIDs := p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(mc.HeadSlot())) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 620d941589..460d92eb23 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -8,7 +8,6 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/flags" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" - "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -23,30 +22,20 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter") defer span.End() - var targetEpoch, headEpoch uint64 - var peers []peer.ID - if f.mode == modeStopOnFinalizedEpoch { - headEpoch = f.finalizationFetcher.FinalizedCheckpt().Epoch - targetEpoch, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) - } else { - headEpoch = helpers.SlotToEpoch(f.headFetcher.HeadSlot()) - targetEpoch, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch) - } + headEpoch, targetEpoch, peers := f.calculateHeadAndTargetEpochs() log.WithFields(logrus.Fields{ "start": slot, "headEpoch": headEpoch, "targetEpoch": targetEpoch, }).Debug("Searching for non-skipped slot") - // Exit early, if no peers with high enough finalized epoch are found. + + // Exit early if no peers with epoch higher than our known head are found. if targetEpoch <= headEpoch { return 0, errSlotIsTooHigh } - var err error - if featureconfig.Get().EnablePeerScorer { - peers, err = f.filterScoredPeers(ctx, peers, peersPercentagePerRequest) - } else { - peers, err = f.filterPeers(peers, peersPercentagePerRequest) - } + + // Transform peer list to avoid eclipsing (filter, shuffle, trim). + peers, err := f.filterPeers(ctx, peers, peersPercentagePerRequest) if err != nil { return 0, err } @@ -135,13 +124,29 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u // bestFinalizedSlot returns the highest finalized slot of the majority of connected peers. func (f *blocksFetcher) bestFinalizedSlot() uint64 { - finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, f.finalizationFetcher.FinalizedCheckpt().Epoch) + finalizedEpoch, _ := f.p2p.Peers().BestFinalized( + params.BeaconConfig().MaxPeersToSync, f.chain.FinalizedCheckpt().Epoch) return finalizedEpoch * params.BeaconConfig().SlotsPerEpoch } // bestNonFinalizedSlot returns the highest non-finalized slot of enough number of connected peers. func (f *blocksFetcher) bestNonFinalizedSlot() uint64 { - headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot()) + headEpoch := helpers.SlotToEpoch(f.chain.HeadSlot()) targetEpoch, _ := f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers*2, headEpoch) return targetEpoch * params.BeaconConfig().SlotsPerEpoch } + +// calculateHeadAndTargetEpochs return node's current head epoch, along with the best known target +// epoch. For the latter peers supporting that target epoch are returned as well. +func (f *blocksFetcher) calculateHeadAndTargetEpochs() (uint64, uint64, []peer.ID) { + var targetEpoch, headEpoch uint64 + var peers []peer.ID + if f.mode == modeStopOnFinalizedEpoch { + headEpoch = f.chain.FinalizedCheckpt().Epoch + targetEpoch, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch) + } else { + headEpoch = helpers.SlotToEpoch(f.chain.HeadSlot()) + targetEpoch, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch) + } + return headEpoch, targetEpoch, peers +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go index 47b04eb363..040070b080 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go @@ -6,7 +6,11 @@ import ( "testing" "github.com/kevinms/leakybucket-go" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil/assert" + "github.com/prysmaticlabs/prysm/shared/testutil/require" ) func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) { @@ -38,9 +42,8 @@ func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) { fetcher := newBlocksFetcher( ctx, &blocksFetcherConfig{ - finalizationFetcher: mc, - headFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }, ) fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false) @@ -84,7 +87,156 @@ func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) { if !found { t.Errorf("Isolated non-skipped slot not found in %d iterations: %v", i, expectedSlot) } else { - t.Logf("Isolated non-skipped slot found in %d iterations", i) + log.Debugf("Isolated non-skipped slot found in %d iterations", i) } }) + + t.Run("no peers with higher target epoch available", func(t *testing.T) { + peers := []*peerData{ + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 640}, + } + p2p := p2pt.NewTestP2P(t) + connectPeers(t, p2p, peers, p2p.Peers()) + fetcher := newBlocksFetcher( + ctx, + &blocksFetcherConfig{ + chain: mc, + p2p: p2p, + }, + ) + mc.FinalizedCheckPoint = ð.Checkpoint{ + Epoch: 10, + } + require.NoError(t, mc.State.SetSlot(12*params.BeaconConfig().SlotsPerEpoch)) + + fetcher.mode = modeStopOnFinalizedEpoch + slot, err := fetcher.nonSkippedSlotAfter(ctx, 160) + assert.ErrorContains(t, errSlotIsTooHigh.Error(), err) + assert.Equal(t, uint64(0), slot) + + fetcher.mode = modeNonConstrained + require.NoError(t, mc.State.SetSlot(20*params.BeaconConfig().SlotsPerEpoch)) + slot, err = fetcher.nonSkippedSlotAfter(ctx, 160) + assert.ErrorContains(t, errSlotIsTooHigh.Error(), err) + assert.Equal(t, uint64(0), slot) + }) +} + +func TestBlocksFetcher_currentHeadAndTargetEpochs(t *testing.T) { + tests := []struct { + name string + syncMode syncMode + peers []*peerData + ourFinalizedEpoch uint64 + ourHeadSlot uint64 + expectedHeadEpoch uint64 + targetEpoch uint64 + targetEpochSupport int + }{ + { + name: "ignore lower epoch peers in best finalized", + syncMode: modeStopOnFinalizedEpoch, + ourHeadSlot: 5 * params.BeaconConfig().SlotsPerEpoch, + expectedHeadEpoch: 4, + ourFinalizedEpoch: 4, + targetEpoch: 10, + targetEpochSupport: 3, + peers: []*peerData{ + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + }, + }, + { + name: "resolve ties in best finalized", + syncMode: modeStopOnFinalizedEpoch, + ourHeadSlot: 5 * params.BeaconConfig().SlotsPerEpoch, + expectedHeadEpoch: 4, + ourFinalizedEpoch: 4, + targetEpoch: 10, + targetEpochSupport: 3, + peers: []*peerData{ + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + }, + }, + { + name: "best non-finalized", + syncMode: modeNonConstrained, + ourHeadSlot: 5 * params.BeaconConfig().SlotsPerEpoch, + expectedHeadEpoch: 5, + ourFinalizedEpoch: 4, + targetEpoch: 20, + targetEpochSupport: 1, + peers: []*peerData{ + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 3, headSlot: 160}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 8, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + {finalizedEpoch: 10, headSlot: 320}, + {finalizedEpoch: 15, headSlot: 640}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mc, p2p, _ := initializeTestServices(t, []uint64{}, tt.peers) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fetcher := newBlocksFetcher( + ctx, + &blocksFetcherConfig{ + chain: mc, + p2p: p2p, + }, + ) + mc.FinalizedCheckPoint = ð.Checkpoint{ + Epoch: tt.ourFinalizedEpoch, + } + require.NoError(t, mc.State.SetSlot(tt.ourHeadSlot)) + fetcher.mode = tt.syncMode + + // Head and target epochs calculation. + headEpoch, targetEpoch, peers := fetcher.calculateHeadAndTargetEpochs() + assert.Equal(t, tt.expectedHeadEpoch, headEpoch, "Unexpected head epoch") + assert.Equal(t, tt.targetEpoch, targetEpoch, "Unexpected target epoch") + assert.Equal(t, tt.targetEpochSupport, len(peers), "Unexpected number of peers supporting target epoch") + + // Best finalized and non-finalized slots. + finalizedSlot := tt.targetEpoch * params.BeaconConfig().SlotsPerEpoch + if tt.syncMode == modeStopOnFinalizedEpoch { + assert.Equal(t, finalizedSlot, fetcher.bestFinalizedSlot(), "Unexpected finalized slot") + } else { + assert.Equal(t, finalizedSlot, fetcher.bestNonFinalizedSlot(), "Unexpected non-finalized slot") + } + }) + } } diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index 12b7037b05..21a7343ea9 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -7,8 +7,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/sirupsen/logrus" ) @@ -49,11 +49,11 @@ type syncMode uint8 // blocksQueueConfig is a config to setup block queue service. type blocksQueueConfig struct { blocksFetcher *blocksFetcher - headFetcher blockchain.HeadFetcher - finalizationFetcher blockchain.FinalizationFetcher + chain blockchainService startSlot uint64 highestExpectedSlot uint64 p2p p2p.P2P + db db.ReadOnlyDatabase mode syncMode } @@ -64,7 +64,7 @@ type blocksQueue struct { cancel context.CancelFunc smm *stateMachineManager blocksFetcher *blocksFetcher - headFetcher blockchain.HeadFetcher + chain blockchainService highestExpectedSlot uint64 mode syncMode exitConditions struct { @@ -87,9 +87,9 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { blocksFetcher := cfg.blocksFetcher if blocksFetcher == nil { blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: cfg.headFetcher, - finalizationFetcher: cfg.finalizationFetcher, - p2p: cfg.p2p, + chain: cfg.chain, + p2p: cfg.p2p, + db: cfg.db, }) } highestExpectedSlot := cfg.highestExpectedSlot @@ -109,7 +109,7 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { cancel: cancel, highestExpectedSlot: highestExpectedSlot, blocksFetcher: blocksFetcher, - headFetcher: cfg.headFetcher, + chain: cfg.chain, mode: cfg.mode, fetchedData: make(chan *blocksQueueFetchedData, 1), quit: make(chan struct{}), @@ -162,7 +162,7 @@ func (q *blocksQueue) loop() { } // Define initial state machines. - startSlot := q.headFetcher.HeadSlot() + startSlot := q.chain.HeadSlot() blocksPerRequest := q.blocksFetcher.blocksPerSecond for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest { q.smm.addStateMachine(i) @@ -172,7 +172,7 @@ func (q *blocksQueue) loop() { defer ticker.Stop() for { // Check highest expected slot when we approach chain's head slot. - if q.headFetcher.HeadSlot() >= q.highestExpectedSlot { + if q.chain.HeadSlot() >= q.highestExpectedSlot { // By the time initial sync is complete, highest slot may increase, re-check. if q.mode == modeStopOnFinalizedEpoch { if q.highestExpectedSlot < q.blocksFetcher.bestFinalizedSlot() { @@ -191,7 +191,7 @@ func (q *blocksQueue) loop() { log.WithFields(logrus.Fields{ "highestExpectedSlot": q.highestExpectedSlot, - "headSlot": q.headFetcher.HeadSlot(), + "headSlot": q.chain.HeadSlot(), "state": q.smm.String(), }).Trace("tick") @@ -221,7 +221,7 @@ func (q *blocksQueue) loop() { } } // Do garbage collection, and advance sliding window forward. - if q.headFetcher.HeadSlot() >= fsm.start+blocksPerRequest-1 { + if q.chain.HeadSlot() >= fsm.start+blocksPerRequest-1 { highestStartSlot, err := q.smm.highestStartSlot() if err != nil { log.WithError(err).Debug("Cannot obtain highest epoch state number") @@ -389,17 +389,17 @@ func (q *blocksQueue) onProcessSkippedEvent(ctx context.Context) eventHandlerFn // Check if we have enough peers to progress, or sync needs to halt (due to no peers available). if q.mode == modeStopOnFinalizedEpoch { - if q.blocksFetcher.bestFinalizedSlot() <= q.headFetcher.HeadSlot() { + if q.blocksFetcher.bestFinalizedSlot() <= q.chain.HeadSlot() { return stateSkipped, errNoRequiredPeers } } else { - if q.blocksFetcher.bestNonFinalizedSlot() <= q.headFetcher.HeadSlot() { + if q.blocksFetcher.bestNonFinalizedSlot() <= q.chain.HeadSlot() { return stateSkipped, errNoRequiredPeers } } // Shift start position of all the machines except for the last one. - startSlot := q.headFetcher.HeadSlot() + 1 + startSlot := q.chain.HeadSlot() + 1 blocksPerRequest := q.blocksFetcher.blocksPerSecond if err := q.smm.removeAllStateMachines(); err != nil { return stateSkipped, err diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 4e9947093c..a71cf7d075 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -25,17 +25,15 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) t.Run("stop without start", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() queue := newBlocksQueue(ctx, &blocksQueueConfig{ - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) assert.ErrorContains(t, errQueueTakesTooLongToStop.Error(), queue.stop()) @@ -45,8 +43,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() queue := newBlocksQueue(ctx, &blocksQueueConfig{ - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) assert.NoError(t, queue.start()) @@ -56,8 +53,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() queue := newBlocksQueue(ctx, &blocksQueueConfig{ - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) assert.NoError(t, queue.start()) @@ -69,8 +65,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { defer cancel() queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -94,8 +89,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { defer cancel() queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) assert.NoError(t, queue.start()) @@ -108,8 +102,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { defer cancel() queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) assert.NoError(t, queue.start()) @@ -121,8 +114,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) assert.NoError(t, queue.start()) @@ -138,15 +130,13 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { {blocks: makeSequence(500, 628), finalizedEpoch: 16, headSlot: 600}, }, p.Peers()) fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p, + chain: mc, + p2p: p, }) // Mode 1: stop on finalized. queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, startSlot: 128, }) @@ -154,8 +144,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { // Mode 2: unconstrained. queue = newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, startSlot: 128, mode: modeNonConstrained, @@ -276,14 +265,12 @@ func TestBlocksQueue_Loop(t *testing.T) { defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: tt.highestExpectedSlot, }) assert.NoError(t, queue.start()) @@ -310,9 +297,9 @@ func TestBlocksQueue_Loop(t *testing.T) { assert.NoError(t, queue.stop()) - if queue.headFetcher.HeadSlot() < tt.highestExpectedSlot { + if queue.chain.HeadSlot() < tt.highestExpectedSlot { t.Errorf("Not enough slots synced, want: %v, got: %v", - len(tt.expectedBlockSlots), queue.headFetcher.HeadSlot()) + len(tt.expectedBlockSlots), queue.chain.HeadSlot()) } assert.Equal(t, len(tt.expectedBlockSlots), len(blocks), "Processes wrong number of blocks") var receivedBlockSlots []uint64 @@ -334,17 +321,15 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) t.Run("expired context", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) handlerFn := queue.onScheduleEvent(ctx) @@ -359,8 +344,7 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) { t.Run("invalid input state", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -380,8 +364,7 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) { t.Run("slot is too high", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -397,8 +380,7 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) { t.Run("fetcher fails scheduling", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) // Cancel to make fetcher spit error when trying to schedule next FSM. @@ -415,8 +397,7 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) { t.Run("schedule next fetch ok", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) handlerFn := queue.onScheduleEvent(ctx) @@ -435,17 +416,15 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) t.Run("expired context", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) handlerFn := queue.onDataReceivedEvent(ctx) @@ -460,8 +439,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { t.Run("invalid input state", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -481,8 +459,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { t.Run("invalid input param", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -497,8 +474,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { t.Run("slot is too high do nothing", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -516,8 +492,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { t.Run("slot is too high force re-request on previous epoch", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -542,8 +517,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { t.Run("invalid data returned", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -564,8 +538,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { t.Run("transition ok", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -597,17 +570,15 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) t.Run("expired context", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) handlerFn := queue.onReadyToSendEvent(ctx) @@ -622,8 +593,7 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { t.Run("invalid input state", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -643,8 +613,7 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { t.Run("no blocks to send", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -659,14 +628,12 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { t.Run("send from the first machine", func(t *testing.T) { fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) queue.smm.addStateMachine(256) @@ -686,14 +653,12 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { t.Run("previous machines are not processed - do not send", func(t *testing.T) { fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) queue.smm.addStateMachine(128) @@ -719,14 +684,12 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { t.Run("previous machines are processed - send", func(t *testing.T) { fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) queue.smm.addStateMachine(256) @@ -752,17 +715,15 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) t.Run("expired context", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) handlerFn := queue.onProcessSkippedEvent(ctx) @@ -777,8 +738,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { t.Run("invalid input state", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -798,8 +758,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { t.Run("not the last machine - do nothing", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -818,8 +777,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { t.Run("not the last machine - reset", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -838,8 +796,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { t.Run("not all machines are skipped", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -858,8 +815,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { t.Run("not enough peers", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -888,18 +844,16 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { {blocks: makeSequence(1, 160), finalizedEpoch: 5, headSlot: 128}, }, p.Peers()) fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p, + chain: mc, + p2p: p, }) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) - startSlot := queue.headFetcher.HeadSlot() + startSlot := queue.chain.HeadSlot() blocksPerRequest := queue.blocksFetcher.blocksPerSecond for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest { queue.smm.addStateMachine(i).setState(stateSkipped) @@ -917,19 +871,17 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { {blocks: makeSequence(500, 628), finalizedEpoch: 16, headSlot: 600}, }, p.Peers()) fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p, + chain: mc, + p2p: p, }) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) assert.Equal(t, blockBatchLimit, queue.highestExpectedSlot) - startSlot := queue.headFetcher.HeadSlot() + startSlot := queue.chain.HeadSlot() blocksPerRequest := queue.blocksFetcher.blocksPerSecond var machineSlots []uint64 for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest { @@ -969,20 +921,18 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) { {blocks: makeSequence(500, 628), finalizedEpoch: 16, headSlot: 600}, }, p.Peers()) fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p, + chain: mc, + p2p: p, }) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) queue.mode = modeNonConstrained assert.Equal(t, blockBatchLimit, queue.highestExpectedSlot) - startSlot := queue.headFetcher.HeadSlot() + startSlot := queue.chain.HeadSlot() blocksPerRequest := queue.blocksFetcher.blocksPerSecond var machineSlots []uint64 for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest { @@ -1021,17 +971,15 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ - headFetcher: mc, - finalizationFetcher: mc, - p2p: p2p, + chain: mc, + p2p: p2p, }) t.Run("expired context", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) handlerFn := queue.onCheckStaleEvent(ctx) @@ -1046,8 +994,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) { t.Run("invalid input state", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) @@ -1067,8 +1014,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) { t.Run("process non stale machine", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) handlerFn := queue.onCheckStaleEvent(ctx) @@ -1084,8 +1030,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) { t.Run("process stale machine", func(t *testing.T) { queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, - headFetcher: mc, - finalizationFetcher: mc, + chain: mc, highestExpectedSlot: blockBatchLimit, }) handlerFn := queue.onCheckStaleEvent(ctx) diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index b8e8b5f317..57acafdf3d 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -51,8 +51,8 @@ func (s *Service) roundRobinSync(genesis time.Time) error { } queue := newBlocksQueue(ctx, &blocksQueueConfig{ p2p: s.p2p, - headFetcher: s.chain, - finalizationFetcher: s.chain, + db: s.db, + chain: s.chain, highestExpectedSlot: highestFinalizedSlot, mode: modeStopOnFinalizedEpoch, }) @@ -82,8 +82,8 @@ func (s *Service) roundRobinSync(genesis time.Time) error { // world view on non-finalized epoch. queue = newBlocksQueue(ctx, &blocksQueueConfig{ p2p: s.p2p, - headFetcher: s.chain, - finalizationFetcher: s.chain, + db: s.db, + chain: s.chain, highestExpectedSlot: helpers.SlotsSince(genesis), mode: modeNonConstrained, })