Init-sync: more tests + minor refactoring (#7692)

* cherry-pick commits

* re-arrange calls
Victor Farazdagi
2020-11-01 00:33:57 +03:00
committed by GitHub
parent 817c16a2f4
commit 926d3b9b34
9 changed files with 371 additions and 255 deletions
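The central change throughout this diff is a dependency consolidation: the separate headFetcher (blockchain.HeadFetcher) and finalizationFetcher (blockchain.FinalizationFetcher) fields are collapsed into a single chain field of type blockchainService, and a db (db.ReadOnlyDatabase) handle is threaded through the fetcher and queue configs. The blockchainService interface definition itself is not part of the diff; a minimal sketch consistent with how chain is used below (only HeadSlot and FinalizedCheckpt calls appear) would be:

// Minimal sketch, inferred from the chain.HeadSlot() and chain.FinalizedCheckpt()
// calls in this diff; the real interface may embed more than these two.
type blockchainService interface {
	blockchain.HeadFetcher
	blockchain.FinalizationFetcher
}

Everything else follows from this: config structs, constructors and tests swap the two fields for chain, and the peer-selection branching on sync mode and on the peer-scorer feature flag moves behind dedicated helpers.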

View File

@@ -13,8 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
@@ -56,9 +55,9 @@ var (
// blocksFetcherConfig is a config to set up the block fetcher.
type blocksFetcherConfig struct {
headFetcher blockchain.HeadFetcher
finalizationFetcher blockchain.FinalizationFetcher
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
peerFilterCapacityWeight float64
mode syncMode
}
@@ -68,20 +67,20 @@ type blocksFetcherConfig struct {
// among available peers (for fair network load distribution).
type blocksFetcher struct {
sync.Mutex
ctx context.Context
cancel context.CancelFunc
rand *rand.Rand
headFetcher blockchain.HeadFetcher
finalizationFetcher blockchain.FinalizationFetcher
p2p p2p.P2P
blocksPerSecond uint64
rateLimiter *leakybucket.Collector
peerLocks map[peer.ID]*peerLock
fetchRequests chan *fetchRequestParams
fetchResponses chan *fetchRequestResponse
capacityWeight float64 // how remaining capacity affects peer selection
mode syncMode // allows using the fetcher in different sync scenarios
quit chan struct{} // termination notifier
ctx context.Context
cancel context.CancelFunc
rand *rand.Rand
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
blocksPerSecond uint64
rateLimiter *leakybucket.Collector
peerLocks map[peer.ID]*peerLock
fetchRequests chan *fetchRequestParams
fetchResponses chan *fetchRequestResponse
capacityWeight float64 // how remaining capacity affects peer selection
mode syncMode // allows using the fetcher in different sync scenarios
quit chan struct{} // termination notifier
}
// peerLock restricts fetcher actions on a per-peer basis. Currently used for rate limiting.
@@ -122,20 +121,20 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
ctx, cancel := context.WithCancel(ctx)
return &blocksFetcher{
ctx: ctx,
cancel: cancel,
rand: rand.NewGenerator(),
headFetcher: cfg.headFetcher,
finalizationFetcher: cfg.finalizationFetcher,
p2p: cfg.p2p,
blocksPerSecond: uint64(blocksPerSecond),
rateLimiter: rateLimiter,
peerLocks: make(map[peer.ID]*peerLock),
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
capacityWeight: capacityWeight,
mode: cfg.mode,
quit: make(chan struct{}),
ctx: ctx,
cancel: cancel,
rand: rand.NewGenerator(),
chain: cfg.chain,
p2p: cfg.p2p,
db: cfg.db,
blocksPerSecond: uint64(blocksPerSecond),
rateLimiter: rateLimiter,
peerLocks: make(map[peer.ID]*peerLock),
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
capacityWeight: capacityWeight,
mode: cfg.mode,
quit: make(chan struct{}),
}
}
@@ -252,15 +251,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
return response
}
var targetEpoch uint64
var peers []peer.ID
if f.mode == modeStopOnFinalizedEpoch {
headEpoch := f.finalizationFetcher.FinalizedCheckpt().Epoch
targetEpoch, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
} else {
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
targetEpoch, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch)
}
_, targetEpoch, peers := f.calculateHeadAndTargetEpochs()
if len(peers) == 0 {
response.err = errNoPeersAvailable
return response
@@ -290,12 +281,7 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
defer span.End()
var blocks []*eth.SignedBeaconBlock
var err error
if featureconfig.Get().EnablePeerScorer {
peers, err = f.filterScoredPeers(ctx, peers, peersPercentagePerRequest)
} else {
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
}
peers, err := f.filterPeers(ctx, peers, peersPercentagePerRequest)
if err != nil {
return blocks, "", err
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"
@@ -74,10 +75,10 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err
}
var peers []peer.ID
if f.mode == modeStopOnFinalizedEpoch {
headEpoch := f.finalizationFetcher.FinalizedCheckpt().Epoch
headEpoch := f.chain.FinalizedCheckpt().Epoch
_, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
} else {
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
headEpoch := helpers.SlotToEpoch(f.chain.HeadSlot())
_, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch)
}
if len(peers) >= required {
@@ -90,9 +91,14 @@ func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, err
}
}
// filterPeers returns transformed list of peers,
// weight ordered or randomized, constrained if necessary (only percentage of peers returned).
func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
// filterPeers returns a transformed list of peers, weight ordered or randomized, constrained
// if necessary (when only a percentage of peers is returned).
// When the peer scorer is enabled, it falls back to filterScoredPeers.
func (f *blocksFetcher) filterPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
if featureconfig.Get().EnablePeerScorer {
return f.filterScoredPeers(ctx, peers, peersPercentagePerRequest)
}
if len(peers) == 0 {
return peers, nil
}
@@ -120,9 +126,8 @@ func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) ([
return peers, nil
}
// filterScoredPeers returns transformed list of peers,
// weight sorted by scores and capacity remaining. List can be constrained using peersPercentage,
// where only percentage of peers are returned.
// filterScoredPeers returns a transformed list of peers, weight sorted by score and remaining capacity.
// The list can be further constrained using peersPercentage, where only a percentage of peers is returned.
func (f *blocksFetcher) filterScoredPeers(ctx context.Context, peers []peer.ID, peersPercentage float64) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.filterScoredPeers")
defer span.End()

View File

@@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/shared/timeutils"
@@ -107,6 +108,11 @@ func TestBlocksFetcher_selectFailOverPeer(t *testing.T) {
}
func TestBlocksFetcher_filterPeers(t *testing.T) {
resetCfg := featureconfig.InitWithReset(&featureconfig.Flags{
EnablePeerScorer: false,
})
defer resetCfg()
type weightedPeer struct {
peer.ID
usedCapacity int64
@@ -175,7 +181,7 @@ func TestBlocksFetcher_filterPeers(t *testing.T) {
pids = append(pids, pid.ID)
fetcher.rateLimiter.Add(pid.ID.String(), pid.usedCapacity)
}
got, err := fetcher.filterPeers(pids, tt.args.peersPercentage)
got, err := fetcher.filterPeers(context.Background(), pids, tt.args.peersPercentage)
require.NoError(t, err)
// Re-arrange peers with the same remaining capacity, deterministically.
// They are deliberately shuffled - so that on the same capacity any of
@@ -318,8 +324,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mc, p2p, _ := initializeTestServices(t, []uint64{}, []*peerData{})
fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{
finalizationFetcher: mc,
headFetcher: mc,
chain: mc,
p2p: p2p,
peerFilterCapacityWeight: tt.args.capacityWeight,
})
@@ -340,7 +345,7 @@ func TestBlocksFetcher_filterScoredPeers(t *testing.T) {
var filteredPIDs []peer.ID
var err error
for i := 0; i < 1000; i++ {
filteredPIDs, err = fetcher.filterScoredPeers(context.Background(), peerIDs, tt.args.peersPercentage)
filteredPIDs, err = fetcher.filterPeers(context.Background(), peerIDs, tt.args.peersPercentage)
if len(filteredPIDs) <= 1 {
break
}

View File

@@ -35,9 +35,8 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) {
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
},
)
@@ -60,9 +59,8 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) {
fetcher := newBlocksFetcher(
context.Background(),
&blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
require.NoError(t, fetcher.start())
fetcher.stop()
@@ -74,14 +72,27 @@ func TestBlocksFetcher_InitStartStop(t *testing.T) {
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
require.NoError(t, fetcher.start())
cancel()
fetcher.stop()
})
t.Run("peer filter capacity weight", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
chain: mc,
p2p: p2p,
peerFilterCapacityWeight: 2,
})
require.NoError(t, fetcher.start())
assert.Equal(t, peerFilterCapacityWeight, fetcher.capacityWeight)
})
}
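The new "peer filter capacity weight" sub-test above passes an out-of-range weight (2) and expects newBlocksFetcher to fall back to the package default peerFilterCapacityWeight. The clamping itself is not shown in this diff; a sketch that would satisfy the assertion (the helper name and exact bounds are assumptions) is:

// Hypothetical helper: the capacity weight expresses a fraction, so out-of-range
// values revert to the package default constant peerFilterCapacityWeight.
func sanitizeCapacityWeight(w float64) float64 {
	if w < 0 || w >= 1 {
		return peerFilterCapacityWeight
	}
	return w
}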
func TestBlocksFetcher_RoundRobin(t *testing.T) {
@@ -268,9 +279,8 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p,
chain: mc,
p2p: p,
})
require.NoError(t, fetcher.start())
@@ -358,13 +368,24 @@ func TestBlocksFetcher_scheduleRequest(t *testing.T) {
blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
t.Run("context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: nil,
p2p: nil,
})
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{})
cancel()
assert.ErrorContains(t, "context canceled", fetcher.scheduleRequest(ctx, 1, blockBatchLimit))
})
t.Run("unblock on context cancellation", func(t *testing.T) {
fetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{})
for i := 0; i < maxPendingRequests; i++ {
assert.NoError(t, fetcher.scheduleRequest(context.Background(), 1, blockBatchLimit))
}
// Will block on next request (and wait until requests are either processed or context is closed).
go func() {
fetcher.cancel()
}()
assert.ErrorContains(t, errFetcherCtxIsDone.Error(),
fetcher.scheduleRequest(context.Background(), 1, blockBatchLimit))
})
}
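The added "unblock on context cancellation" sub-test fills the fetchRequests channel up to maxPendingRequests and then cancels the fetcher, expecting errFetcherCtxIsDone rather than a permanent block. scheduleRequest itself is not part of this diff; a sketch consistent with both sub-tests (struct field names are assumptions) looks like:

func (f *blocksFetcher) scheduleRequest(ctx context.Context, start, count uint64) error {
	// Caller's context already expired: surface "context canceled" directly.
	if ctx.Err() != nil {
		return ctx.Err()
	}
	request := &fetchRequestParams{start: start, count: count} // field names assumed
	select {
	case <-f.ctx.Done():
		// Fetcher stopped while the request queue was full: unblock the caller.
		return errFetcherCtxIsDone
	case f.fetchRequests <- request:
	}
	return nil
}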
func TestBlocksFetcher_handleRequest(t *testing.T) {
blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
@@ -392,9 +413,8 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
t.Run("context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
cancel()
@@ -406,9 +426,8 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
requestCtx, reqCancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -477,9 +496,8 @@ func TestBlocksFetcher_requestBeaconBlocksByRange(t *testing.T) {
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
finalizationFetcher: mc,
headFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
_, peerIDs := p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(mc.HeadSlot()))

View File

@@ -8,7 +8,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
@@ -23,30 +22,20 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u
ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter")
defer span.End()
var targetEpoch, headEpoch uint64
var peers []peer.ID
if f.mode == modeStopOnFinalizedEpoch {
headEpoch = f.finalizationFetcher.FinalizedCheckpt().Epoch
targetEpoch, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
} else {
headEpoch = helpers.SlotToEpoch(f.headFetcher.HeadSlot())
targetEpoch, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch)
}
headEpoch, targetEpoch, peers := f.calculateHeadAndTargetEpochs()
log.WithFields(logrus.Fields{
"start": slot,
"headEpoch": headEpoch,
"targetEpoch": targetEpoch,
}).Debug("Searching for non-skipped slot")
// Exit early, if no peers with high enough finalized epoch are found.
// Exit early if no peers with epoch higher than our known head are found.
if targetEpoch <= headEpoch {
return 0, errSlotIsTooHigh
}
var err error
if featureconfig.Get().EnablePeerScorer {
peers, err = f.filterScoredPeers(ctx, peers, peersPercentagePerRequest)
} else {
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
}
// Transform peer list to avoid eclipsing (filter, shuffle, trim).
peers, err := f.filterPeers(ctx, peers, peersPercentagePerRequest)
if err != nil {
return 0, err
}
@@ -135,13 +124,29 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u
// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, f.finalizationFetcher.FinalizedCheckpt().Epoch)
finalizedEpoch, _ := f.p2p.Peers().BestFinalized(
params.BeaconConfig().MaxPeersToSync, f.chain.FinalizedCheckpt().Epoch)
return finalizedEpoch * params.BeaconConfig().SlotsPerEpoch
}
// bestNonFinalizedSlot returns the highest non-finalized slot known to a sufficient number of connected peers.
func (f *blocksFetcher) bestNonFinalizedSlot() uint64 {
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
headEpoch := helpers.SlotToEpoch(f.chain.HeadSlot())
targetEpoch, _ := f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers*2, headEpoch)
return targetEpoch * params.BeaconConfig().SlotsPerEpoch
}
// calculateHeadAndTargetEpochs returns the node's current head epoch, along with the best known target
// epoch. For the latter, peers supporting that target epoch are returned as well.
func (f *blocksFetcher) calculateHeadAndTargetEpochs() (uint64, uint64, []peer.ID) {
var targetEpoch, headEpoch uint64
var peers []peer.ID
if f.mode == modeStopOnFinalizedEpoch {
headEpoch = f.chain.FinalizedCheckpt().Epoch
targetEpoch, peers = f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
} else {
headEpoch = helpers.SlotToEpoch(f.chain.HeadSlot())
targetEpoch, peers = f.p2p.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, headEpoch)
}
return headEpoch, targetEpoch, peers
}

View File

@@ -6,7 +6,11 @@ import (
"testing"
"github.com/kevinms/leakybucket-go"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) {
@@ -38,9 +42,8 @@ func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) {
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
finalizationFetcher: mc,
headFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
@@ -84,7 +87,156 @@ func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) {
if !found {
t.Errorf("Isolated non-skipped slot not found in %d iterations: %v", i, expectedSlot)
} else {
t.Logf("Isolated non-skipped slot found in %d iterations", i)
log.Debugf("Isolated non-skipped slot found in %d iterations", i)
}
})
t.Run("no peers with higher target epoch available", func(t *testing.T) {
peers := []*peerData{
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
{finalizedEpoch: 10, headSlot: 640},
}
p2p := p2pt.NewTestP2P(t)
connectPeers(t, p2p, peers, p2p.Peers())
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
chain: mc,
p2p: p2p,
},
)
mc.FinalizedCheckPoint = &eth.Checkpoint{
Epoch: 10,
}
require.NoError(t, mc.State.SetSlot(12*params.BeaconConfig().SlotsPerEpoch))
fetcher.mode = modeStopOnFinalizedEpoch
slot, err := fetcher.nonSkippedSlotAfter(ctx, 160)
assert.ErrorContains(t, errSlotIsTooHigh.Error(), err)
assert.Equal(t, uint64(0), slot)
fetcher.mode = modeNonConstrained
require.NoError(t, mc.State.SetSlot(20*params.BeaconConfig().SlotsPerEpoch))
slot, err = fetcher.nonSkippedSlotAfter(ctx, 160)
assert.ErrorContains(t, errSlotIsTooHigh.Error(), err)
assert.Equal(t, uint64(0), slot)
})
}
func TestBlocksFetcher_currentHeadAndTargetEpochs(t *testing.T) {
tests := []struct {
name string
syncMode syncMode
peers []*peerData
ourFinalizedEpoch uint64
ourHeadSlot uint64
expectedHeadEpoch uint64
targetEpoch uint64
targetEpochSupport int
}{
{
name: "ignore lower epoch peers in best finalized",
syncMode: modeStopOnFinalizedEpoch,
ourHeadSlot: 5 * params.BeaconConfig().SlotsPerEpoch,
expectedHeadEpoch: 4,
ourFinalizedEpoch: 4,
targetEpoch: 10,
targetEpochSupport: 3,
peers: []*peerData{
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
},
},
{
name: "resolve ties in best finalized",
syncMode: modeStopOnFinalizedEpoch,
ourHeadSlot: 5 * params.BeaconConfig().SlotsPerEpoch,
expectedHeadEpoch: 4,
ourFinalizedEpoch: 4,
targetEpoch: 10,
targetEpochSupport: 3,
peers: []*peerData{
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
},
},
{
name: "best non-finalized",
syncMode: modeNonConstrained,
ourHeadSlot: 5 * params.BeaconConfig().SlotsPerEpoch,
expectedHeadEpoch: 5,
ourFinalizedEpoch: 4,
targetEpoch: 20,
targetEpochSupport: 1,
peers: []*peerData{
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 3, headSlot: 160},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 8, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
{finalizedEpoch: 10, headSlot: 320},
{finalizedEpoch: 15, headSlot: 640},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mc, p2p, _ := initializeTestServices(t, []uint64{}, tt.peers)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
chain: mc,
p2p: p2p,
},
)
mc.FinalizedCheckPoint = &eth.Checkpoint{
Epoch: tt.ourFinalizedEpoch,
}
require.NoError(t, mc.State.SetSlot(tt.ourHeadSlot))
fetcher.mode = tt.syncMode
// Head and target epochs calculation.
headEpoch, targetEpoch, peers := fetcher.calculateHeadAndTargetEpochs()
assert.Equal(t, tt.expectedHeadEpoch, headEpoch, "Unexpected head epoch")
assert.Equal(t, tt.targetEpoch, targetEpoch, "Unexpected target epoch")
assert.Equal(t, tt.targetEpochSupport, len(peers), "Unexpected number of peers supporting target epoch")
// Best finalized and non-finalized slots.
finalizedSlot := tt.targetEpoch * params.BeaconConfig().SlotsPerEpoch
if tt.syncMode == modeStopOnFinalizedEpoch {
assert.Equal(t, finalizedSlot, fetcher.bestFinalizedSlot(), "Unexpected finalized slot")
} else {
assert.Equal(t, finalizedSlot, fetcher.bestNonFinalizedSlot(), "Unexpected non-finalized slot")
}
})
}
}
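The closing assertions also exercise bestFinalizedSlot and bestNonFinalizedSlot, which simply multiply the selected target epoch by SlotsPerEpoch. As a worked example, assuming the mainnet configuration of 32 slots per epoch: in the "best non-finalized" case the expected target epoch 20 maps to slot 20 * 32 = 640, which is exactly the head slot advertised by the single best peer in that case. A small sketch of the conversion (epochStartSlot is a hypothetical name, for illustration only):

// Epoch-to-slot conversion as used by bestFinalizedSlot/bestNonFinalizedSlot.
func epochStartSlot(epoch uint64) uint64 {
	return epoch * params.BeaconConfig().SlotsPerEpoch // e.g. 20 * 32 = 640 on mainnet
}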

View File

@@ -7,8 +7,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/sirupsen/logrus"
)
@@ -49,11 +49,11 @@ type syncMode uint8
// blocksQueueConfig is a config to set up the block queue service.
type blocksQueueConfig struct {
blocksFetcher *blocksFetcher
headFetcher blockchain.HeadFetcher
finalizationFetcher blockchain.FinalizationFetcher
chain blockchainService
startSlot uint64
highestExpectedSlot uint64
p2p p2p.P2P
db db.ReadOnlyDatabase
mode syncMode
}
@@ -64,7 +64,7 @@ type blocksQueue struct {
cancel context.CancelFunc
smm *stateMachineManager
blocksFetcher *blocksFetcher
headFetcher blockchain.HeadFetcher
chain blockchainService
highestExpectedSlot uint64
mode syncMode
exitConditions struct {
@@ -87,9 +87,9 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
blocksFetcher := cfg.blocksFetcher
if blocksFetcher == nil {
blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: cfg.headFetcher,
finalizationFetcher: cfg.finalizationFetcher,
p2p: cfg.p2p,
chain: cfg.chain,
p2p: cfg.p2p,
db: cfg.db,
})
}
highestExpectedSlot := cfg.highestExpectedSlot
@@ -109,7 +109,7 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
cancel: cancel,
highestExpectedSlot: highestExpectedSlot,
blocksFetcher: blocksFetcher,
headFetcher: cfg.headFetcher,
chain: cfg.chain,
mode: cfg.mode,
fetchedData: make(chan *blocksQueueFetchedData, 1),
quit: make(chan struct{}),
@@ -162,7 +162,7 @@ func (q *blocksQueue) loop() {
}
// Define initial state machines.
startSlot := q.headFetcher.HeadSlot()
startSlot := q.chain.HeadSlot()
blocksPerRequest := q.blocksFetcher.blocksPerSecond
for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest {
q.smm.addStateMachine(i)
@@ -172,7 +172,7 @@ func (q *blocksQueue) loop() {
defer ticker.Stop()
for {
// Check highest expected slot when we approach chain's head slot.
if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
if q.chain.HeadSlot() >= q.highestExpectedSlot {
// By the time initial sync is complete, highest slot may increase, re-check.
if q.mode == modeStopOnFinalizedEpoch {
if q.highestExpectedSlot < q.blocksFetcher.bestFinalizedSlot() {
@@ -191,7 +191,7 @@ func (q *blocksQueue) loop() {
log.WithFields(logrus.Fields{
"highestExpectedSlot": q.highestExpectedSlot,
"headSlot": q.headFetcher.HeadSlot(),
"headSlot": q.chain.HeadSlot(),
"state": q.smm.String(),
}).Trace("tick")
@@ -221,7 +221,7 @@ func (q *blocksQueue) loop() {
}
}
// Do garbage collection, and advance sliding window forward.
if q.headFetcher.HeadSlot() >= fsm.start+blocksPerRequest-1 {
if q.chain.HeadSlot() >= fsm.start+blocksPerRequest-1 {
highestStartSlot, err := q.smm.highestStartSlot()
if err != nil {
log.WithError(err).Debug("Cannot obtain highest epoch state number")
@@ -389,17 +389,17 @@ func (q *blocksQueue) onProcessSkippedEvent(ctx context.Context) eventHandlerFn
// Check if we have enough peers to progress, or sync needs to halt (due to no peers available).
if q.mode == modeStopOnFinalizedEpoch {
if q.blocksFetcher.bestFinalizedSlot() <= q.headFetcher.HeadSlot() {
if q.blocksFetcher.bestFinalizedSlot() <= q.chain.HeadSlot() {
return stateSkipped, errNoRequiredPeers
}
} else {
if q.blocksFetcher.bestNonFinalizedSlot() <= q.headFetcher.HeadSlot() {
if q.blocksFetcher.bestNonFinalizedSlot() <= q.chain.HeadSlot() {
return stateSkipped, errNoRequiredPeers
}
}
// Shift start position of all the machines except for the last one.
startSlot := q.headFetcher.HeadSlot() + 1
startSlot := q.chain.HeadSlot() + 1
blocksPerRequest := q.blocksFetcher.blocksPerSecond
if err := q.smm.removeAllStateMachines(); err != nil {
return stateSkipped, err

View File

@@ -25,17 +25,15 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
t.Run("stop without start", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.ErrorContains(t, errQueueTakesTooLongToStop.Error(), queue.stop())
@@ -45,8 +43,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
@@ -56,8 +53,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
@@ -69,8 +65,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -94,8 +89,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
@@ -108,8 +102,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
@@ -121,8 +114,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
@@ -138,15 +130,13 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
{blocks: makeSequence(500, 628), finalizedEpoch: 16, headSlot: 600},
}, p.Peers())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p,
chain: mc,
p2p: p,
})
// Mode 1: stop on finalized.
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
startSlot: 128,
})
@@ -154,8 +144,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
// Mode 2: unconstrained.
queue = newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
startSlot: 128,
mode: modeNonConstrained,
@@ -276,14 +265,12 @@ func TestBlocksQueue_Loop(t *testing.T) {
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: tt.highestExpectedSlot,
})
assert.NoError(t, queue.start())
@@ -310,9 +297,9 @@ func TestBlocksQueue_Loop(t *testing.T) {
assert.NoError(t, queue.stop())
if queue.headFetcher.HeadSlot() < tt.highestExpectedSlot {
if queue.chain.HeadSlot() < tt.highestExpectedSlot {
t.Errorf("Not enough slots synced, want: %v, got: %v",
len(tt.expectedBlockSlots), queue.headFetcher.HeadSlot())
len(tt.expectedBlockSlots), queue.chain.HeadSlot())
}
assert.Equal(t, len(tt.expectedBlockSlots), len(blocks), "Processes wrong number of blocks")
var receivedBlockSlots []uint64
@@ -334,17 +321,15 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
t.Run("expired context", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
handlerFn := queue.onScheduleEvent(ctx)
@@ -359,8 +344,7 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) {
t.Run("invalid input state", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -380,8 +364,7 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) {
t.Run("slot is too high", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -397,8 +380,7 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) {
t.Run("fetcher fails scheduling", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
// Cancel to make fetcher spit error when trying to schedule next FSM.
@@ -415,8 +397,7 @@ func TestBlocksQueue_onScheduleEvent(t *testing.T) {
t.Run("schedule next fetch ok", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
handlerFn := queue.onScheduleEvent(ctx)
@@ -435,17 +416,15 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
t.Run("expired context", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
handlerFn := queue.onDataReceivedEvent(ctx)
@@ -460,8 +439,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
t.Run("invalid input state", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -481,8 +459,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
t.Run("invalid input param", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -497,8 +474,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
t.Run("slot is too high do nothing", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -516,8 +492,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
t.Run("slot is too high force re-request on previous epoch", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -542,8 +517,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
t.Run("invalid data returned", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -564,8 +538,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
t.Run("transition ok", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -597,17 +570,15 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
t.Run("expired context", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
handlerFn := queue.onReadyToSendEvent(ctx)
@@ -622,8 +593,7 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
t.Run("invalid input state", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -643,8 +613,7 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
t.Run("no blocks to send", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -659,14 +628,12 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
t.Run("send from the first machine", func(t *testing.T) {
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
queue.smm.addStateMachine(256)
@@ -686,14 +653,12 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
t.Run("previous machines are not processed - do not send", func(t *testing.T) {
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
queue.smm.addStateMachine(128)
@@ -719,14 +684,12 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
t.Run("previous machines are processed - send", func(t *testing.T) {
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
queue.smm.addStateMachine(256)
@@ -752,17 +715,15 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
t.Run("expired context", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
handlerFn := queue.onProcessSkippedEvent(ctx)
@@ -777,8 +738,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
t.Run("invalid input state", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -798,8 +758,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
t.Run("not the last machine - do nothing", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -818,8 +777,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
t.Run("not the last machine - reset", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -838,8 +796,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
t.Run("not all machines are skipped", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -858,8 +815,7 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
t.Run("not enough peers", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -888,18 +844,16 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
{blocks: makeSequence(1, 160), finalizedEpoch: 5, headSlot: 128},
}, p.Peers())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p,
chain: mc,
p2p: p,
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
startSlot := queue.headFetcher.HeadSlot()
startSlot := queue.chain.HeadSlot()
blocksPerRequest := queue.blocksFetcher.blocksPerSecond
for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest {
queue.smm.addStateMachine(i).setState(stateSkipped)
@@ -917,19 +871,17 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
{blocks: makeSequence(500, 628), finalizedEpoch: 16, headSlot: 600},
}, p.Peers())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p,
chain: mc,
p2p: p,
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.Equal(t, blockBatchLimit, queue.highestExpectedSlot)
startSlot := queue.headFetcher.HeadSlot()
startSlot := queue.chain.HeadSlot()
blocksPerRequest := queue.blocksFetcher.blocksPerSecond
var machineSlots []uint64
for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest {
@@ -969,20 +921,18 @@ func TestBlocksQueue_onProcessSkippedEvent(t *testing.T) {
{blocks: makeSequence(500, 628), finalizedEpoch: 16, headSlot: 600},
}, p.Peers())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p,
chain: mc,
p2p: p,
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
queue.mode = modeNonConstrained
assert.Equal(t, blockBatchLimit, queue.highestExpectedSlot)
startSlot := queue.headFetcher.HeadSlot()
startSlot := queue.chain.HeadSlot()
blocksPerRequest := queue.blocksFetcher.blocksPerSecond
var machineSlots []uint64
for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest {
@@ -1021,17 +971,15 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
finalizationFetcher: mc,
p2p: p2p,
chain: mc,
p2p: p2p,
})
t.Run("expired context", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
handlerFn := queue.onCheckStaleEvent(ctx)
@@ -1046,8 +994,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) {
t.Run("invalid input state", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
@@ -1067,8 +1014,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) {
t.Run("process non stale machine", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
handlerFn := queue.onCheckStaleEvent(ctx)
@@ -1084,8 +1030,7 @@ func TestBlocksQueue_onCheckStaleEvent(t *testing.T) {
t.Run("process stale machine", func(t *testing.T) {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
finalizationFetcher: mc,
chain: mc,
highestExpectedSlot: blockBatchLimit,
})
handlerFn := queue.onCheckStaleEvent(ctx)

View File

@@ -51,8 +51,8 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
}
queue := newBlocksQueue(ctx, &blocksQueueConfig{
p2p: s.p2p,
headFetcher: s.chain,
finalizationFetcher: s.chain,
db: s.db,
chain: s.chain,
highestExpectedSlot: highestFinalizedSlot,
mode: modeStopOnFinalizedEpoch,
})
@@ -82,8 +82,8 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
// world view on non-finalized epoch.
queue = newBlocksQueue(ctx, &blocksQueueConfig{
p2p: s.p2p,
headFetcher: s.chain,
finalizationFetcher: s.chain,
db: s.db,
chain: s.chain,
highestExpectedSlot: helpers.SlotsSince(genesis),
mode: modeNonConstrained,
})
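Taken together, roundRobinSync now wires the same two dependencies (chain and db) into both queue phases; only the sync mode and the highest expected slot differ. A condensed view of the flow, with error handling, block processing and logging omitted (a sketch, not the full function):

// Phase 1: sync up to the network's highest finalized slot, then stop.
queue := newBlocksQueue(ctx, &blocksQueueConfig{
	p2p:                 s.p2p,
	db:                  s.db,
	chain:               s.chain,
	highestExpectedSlot: highestFinalizedSlot,
	mode:                modeStopOnFinalizedEpoch,
})
// ... start the queue, process fetched blocks, stop the queue ...

// Phase 2: catch up on the non-finalized part of the chain, unconstrained.
queue = newBlocksQueue(ctx, &blocksQueueConfig{
	p2p:                 s.p2p,
	db:                  s.db,
	chain:               s.chain,
	highestExpectedSlot: helpers.SlotsSince(genesis),
	mode:                modeNonConstrained,
})
// ... start the queue, process fetched blocks, stop the queue ...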