init-sync revamp (#5148)

* fix issue with rate limiting
* force fetcher to wait for minimum peers
* adds backoff interval
* cap the max blocks requested from a peer
* queue rewritten
* adds docs to fsm
* fix visibility
* updates fsm
* fsm tests added
* optimizes queue resource allocations
* removes debug log
* replace auto-fixed comment
* fixes typo
* better handling of evil peers
* fixes test
* minor fixes to fsm
* better interface for findEpochState func
Victor Farazdagi
2020-03-27 09:54:57 +03:00
committed by GitHub
parent 33f6c22607
commit 7ebb3c1784
9 changed files with 832 additions and 912 deletions
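Several of the bullets above (rate limiting, backoff interval, capping the blocks requested from a peer) come down to one pattern in blocks_fetcher.go below: before asking a peer for a batch, check how much of its allowance is left and sleep until the bucket drains if the batch would not fit. The following standalone sketch illustrates that idea; the bucket type, its methods, and the numbers are hypothetical stand-ins, not the leaky-bucket collector the fetcher actually uses.

// Minimal sketch of the per-peer "wait for capacity" pattern applied in
// blocks_fetcher.go's requestBlocks. All names here are hypothetical.
package main

import (
	"fmt"
	"time"
)

// bucket is a toy fixed-window allowance: capacity items per refill interval.
type bucket struct {
	capacity   int64
	remaining  int64
	lastRefill time.Time
	interval   time.Duration
}

func (b *bucket) refillIfDue(now time.Time) {
	if now.Sub(b.lastRefill) >= b.interval {
		b.remaining = b.capacity
		b.lastRefill = now
	}
}

// waitAndTake blocks until the bucket can cover count items, then consumes them.
func (b *bucket) waitAndTake(count int64) {
	for {
		now := time.Now()
		b.refillIfDue(now)
		if b.remaining >= count {
			b.remaining -= count
			return
		}
		// Back off until the next refill instead of hammering the peer.
		time.Sleep(b.interval - now.Sub(b.lastRefill))
	}
}

func main() {
	b := &bucket{capacity: 64, remaining: 64, lastRefill: time.Now(), interval: time.Second}
	for i := 0; i < 3; i++ {
		b.waitAndTake(32) // the third call has to wait for a refill
		fmt.Println("request", i, "allowed at", time.Now().Format(time.StampMilli))
	}
}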

View File

@@ -5,6 +5,7 @@ go_library(
srcs = [
"blocks_fetcher.go",
"blocks_queue.go",
"fsm.go",
"log.go",
"round_robin.go",
"service.go",
@@ -43,6 +44,7 @@ go_test(
srcs = [
"blocks_fetcher_test.go",
"blocks_queue_test.go",
"fsm_test.go",
"round_robin_test.go",
],
embed = [":go_default_library"],

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"sort"
"sync"
@@ -16,14 +17,23 @@ import (
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
const (
// maxPendingRequests limits how many concurrent fetch requests one can initiate.
maxPendingRequests = 8
// peersPercentagePerRequest caps percentage of peers to be used in a request.
peersPercentagePerRequest = 0.75
)
var (
errNoPeersAvailable = errors.New("no peers available, waiting for reconnect")
errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
@@ -40,6 +50,7 @@ type blocksFetcherConfig struct {
// On an incoming request, the requested block range is evenly divided
// among available peers (for fair network load distribution).
type blocksFetcher struct {
sync.Mutex
ctx context.Context
cancel context.CancelFunc
headFetcher blockchain.HeadFetcher
@@ -72,7 +83,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
rateLimiter := leakybucket.NewCollector(
allowedBlocksPerSecond, /* rate */
allowedBlocksPerSecond, /* capacity */
false /* deleteEmptyBuckets */)
false /* deleteEmptyBuckets */)
return &blocksFetcher{
ctx: ctx,
@@ -80,8 +91,8 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
headFetcher: cfg.headFetcher,
p2p: cfg.p2p,
rateLimiter: rateLimiter,
fetchRequests: make(chan *fetchRequestParams, queueMaxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, queueMaxPendingRequests),
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
quit: make(chan struct{}),
}
}
@@ -120,6 +131,11 @@ func (f *blocksFetcher) loop() {
}()
for {
// Make sure there are available peers before processing requests.
if _, err := f.waitForMinimumPeers(f.ctx); err != nil {
log.Error(err)
}
select {
case <-f.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks fetcher)")
@@ -221,17 +237,11 @@ func (f *blocksFetcher) collectPeerResponses(
return nil, ctx.Err()
}
peers = f.selectPeers(peers)
if len(peers) == 0 {
return nil, errNoPeersAvailable
}
// Shuffle peers to prevent a bad peer from
// stalling sync with invalid blocks.
randGenerator := rand.New(rand.NewSource(time.Now().Unix()))
randGenerator.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
p2pRequests := new(sync.WaitGroup)
errChan := make(chan error)
blocksChan := make(chan []*eth.SignedBeaconBlock)
@@ -249,7 +259,7 @@ func (f *blocksFetcher) collectPeerResponses(
}
// Spread load evenly among available peers.
perPeerCount := count / uint64(len(peers))
perPeerCount := mathutil.Min(count/uint64(len(peers)), allowedBlocksPerSecond)
remainder := int(count % uint64(len(peers)))
for i, pid := range peers {
start, step := start+uint64(i)*step, step*uint64(len(peers))
@@ -354,6 +364,7 @@ func (f *blocksFetcher) requestBlocks(
req *p2ppb.BeaconBlocksByRangeRequest,
pid peer.ID,
) ([]*eth.SignedBeaconBlock, error) {
f.Lock()
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
time.Sleep(f.rateLimiter.TillEmpty(pid.String()))
@@ -366,6 +377,7 @@ func (f *blocksFetcher) requestBlocks(
"step": req.Step,
"head": fmt.Sprintf("%#x", req.HeadBlockRoot),
}).Debug("Requesting blocks")
f.Unlock()
stream, err := f.p2p.Send(ctx, req, pid)
if err != nil {
return nil, err
@@ -407,3 +419,49 @@ func selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) {
return peers[0], nil
}
// waitForMinimumPeers spins and waits until enough peers are available.
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
_, _, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
if len(peers) >= required {
return peers, nil
}
log.WithFields(logrus.Fields{
"suitable": len(peers),
"required": required}).Info("Waiting for enough suitable peers before syncing")
time.Sleep(handshakePollingInterval)
}
}
// selectPeers returns transformed list of peers (randomized, constrained if necessary).
func (f *blocksFetcher) selectPeers(peers []peer.ID) []peer.ID {
if len(peers) == 0 {
return peers
}
// Shuffle peers to prevent a bad peer from
// stalling sync with invalid blocks.
randGenerator := rand.New(rand.NewSource(time.Now().Unix()))
randGenerator.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
limit := uint64(math.Round(float64(len(peers)) * peersPercentagePerRequest))
limit = mathutil.Max(limit, uint64(required))
limit = mathutil.Min(limit, uint64(len(peers)))
return peers[:limit]
}
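selectPeers above shuffles the suitable peers and then caps the list at roughly 75% of them (peersPercentagePerRequest), but never below the required minimum and never above what is actually connected. Here is a standalone sketch of just that capping arithmetic; the helper function and the sample numbers are local to the example, not Prysm's mathutil.

package main

import (
	"fmt"
	"math"
)

// peerLimit mirrors the capping logic in selectPeers: take ~75% of the
// suitable peers, but at least `required` and at most `total`.
func peerLimit(total, required uint64, percentage float64) uint64 {
	limit := uint64(math.Round(float64(total) * percentage))
	if limit < required {
		limit = required
	}
	if limit > total {
		limit = total
	}
	return limit
}

func main() {
	const required = 3 // stand-in for flags.Get().MinimumSyncPeers
	for _, total := range []uint64{2, 4, 8, 20} {
		fmt.Printf("peers=%2d -> used=%d\n", total, peerLimit(total, required, 0.75))
	}
	// peers= 2 -> used=2   (cannot use more peers than we have)
	// peers= 4 -> used=3   (75% of 4, rounded)
	// peers= 8 -> used=6
	// peers=20 -> used=15
}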

View File

@@ -97,7 +97,7 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
}{
{
name: "Single peer with all blocks",
expectedBlockSlots: makeSequence(1, 128), // up to 4th epoch
expectedBlockSlots: makeSequence(1, 3*blockBatchSize),
peers: []*peerData{
{
blocks: makeSequence(1, 131),
@@ -122,7 +122,7 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
},
{
name: "Single peer with all blocks (many small requests)",
expectedBlockSlots: makeSequence(1, 128), // up to 4th epoch
expectedBlockSlots: makeSequence(1, 80),
peers: []*peerData{
{
blocks: makeSequence(1, 131),
@@ -155,7 +155,7 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
},
{
name: "Multiple peers with all blocks",
expectedBlockSlots: makeSequence(1, 128), // up to 4th epoch
expectedBlockSlots: makeSequence(1, 96), // up to 4th epoch
peers: []*peerData{
{
blocks: makeSequence(1, 131),
@@ -218,6 +218,16 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
finalizedEpoch: 18,
headSlot: 640,
},
{
blocks: append(makeSequence(1, 64), makeSequence(500, 640)...),
finalizedEpoch: 18,
headSlot: 640,
},
{
blocks: append(makeSequence(1, 64), makeSequence(500, 640)...),
finalizedEpoch: 18,
headSlot: 640,
},
},
requests: []*fetchRequestParams{
{
@@ -233,8 +243,8 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
count: blockBatchSize,
},
{
start: 400,
count: 150,
start: 500,
count: 53,
},
{
start: 553,

View File

@@ -3,30 +3,31 @@ package initialsync
import (
"context"
"errors"
"sync"
"time"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"go.opencensus.io/trace"
"github.com/prysmaticlabs/prysm/shared/params"
)
const (
// queueMaxPendingRequests limits how many concurrent fetch requests the queue can initiate.
queueMaxPendingRequests = 8
// queueFetchRequestTimeout caps the maximum amount of time before a fetch request is cancelled.
queueFetchRequestTimeout = 60 * time.Second
// queueMaxCachedBlocks is a hard limit on how many queue items to cache before forced dequeue.
queueMaxCachedBlocks = 8 * queueMaxPendingRequests * blockBatchSize
// queueStopCallTimeout is time allowed for queue to release resources when quitting.
queueStopCallTimeout = 1 * time.Second
// pollingInterval defines how often state machine needs to check for new events.
pollingInterval = 200 * time.Millisecond
// staleEpochTimeout is a period after which an epoch's state is considered stale.
staleEpochTimeout = 5 * pollingInterval
// lookaheadEpochs is a default limit on how many forward epochs are loaded into queue.
lookaheadEpochs = 4
)
var (
errQueueCtxIsDone = errors.New("queue's context is done, reinitialize")
errQueueTakesTooLongToStop = errors.New("queue takes too long to stop")
errNoEpochState = errors.New("epoch state not found")
)
// blocksProvider exposes enough methods for queue to fetch incoming blocks.
@@ -46,60 +47,17 @@ type blocksQueueConfig struct {
p2p p2p.P2P
}
// blocksQueueState holds internal queue state (for easier management of state transitions).
type blocksQueueState struct {
scheduler *schedulerState
sender *senderState
cachedBlocks map[uint64]*cachedBlock
}
// blockState enumerates possible queue block states.
type blockState uint8
const (
// pendingBlock is a default block status when just added to queue.
pendingBlock = iota
// validBlock represents block that can be processed.
validBlock
// skippedBlock is a block for a slot that is not found on any available peers.
skippedBlock
// failedBlock represents block that can not be processed at the moment.
failedBlock
// blockStateLen is a sentinel marking the number of possible block states.
blockStateLen
)
// schedulerState is the state of the scheduling process.
type schedulerState struct {
sync.Mutex
currentSlot uint64
blockBatchSize uint64
requestedBlocks map[blockState]uint64
}
// senderState is a state of block sending process.
type senderState struct {
sync.Mutex
}
// cachedBlock is a container for signed beacon block.
type cachedBlock struct {
*eth.SignedBeaconBlock
}
// blocksQueue is a priority queue that serves as an intermediary between block fetchers (producers)
// and the block processing goroutine (consumer). The consumer can rely on the order of incoming blocks.
type blocksQueue struct {
ctx context.Context
cancel context.CancelFunc
highestExpectedSlot uint64
state *blocksQueueState
blocksFetcher blocksProvider
headFetcher blockchain.HeadFetcher
fetchedBlocks chan *eth.SignedBeaconBlock // output channel for ready blocks
pendingFetchRequests chan struct{} // pending requests semaphore
pendingFetchedBlocks chan struct{} // notifier, pings block sending handler
quit chan struct{} // termination notifier
ctx context.Context
cancel context.CancelFunc
highestExpectedSlot uint64
state *stateMachine
blocksFetcher blocksProvider
headFetcher blockchain.HeadFetcher
fetchedBlocks chan *eth.SignedBeaconBlock // output channel for ready blocks
quit chan struct{} // termination notifier
}
// newBlocksQueue creates initialized priority queue.
@@ -114,26 +72,25 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
})
}
return &blocksQueue{
queue := &blocksQueue{
ctx: ctx,
cancel: cancel,
highestExpectedSlot: cfg.highestExpectedSlot,
state: &blocksQueueState{
scheduler: &schedulerState{
currentSlot: cfg.startSlot,
blockBatchSize: blockBatchSize,
requestedBlocks: make(map[blockState]uint64, blockStateLen),
},
sender: &senderState{},
cachedBlocks: make(map[uint64]*cachedBlock, queueMaxCachedBlocks),
},
blocksFetcher: blocksFetcher,
headFetcher: cfg.headFetcher,
fetchedBlocks: make(chan *eth.SignedBeaconBlock, blockBatchSize),
pendingFetchRequests: make(chan struct{}, queueMaxPendingRequests),
pendingFetchedBlocks: make(chan struct{}, queueMaxPendingRequests),
quit: make(chan struct{}),
blocksFetcher: blocksFetcher,
headFetcher: cfg.headFetcher,
fetchedBlocks: make(chan *eth.SignedBeaconBlock, allowedBlocksPerSecond),
quit: make(chan struct{}),
}
// Configure state machine.
queue.state = newStateMachine()
queue.state.addHandler(stateNew, eventSchedule, queue.onScheduleEvent(ctx))
queue.state.addHandler(stateScheduled, eventDataReceived, queue.onDataReceivedEvent(ctx))
queue.state.addHandler(stateDataParsed, eventReadyToSend, queue.onReadyToSendEvent(ctx))
queue.state.addHandler(stateSkipped, eventExtendWindow, queue.onExtendWindowEvent(ctx))
queue.state.addHandler(stateSent, eventCheckStale, queue.onCheckStaleEvent(ctx))
return queue
}
// start boots up the queue processing.
@@ -162,10 +119,7 @@ func (q *blocksQueue) stop() error {
func (q *blocksQueue) loop() {
defer close(q.quit)
// Wait for all goroutines to wrap up (forced by cancelled context), and do a cleanup.
wg := &sync.WaitGroup{}
defer func() {
wg.Wait()
q.blocksFetcher.stop()
close(q.fetchedBlocks)
}()
@@ -174,14 +128,16 @@ func (q *blocksQueue) loop() {
log.WithError(err).Debug("Can not start blocks provider")
}
// Reads from semaphore channel, thus allowing next goroutine to grab it and schedule next request.
releaseTicket := func() {
select {
case <-q.ctx.Done():
case <-q.pendingFetchRequests:
}
startEpoch := helpers.SlotToEpoch(q.headFetcher.HeadSlot())
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
// Define epoch states as finite state machines.
for i := startEpoch; i < startEpoch+lookaheadEpochs; i++ {
q.state.addEpochState(i)
}
ticker := time.NewTicker(pollingInterval)
tickerEvents := []eventID{eventSchedule, eventReadyToSend, eventCheckStale, eventExtendWindow}
for {
if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
log.Debug("Highest expected slot reached")
@@ -189,229 +145,214 @@ func (q *blocksQueue) loop() {
}
select {
case <-q.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks queue)")
return
case q.pendingFetchRequests <- struct{}{}:
wg.Add(1)
go func() {
defer wg.Done()
// Schedule request.
if err := q.scheduleFetchRequests(q.ctx); err != nil {
q.state.scheduler.incrementCounter(failedBlock, blockBatchSize)
releaseTicket()
case <-ticker.C:
for _, state := range q.state.epochs {
data := &fetchRequestParams{
start: helpers.StartSlot(state.epoch),
count: slotsPerEpoch,
}
}()
// Trigger events on each epoch's state machine.
for _, event := range tickerEvents {
if err := q.state.trigger(event, state.epoch, data); err != nil {
log.WithError(err).Debug("Can not trigger event")
}
}
// Do garbage collection, and advance sliding window forward.
if q.headFetcher.HeadSlot() >= helpers.StartSlot(state.epoch+1) {
highestEpochSlot, err := q.state.highestEpochSlot()
if err != nil {
log.WithError(err).Debug("Cannot obtain highest epoch state number")
continue
}
if err := q.state.removeEpochState(state.epoch); err != nil {
log.WithError(err).Debug("Can not remove epoch state")
}
if len(q.state.epochs) < lookaheadEpochs {
q.state.addEpochState(highestEpochSlot + 1)
}
}
}
case response, ok := <-q.blocksFetcher.requestResponses():
if !ok {
log.Debug("Fetcher closed output channel")
q.cancel()
return
}
// Release semaphore ticket.
go releaseTicket()
// Process incoming response into blocks.
wg.Add(1)
go func() {
defer func() {
select {
case <-q.ctx.Done():
case q.pendingFetchedBlocks <- struct{}{}: // notify sender of data availability
}
wg.Done()
}()
skippedBlocks, err := q.parseFetchResponse(q.ctx, response)
if err != nil {
q.state.scheduler.incrementCounter(failedBlock, response.count)
return
// Update state of an epoch for which data is received.
epoch := helpers.SlotToEpoch(response.start)
if ind, ok := q.state.findEpochState(epoch); ok {
state := q.state.epochs[ind]
if err := q.state.trigger(eventDataReceived, state.epoch, response); err != nil {
log.WithError(err).Debug("Can not trigger event")
state.setState(stateNew)
continue
}
q.state.scheduler.incrementCounter(skippedBlock, skippedBlocks)
}()
case <-q.pendingFetchedBlocks:
wg.Add(1)
go func() {
defer wg.Done()
if err := q.sendFetchedBlocks(q.ctx); err != nil {
log.WithError(err).Debug("Error sending received blocks")
}
}()
}
}
}
// scheduleFetchRequests enqueues block fetch requests to block fetcher.
func (q *blocksQueue) scheduleFetchRequests(ctx context.Context) error {
q.state.scheduler.Lock()
defer q.state.scheduler.Unlock()
if ctx.Err() != nil {
return ctx.Err()
}
s := q.state.scheduler
blocks := q.state.scheduler.requestedBlocks
func() {
resetStateCounters := func() {
for i := 0; i < blockStateLen; i++ {
blocks[blockState(i)] = 0
}
s.currentSlot = q.headFetcher.HeadSlot()
}
// Update state's current slot pointer.
count := blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock]
if count == 0 {
s.currentSlot = q.headFetcher.HeadSlot()
case <-q.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks queue)")
ticker.Stop()
return
}
// Too many failures (blocks that can't be processed at this time).
if blocks[failedBlock] >= s.blockBatchSize/2 {
s.blockBatchSize *= 2
resetStateCounters()
return
}
// Given enough valid blocks, we can set back block batch size.
if blocks[validBlock] >= blockBatchSize && s.blockBatchSize != blockBatchSize {
blocks[skippedBlock], blocks[validBlock] = blocks[skippedBlock]+blocks[validBlock], 0
s.blockBatchSize = blockBatchSize
}
// Too many items in scheduler, time to update current slot to point to current head's slot.
count = blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock]
if count >= queueMaxCachedBlocks {
s.blockBatchSize = blockBatchSize
resetStateCounters()
return
}
// All blocks processed, no pending blocks.
count = blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock]
if count > 0 && blocks[pendingBlock] == 0 {
s.blockBatchSize = blockBatchSize
resetStateCounters()
return
}
}()
offset := blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock]
start := q.state.scheduler.currentSlot + offset + 1
count := mathutil.Min(q.state.scheduler.blockBatchSize, q.highestExpectedSlot-start+1)
if count <= 0 {
return errStartSlotIsTooHigh
}
ctx, _ = context.WithTimeout(ctx, queueFetchRequestTimeout)
if err := q.blocksFetcher.scheduleRequest(ctx, start, count); err != nil {
return err
}
q.state.scheduler.requestedBlocks[pendingBlock] += count
return nil
}
// parseFetchResponse processes incoming responses.
func (q *blocksQueue) parseFetchResponse(ctx context.Context, response *fetchRequestResponse) (uint64, error) {
q.state.sender.Lock()
defer q.state.sender.Unlock()
if ctx.Err() != nil {
return 0, ctx.Err()
}
if response.err != nil {
return 0, response.err
}
// Extract beacon blocks.
responseBlocks := make(map[uint64]*eth.SignedBeaconBlock, len(response.blocks))
for _, blk := range response.blocks {
responseBlocks[blk.Block.Slot] = blk
}
// Cache blocks in [start, start + count) range, include skipped blocks.
var skippedBlocks uint64
end := response.start + mathutil.Max(response.count, uint64(len(response.blocks)))
for slot := response.start; slot < end; slot++ {
if block, ok := responseBlocks[slot]; ok {
q.state.cachedBlocks[slot] = &cachedBlock{
SignedBeaconBlock: block,
}
delete(responseBlocks, slot)
continue
// onScheduleEvent is an event called on newly arrived epochs. Transforms state to scheduled.
func (q *blocksQueue) onScheduleEvent(ctx context.Context) eventHandlerFn {
return func(es *epochState, in interface{}) (stateID, error) {
data := in.(*fetchRequestParams)
start := data.start
count := mathutil.Min(data.count, q.highestExpectedSlot-start+1)
if count <= 0 {
return es.state, errStartSlotIsTooHigh
}
q.state.cachedBlocks[slot] = &cachedBlock{}
skippedBlocks++
}
// If there are any items left in incoming response, cache them too.
for slot, block := range responseBlocks {
q.state.cachedBlocks[slot] = &cachedBlock{
SignedBeaconBlock: block,
if err := q.blocksFetcher.scheduleRequest(ctx, start, count); err != nil {
return es.state, err
}
return stateScheduled, nil
}
return skippedBlocks, nil
}
// sendFetchedBlocks analyses available blocks, and sends them downstream in correct slot order.
// Blocks are checked starting from the current head slot, for as long as the next consecutive block is available.
func (q *blocksQueue) sendFetchedBlocks(ctx context.Context) error {
q.state.sender.Lock()
defer q.state.sender.Unlock()
ctx, span := trace.StartSpan(ctx, "initialsync.sendFetchedBlocks")
defer span.End()
startSlot := q.headFetcher.HeadSlot() + 1
nonSkippedSlot := uint64(0)
for slot := startSlot; slot <= q.highestExpectedSlot; slot++ {
// onDataReceivedEvent is an event called when data is received from fetcher.
func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
return func(es *epochState, in interface{}) (stateID, error) {
if ctx.Err() != nil {
return ctx.Err()
return es.state, ctx.Err()
}
blockData, ok := q.state.cachedBlocks[slot]
response := in.(*fetchRequestResponse)
epoch := helpers.SlotToEpoch(response.start)
if response.err != nil {
// Current window is already too big, re-request previous epochs.
if response.err == errStartSlotIsTooHigh {
for _, state := range q.state.epochs {
isSkipped := state.state == stateSkipped || state.state == stateSkippedExt
if state.epoch < epoch && isSkipped {
state.setState(stateNew)
}
}
}
return es.state, response.err
}
ind, ok := q.state.findEpochState(epoch)
if !ok {
break
}
if blockData.SignedBeaconBlock != nil && blockData.Block != nil {
select {
case <-ctx.Done():
return ctx.Err()
case q.fetchedBlocks <- blockData.SignedBeaconBlock:
}
nonSkippedSlot = slot
return es.state, errNoEpochState
}
q.state.epochs[ind].blocks = response.blocks
return stateDataParsed, nil
}
// Remove processed blocks.
if nonSkippedSlot > 0 {
for slot := range q.state.cachedBlocks {
if slot <= nonSkippedSlot {
delete(q.state.cachedBlocks, slot)
}
}
}
return nil
}
// incrementCounter increments particular scheduler counter.
func (s *schedulerState) incrementCounter(counter blockState, n uint64) {
s.Lock()
defer s.Unlock()
// onReadyToSendEvent is an event called to allow epochs with available blocks to send them downstream.
func (q *blocksQueue) onReadyToSendEvent(ctx context.Context) eventHandlerFn {
return func(es *epochState, in interface{}) (stateID, error) {
if ctx.Err() != nil {
return es.state, ctx.Err()
}
// Assert that counter is within acceptable boundaries.
if counter < 1 || counter >= blockStateLen {
return
data := in.(*fetchRequestParams)
epoch := helpers.SlotToEpoch(data.start)
ind, ok := q.state.findEpochState(epoch)
if !ok {
return es.state, errNoEpochState
}
if len(q.state.epochs[ind].blocks) == 0 {
return stateSkipped, nil
}
send := func() (stateID, error) {
for _, block := range q.state.epochs[ind].blocks {
select {
case <-ctx.Done():
return es.state, ctx.Err()
case q.fetchedBlocks <- block:
}
}
return stateSent, nil
}
// Make sure that we send epochs in a correct order.
if q.state.isLowestEpochState(epoch) {
return send()
}
// Make sure that previous epoch is already processed.
for _, state := range q.state.epochs {
// Review only previous slots.
if state.epoch < epoch {
switch state.state {
case stateNew, stateScheduled, stateDataParsed:
return es.state, nil
default:
}
}
}
return send()
}
}
// onExtendWindowEvent is an event allowing handlers to extend the sliding window,
// in cases where progress is not possible otherwise.
func (q *blocksQueue) onExtendWindowEvent(ctx context.Context) eventHandlerFn {
return func(es *epochState, in interface{}) (stateID, error) {
if ctx.Err() != nil {
return es.state, ctx.Err()
}
data := in.(*fetchRequestParams)
epoch := helpers.SlotToEpoch(data.start)
if _, ok := q.state.findEpochState(epoch); !ok {
return es.state, errNoEpochState
}
// Only the highest epoch with skipped state can trigger extension.
highestEpochSlot, err := q.state.highestEpochSlot()
if err != nil {
return es.state, err
}
if highestEpochSlot != epoch {
return es.state, nil
}
// Check whether the window was expanded recently; if so, reset and re-request the same blocks.
resetWindow := false
for _, state := range q.state.epochs {
if state.state == stateSkippedExt {
resetWindow = true
break
}
}
if resetWindow {
for _, state := range q.state.epochs {
state.setState(stateNew)
}
return stateNew, nil
}
// Extend sliding window.
for i := 1; i <= lookaheadEpochs; i++ {
q.state.addEpochState(highestEpochSlot + uint64(i))
}
return stateSkippedExt, nil
}
}
// onCheckStaleEvent is an event that allows marking stale epochs,
// so that they can be re-processed.
func (q *blocksQueue) onCheckStaleEvent(ctx context.Context) eventHandlerFn {
return func(es *epochState, in interface{}) (stateID, error) {
if ctx.Err() != nil {
return es.state, ctx.Err()
}
if time.Since(es.updated) > staleEpochTimeout {
return stateSkipped, nil
}
return es.state, nil
}
n = mathutil.Min(s.requestedBlocks[pendingBlock], n)
s.requestedBlocks[counter] += n
s.requestedBlocks[pendingBlock] -= n
}
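The rewritten loop above keeps lookaheadEpochs (four) per-epoch state machines in flight: every tick it fires the configured events on each tracked epoch, and once the chain head moves past an epoch that machine is removed and a new one is appended one past the highest tracked epoch. Below is a minimal, self-contained model of just that sliding-window bookkeeping; the window type and its advance rule are simplified stand-ins for stateMachine.addEpochState/removeEpochState and the head-slot check in the real code.

package main

import "fmt"

const lookahead = 4 // mirrors lookaheadEpochs

// window is a toy model of the queue's sliding set of tracked epochs.
type window struct{ epochs []uint64 }

func (w *window) highest() uint64 {
	h := w.epochs[0]
	for _, e := range w.epochs {
		if e > h {
			h = e
		}
	}
	return h
}

// advance drops every epoch the chain head has fully passed and tops the
// window back up to `lookahead` entries, one past the current highest.
func (w *window) advance(headEpoch uint64) {
	kept := w.epochs[:0]
	for _, e := range w.epochs {
		if e >= headEpoch { // head has not fully passed this epoch yet
			kept = append(kept, e)
		}
	}
	w.epochs = kept
	if len(w.epochs) == 0 {
		w.epochs = append(w.epochs, headEpoch)
	}
	for len(w.epochs) < lookahead {
		w.epochs = append(w.epochs, w.highest()+1)
	}
}

func main() {
	w := &window{epochs: []uint64{0, 1, 2, 3}}
	for _, head := range []uint64{0, 1, 3} {
		w.advance(head)
		fmt.Printf("head epoch %d -> tracking %v\n", head, w.epochs)
	}
	// head epoch 0 -> tracking [0 1 2 3]
	// head epoch 1 -> tracking [1 2 3 4]
	// head epoch 3 -> tracking [3 4 5 6]
}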

View File

@@ -170,605 +170,6 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
})
}
func TestBlocksQueueUpdateSchedulerState(t *testing.T) {
chainConfig := struct {
expectedBlockSlots []uint64
peers []*peerData
}{
expectedBlockSlots: makeSequence(1, 241),
peers: []*peerData{},
}
mc, _, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
defer dbtest.TeardownDB(t, beaconDB)
setupQueue := func(ctx context.Context) *blocksQueue {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: &blocksProviderMock{},
headFetcher: mc,
highestExpectedSlot: uint64(len(chainConfig.expectedBlockSlots)),
})
return queue
}
t.Run("cancelled context", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
queue := setupQueue(ctx)
cancel()
if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil {
t.Error(err)
}
if err := queue.scheduleFetchRequests(ctx); err != ctx.Err() {
t.Errorf("expected error: %v", ctx.Err())
}
})
t.Run("empty state on pristine node", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil {
t.Error(err)
}
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if state.currentSlot != 0 {
t.Errorf("invalid current slot, want: %v, got: %v", 0, state.currentSlot)
}
})
t.Run("empty state on pre-synced node", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
syncToSlot := uint64(7)
setBlocksFromCache(ctx, t, mc, syncToSlot)
if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil {
t.Error(err)
}
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if state.currentSlot != syncToSlot {
t.Errorf("invalid current slot, want: %v, got: %v", syncToSlot, state.currentSlot)
}
})
t.Run("reset block batch size to default", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil {
t.Error(err)
}
// On enough valid blocks, batch size should get back to default value.
state.blockBatchSize *= 2
state.requestedBlocks[validBlock] = blockBatchSize
state.requestedBlocks[pendingBlock] = 13
state.requestedBlocks[skippedBlock] = 17
state.requestedBlocks[failedBlock] = 19
if err := assertState(queue.state.scheduler, 13, blockBatchSize, 17, 19); err != nil {
t.Error(err)
}
if state.blockBatchSize != 2*blockBatchSize {
t.Errorf("unexpected batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize)
}
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(queue.state.scheduler, 13+state.blockBatchSize, 0, 17+blockBatchSize, 19); err != nil {
t.Error(err)
}
if state.blockBatchSize != blockBatchSize {
t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize)
}
})
t.Run("increase block batch size on too many failures", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil {
t.Error(err)
}
// On too many failures, batch size should get doubled and counters reset.
state.requestedBlocks[validBlock] = 19
state.requestedBlocks[pendingBlock] = 13
state.requestedBlocks[skippedBlock] = 17
state.requestedBlocks[failedBlock] = blockBatchSize
if err := assertState(queue.state.scheduler, 13, 19, 17, blockBatchSize); err != nil {
t.Error(err)
}
if state.blockBatchSize != blockBatchSize {
t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize)
}
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if state.blockBatchSize != 2*blockBatchSize {
t.Errorf("unexpected batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize)
}
if err := assertState(queue.state.scheduler, state.blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
})
t.Run("reset counters and block batch size on too many cached items", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil {
t.Error(err)
}
// On too many cached items, batch size and counters should reset.
state.requestedBlocks[validBlock] = queueMaxCachedBlocks
state.requestedBlocks[pendingBlock] = 13
state.requestedBlocks[skippedBlock] = 17
state.requestedBlocks[failedBlock] = 19
if err := assertState(queue.state.scheduler, 13, queueMaxCachedBlocks, 17, 19); err != nil {
t.Error(err)
}
if state.blockBatchSize != blockBatchSize {
t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize)
}
// This call should trigger resetting.
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if state.blockBatchSize != blockBatchSize {
t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize)
}
if err := assertState(queue.state.scheduler, state.blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
})
t.Run("no pending blocks left", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil {
t.Error(err)
}
// On too many cached items, batch size and counters should reset.
state.blockBatchSize = 2 * blockBatchSize
state.requestedBlocks[pendingBlock] = 0
state.requestedBlocks[validBlock] = 1
state.requestedBlocks[skippedBlock] = 1
state.requestedBlocks[failedBlock] = 1
if err := assertState(queue.state.scheduler, 0, 1, 1, 1); err != nil {
t.Error(err)
}
if state.blockBatchSize != 2*blockBatchSize {
t.Errorf("unexpected batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize)
}
// This call should trigger resetting.
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if state.blockBatchSize != blockBatchSize {
t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize)
}
if err := assertState(queue.state.scheduler, state.blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
})
}
func TestBlocksQueueScheduleFetchRequests(t *testing.T) {
chainConfig := struct {
expectedBlockSlots []uint64
peers []*peerData
}{
expectedBlockSlots: makeSequence(1, 241),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
},
}
mc, _, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
defer dbtest.TeardownDB(t, beaconDB)
setupQueue := func(ctx context.Context) *blocksQueue {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: &blocksProviderMock{},
headFetcher: mc,
highestExpectedSlot: uint64(len(chainConfig.expectedBlockSlots)),
})
return queue
}
t.Run("check start/count boundaries", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
// Move sliding window normally.
if err := assertState(state, 0, 0, 0, 0); err != nil {
t.Error(err)
}
end := queue.highestExpectedSlot / state.blockBatchSize
for i := uint64(0); i < end; i++ {
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, (i+1)*blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
}
// Make sure that the last request is up to highest expected slot.
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, queue.highestExpectedSlot, 0, 0, 0); err != nil {
t.Error(err)
}
// Try schedule beyond the highest slot.
if err := queue.scheduleFetchRequests(ctx); err == nil {
t.Errorf("expected error: %v", errStartSlotIsTooHigh)
}
if err := assertState(state, queue.highestExpectedSlot, 0, 0, 0); err != nil {
t.Error(err)
}
})
t.Run("too many failures", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
// Schedule enough items.
if err := assertState(state, 0, 0, 0, 0); err != nil {
t.Error(err)
}
end := queue.highestExpectedSlot / state.blockBatchSize
for i := uint64(0); i < end; i++ {
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, (i+1)*blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
}
// "Process" some items and reschedule.
if err := assertState(state, end*blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
state.incrementCounter(failedBlock, 25)
if err := assertState(state, end*blockBatchSize-25, 0, 0, 25); err != nil {
t.Error(err)
}
state.incrementCounter(failedBlock, 500) // too high value shouldn't cause issues
if err := assertState(state, 0, 0, 0, end*blockBatchSize); err != nil {
t.Error(err)
}
// Due to failures, resetting is expected.
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, 2*blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
if state.blockBatchSize != 2*blockBatchSize {
t.Errorf("unexpected block batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize)
}
})
t.Run("too many skipped", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
// Schedule enough items.
if err := assertState(state, 0, 0, 0, 0); err != nil {
t.Error(err)
}
end := queue.highestExpectedSlot / state.blockBatchSize
for i := uint64(0); i < end; i++ {
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, (i+1)*blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
}
// "Process" some items and reschedule.
if err := assertState(state, end*blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
state.incrementCounter(skippedBlock, 25)
if err := assertState(state, end*blockBatchSize-25, 0, 25, 0); err != nil {
t.Error(err)
}
state.incrementCounter(skippedBlock, 500) // too high value shouldn't cause issues
if err := assertState(state, 0, 0, end*blockBatchSize, 0); err != nil {
t.Error(err)
}
// No pending items, resetting is expected (both counters and block batch size).
state.blockBatchSize = 2 * blockBatchSize
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
if state.blockBatchSize != blockBatchSize {
t.Errorf("unexpected block batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize)
}
})
t.Run("reset block batch size", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
state.requestedBlocks[failedBlock] = blockBatchSize
// Increase block batch size.
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, 2*blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
if state.blockBatchSize != 2*blockBatchSize {
t.Errorf("unexpected block batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize)
}
// Reset block batch size.
state.requestedBlocks[validBlock] = blockBatchSize
state.requestedBlocks[pendingBlock] = 1
state.requestedBlocks[failedBlock] = 1
state.requestedBlocks[skippedBlock] = 1
if err := assertState(state, 1, blockBatchSize, 1, 1); err != nil {
t.Error(err)
}
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, blockBatchSize+1, 0, blockBatchSize+1, 1); err != nil {
t.Error(err)
}
if state.blockBatchSize != blockBatchSize {
t.Errorf("unexpected block batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize)
}
})
t.Run("overcrowded scheduler", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
state := queue.state.scheduler
state.requestedBlocks[pendingBlock] = queueMaxCachedBlocks
if err := queue.scheduleFetchRequests(ctx); err != nil {
t.Error(err)
}
if err := assertState(state, blockBatchSize, 0, 0, 0); err != nil {
t.Error(err)
}
if state.blockBatchSize != blockBatchSize {
t.Errorf("unexpected block batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize)
}
})
}
func TestBlocksQueueParseFetchResponse(t *testing.T) {
chainConfig := struct {
expectedBlockSlots []uint64
peers []*peerData
}{
expectedBlockSlots: makeSequence(1, 2*blockBatchSize*queueMaxPendingRequests+31),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
},
}
mc, _, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
defer dbtest.TeardownDB(t, beaconDB)
setupQueue := func(ctx context.Context) *blocksQueue {
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: &blocksProviderMock{},
headFetcher: mc,
highestExpectedSlot: uint64(len(chainConfig.expectedBlockSlots)),
})
return queue
}
var blocks []*eth.SignedBeaconBlock
for i := 1; i <= blockBatchSize; i++ {
blocks = append(blocks, &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
Slot: uint64(i),
},
})
}
t.Run("response error", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
response := &fetchRequestResponse{
start: 1,
count: blockBatchSize,
blocks: blocks,
err: errStartSlotIsTooHigh,
}
if _, err := queue.parseFetchResponse(ctx, response); err != errStartSlotIsTooHigh {
t.Errorf("expected error not thrown, want: %v, got: %v", errStartSlotIsTooHigh, err)
}
})
t.Run("context error", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
queue := setupQueue(ctx)
cancel()
response := &fetchRequestResponse{
start: 1,
count: blockBatchSize,
blocks: blocks,
err: ctx.Err(),
}
if _, err := queue.parseFetchResponse(ctx, response); err != ctx.Err() {
t.Errorf("expected error not thrown, want: %v, got: %v", ctx.Err(), err)
}
})
t.Run("no skipped blocks", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
for i := uint64(1); i <= blockBatchSize; i++ {
if _, ok := queue.state.cachedBlocks[i]; ok {
t.Errorf("unexpected block found: %v", i)
}
}
response := &fetchRequestResponse{
start: 1,
count: blockBatchSize,
blocks: blocks,
}
if _, err := queue.parseFetchResponse(ctx, response); err != nil {
t.Error(err)
}
// All blocks should be saved at this point.
for i := uint64(1); i <= blockBatchSize; i++ {
block, ok := queue.state.cachedBlocks[i]
if !ok {
t.Errorf("expected block not found: %v", i)
}
if block.SignedBeaconBlock == nil {
t.Errorf("unexpectedly marked as skipped: %v", i)
}
}
})
t.Run("with skipped blocks", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := setupQueue(ctx)
for i := uint64(1); i <= blockBatchSize; i++ {
if _, ok := queue.state.cachedBlocks[i]; ok {
t.Errorf("unexpected block found: %v", i)
}
}
response := &fetchRequestResponse{
start: 1,
count: blockBatchSize,
blocks: blocks,
}
skipStart, skipEnd := uint64(5), uint64(15)
response.blocks = append(response.blocks[:skipStart], response.blocks[skipEnd:]...)
if _, err := queue.parseFetchResponse(ctx, response); err != nil {
t.Error(err)
}
for i := skipStart + 1; i <= skipEnd; i++ {
block, ok := queue.state.cachedBlocks[i]
if !ok {
t.Errorf("expected block not found: %v", i)
}
if block.SignedBeaconBlock != nil {
t.Errorf("unexpectedly marked as not skipped: %v", i)
}
}
for i := uint64(1); i <= skipStart; i++ {
block, ok := queue.state.cachedBlocks[i]
if !ok {
t.Errorf("expected block not found: %v", i)
}
if block.SignedBeaconBlock == nil {
t.Errorf("unexpectedly marked as skipped: %v", i)
}
}
for i := skipEnd + 1; i <= blockBatchSize; i++ {
block, ok := queue.state.cachedBlocks[i]
if !ok {
t.Errorf("expected block not found: %v", i)
}
if block.SignedBeaconBlock == nil {
t.Errorf("unexpectedly marked as skipped: %v", i)
}
}
})
}
func TestBlocksQueueLoop(t *testing.T) {
tests := []struct {
name string
@@ -913,11 +314,9 @@ func TestBlocksQueueLoop(t *testing.T) {
var blocks []*eth.SignedBeaconBlock
for block := range queue.fetchedBlocks {
if err := processBlock(block); err != nil {
queue.state.scheduler.incrementCounter(failedBlock, 1)
continue
}
blocks = append(blocks, block)
queue.state.scheduler.incrementCounter(validBlock, 1)
}
if err := queue.stop(); err != nil {
@@ -968,13 +367,3 @@ func setBlocksFromCache(ctx context.Context, t *testing.T, mc *mock.ChainService
parentRoot = currRoot
}
}
func assertState(state *schedulerState, pending, valid, skipped, failed uint64) error {
s := state.requestedBlocks
res := s[pendingBlock] != pending || s[validBlock] != valid ||
s[skippedBlock] != skipped || s[failedBlock] != failed
if res {
b := struct{ pending, valid, skipped, failed uint64 }{pending, valid, skipped, failed}
return fmt.Errorf("invalid state, want: %+v, got: %+v", b, state.requestedBlocks)
}
return nil
}

View File

@@ -0,0 +1,231 @@
package initialsync
import (
"errors"
"fmt"
"time"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)
const (
stateNew stateID = iota
stateScheduled
stateDataParsed
stateSkipped
stateSent
stateSkippedExt
stateComplete
)
const (
eventSchedule eventID = iota
eventDataReceived
eventReadyToSend
eventCheckStale
eventExtendWindow
)
// stateID is a unique handle for a state.
type stateID uint8
// eventID is a unique handle for an event.
type eventID uint8
// stateMachine is an FSM that allows easy state transitions:
// State(S) x Event(E) -> Actions (A), State(S').
type stateMachine struct {
epochs []*epochState
events map[eventID]*stateMachineEvent
}
// epochState holds state of a single epoch.
type epochState struct {
epoch uint64
state stateID
blocks []*eth.SignedBeaconBlock
updated time.Time
}
// stateMachineEvent is a container for event data.
type stateMachineEvent struct {
name eventID
actions []*stateMachineAction
}
// stateMachineAction represents a state action that can be attached to an event.
type stateMachineAction struct {
state stateID
handlerFn eventHandlerFn
}
// eventHandlerFn is an event handler function's signature.
type eventHandlerFn func(*epochState, interface{}) (stateID, error)
// newStateMachine returns fully initialized state machine.
func newStateMachine() *stateMachine {
return &stateMachine{
epochs: make([]*epochState, 0, lookaheadEpochs),
events: map[eventID]*stateMachineEvent{},
}
}
// addHandler attaches an event handler to a state event.
func (sm *stateMachine) addHandler(state stateID, event eventID, fn eventHandlerFn) *stateMachineEvent {
e, ok := sm.events[event]
if !ok {
e = &stateMachineEvent{
name: event,
}
sm.events[event] = e
}
action := &stateMachineAction{
state: state,
handlerFn: fn,
}
e.actions = append(e.actions, action)
return e
}
// trigger invokes the event on a given epoch's state machine.
func (sm *stateMachine) trigger(name eventID, epoch uint64, data interface{}) error {
event, ok := sm.events[name]
if !ok {
return fmt.Errorf("event not found: %v", name)
}
ind, ok := sm.findEpochState(epoch)
if !ok {
return fmt.Errorf("state for %v epoch not found", epoch)
}
for _, action := range event.actions {
if action.state != sm.epochs[ind].state {
continue
}
state, err := action.handlerFn(sm.epochs[ind], data)
if err != nil {
return err
}
sm.epochs[ind].setState(state)
}
return nil
}
// addEpochState allocates memory for tracking epoch state.
func (sm *stateMachine) addEpochState(epoch uint64) {
state := &epochState{
epoch: epoch,
state: stateNew,
blocks: make([]*eth.SignedBeaconBlock, 0, allowedBlocksPerSecond),
updated: time.Now(),
}
sm.epochs = append(sm.epochs, state)
}
// removeEpochState frees memory of processed epoch.
func (sm *stateMachine) removeEpochState(epoch uint64) error {
ind, ok := sm.findEpochState(epoch)
if !ok {
return fmt.Errorf("state for %v epoch not found", epoch)
}
sm.epochs[ind].blocks = nil
sm.epochs[ind] = sm.epochs[len(sm.epochs)-1]
sm.epochs = sm.epochs[:len(sm.epochs)-1]
return nil
}
// findEpochState returns index at which state.epoch = epoch, or len(epochs) if not found.
func (sm *stateMachine) findEpochState(epoch uint64) (int, bool) {
for i, state := range sm.epochs {
if epoch == state.epoch {
return i, true
}
}
return len(sm.epochs), false
}
// isLowestEpochState checks whether a given epoch is the lowest epoch for which state is tracked.
func (sm *stateMachine) isLowestEpochState(epoch uint64) bool {
if _, ok := sm.findEpochState(epoch); !ok {
return false
}
for _, state := range sm.epochs {
if epoch > state.epoch {
return false
}
}
return true
}
// highestEpochSlot returns the highest epoch for which state is tracked.
func (sm *stateMachine) highestEpochSlot() (uint64, error) {
if len(sm.epochs) == 0 {
return 0, errors.New("no epoch states exist")
}
highestEpochSlot := sm.epochs[0].epoch
for _, state := range sm.epochs {
if state.epoch > highestEpochSlot {
highestEpochSlot = state.epoch
}
}
return highestEpochSlot, nil
}
// String returns a human-readable representation of the state machine.
func (sm *stateMachine) String() string {
return fmt.Sprintf("%v", sm.epochs)
}
// String returns human-readable representation of an epoch state.
func (es *epochState) String() string {
return fmt.Sprintf("%d:%s", es.epoch, es.state)
}
// String returns human-readable representation of a state.
func (s stateID) String() (state string) {
switch s {
case stateNew:
state = "new"
case stateScheduled:
state = "scheduled"
case stateDataParsed:
state = "dataParsed"
case stateSkipped:
state = "skipped"
case stateSkippedExt:
state = "skippedExt"
case stateSent:
state = "sent"
case stateComplete:
state = "complete"
}
return
}
// setState updates the current state of a given epoch.
func (es *epochState) setState(name stateID) {
if es.state == name {
return
}
es.updated = time.Now()
es.state = name
}
// String returns human-readable representation of an event.
func (e eventID) String() (event string) {
switch e {
case eventSchedule:
event = "schedule"
case eventDataReceived:
event = "dataReceived"
case eventReadyToSend:
event = "readyToSend"
case eventCheckStale:
event = "checkStale"
case eventExtendWindow:
event = "extendWindow"
}
return
}
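A minimal usage sketch of the state machine defined above, assuming it is compiled into the same initialsync package as fsm.go. The types, constants, and functions are exactly the ones listed in this file; only the handler bodies and the wrapper function are made up for illustration.

// fsm_example.go (hypothetical) — would live next to fsm.go in package initialsync.
package initialsync

import "fmt"

func exampleStateMachineUsage() {
	sm := newStateMachine()

	// Wire one handler per (state, event) pair we care about. A handler
	// returns the next state; trigger() applies it via setState.
	sm.addHandler(stateNew, eventSchedule, func(es *epochState, in interface{}) (stateID, error) {
		// Here the real queue calls blocksFetcher.scheduleRequest(...).
		return stateScheduled, nil
	})
	sm.addHandler(stateScheduled, eventDataReceived, func(es *epochState, in interface{}) (stateID, error) {
		es.blocks = nil // the real handler stores response.blocks here
		return stateDataParsed, nil
	})

	// Track a couple of epochs and drive them with events.
	sm.addEpochState(12)
	sm.addEpochState(13)
	if err := sm.trigger(eventSchedule, 12, nil); err != nil {
		fmt.Println("trigger failed:", err)
	}
	if err := sm.trigger(eventDataReceived, 12, nil); err != nil {
		fmt.Println("trigger failed:", err)
	}
	fmt.Println(sm) // e.g. [12:dataParsed 13:new]
}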

View File

@@ -0,0 +1,270 @@
package initialsync
import (
"errors"
"fmt"
"testing"
)
func TestStateMachine_Stringify(t *testing.T) {
tests := []struct {
name string
epochs []*epochState
want string
}{
{
"empty epoch state list",
make([]*epochState, 0, lookaheadEpochs),
"[]",
},
{
"newly created state machine",
[]*epochState{
{epoch: 8, state: stateNew},
{epoch: 9, state: stateScheduled},
{epoch: 10, state: stateDataParsed},
{epoch: 11, state: stateSkipped},
{epoch: 12, state: stateSkippedExt},
{epoch: 13, state: stateComplete},
{epoch: 14, state: stateSent},
},
"[8:new 9:scheduled 10:dataParsed 11:skipped 12:skippedExt 13:complete 14:sent]",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sm := &stateMachine{
epochs: tt.epochs,
}
if got := sm.String(); got != tt.want {
t.Errorf("unexpected output, got: %v, want: %v", got, tt.want)
}
})
}
}
func TestStateMachine_addHandler(t *testing.T) {
sm := newStateMachine()
sm.addHandler(stateNew, eventSchedule, func(state *epochState, i interface{}) (id stateID, err error) {
return stateScheduled, nil
})
if len(sm.events[eventSchedule].actions) != 1 {
t.Errorf("unexpected size, got: %v, want: %v", len(sm.events[eventSchedule].actions), 1)
}
state, err := sm.events[eventSchedule].actions[0].handlerFn(nil, nil)
if err != nil {
t.Error(err)
}
if state != stateScheduled {
t.Errorf("unexpected state, got: %v, want: %v", state, stateScheduled)
}
// Add second handler to the same event
sm.addHandler(stateSent, eventSchedule, func(state *epochState, i interface{}) (id stateID, err error) {
return stateDataParsed, nil
})
if len(sm.events[eventSchedule].actions) != 2 {
t.Errorf("unexpected size, got: %v, want: %v", len(sm.events[eventSchedule].actions), 2)
}
state, err = sm.events[eventSchedule].actions[1].handlerFn(nil, nil)
if err != nil {
t.Error(err)
}
if state != stateDataParsed {
t.Errorf("unexpected state, got: %v, want: %v", state, stateDataParsed)
}
}
func TestStateMachine_trigger(t *testing.T) {
type event struct {
state stateID
event eventID
returnState stateID
err bool
}
type args struct {
name eventID
epoch uint64
data interface{}
returnState stateID
}
tests := []struct {
name string
events []event
epochs []uint64
args args
err error
}{
{
name: "event not found",
events: []event{},
epochs: []uint64{},
args: args{eventSchedule, 12, nil, stateNew},
err: fmt.Errorf("event not found: %v", eventSchedule),
},
{
name: "epoch not found",
events: []event{
{stateNew, eventSchedule, stateScheduled, false},
},
epochs: []uint64{},
args: args{eventSchedule, 12, nil, stateScheduled},
err: fmt.Errorf("state for %v epoch not found", 12),
},
{
name: "single action",
events: []event{
{stateNew, eventSchedule, stateScheduled, false},
},
epochs: []uint64{12, 13},
args: args{eventSchedule, 12, nil, stateScheduled},
err: nil,
},
{
name: "multiple actions, has error",
events: []event{
{stateNew, eventSchedule, stateScheduled, false},
{stateScheduled, eventSchedule, stateSent, true},
{stateSent, eventSchedule, stateComplete, false},
},
epochs: []uint64{12, 13},
args: args{eventSchedule, 12, nil, stateScheduled},
err: nil,
},
{
name: "multiple actions, no error, cascade",
events: []event{
{stateNew, eventSchedule, stateScheduled, false},
{stateScheduled, eventSchedule, stateSent, false},
{stateSent, eventSchedule, stateComplete, false},
},
epochs: []uint64{12, 13},
args: args{eventSchedule, 12, nil, stateComplete},
err: nil,
},
{
name: "multiple actions, no error, no cascade",
events: []event{
{stateNew, eventSchedule, stateScheduled, false},
{stateScheduled, eventSchedule, stateSent, false},
{stateNew, eventSchedule, stateComplete, false},
},
epochs: []uint64{12, 13},
args: args{eventSchedule, 12, nil, stateSent},
err: nil,
},
}
fn := func(e event) eventHandlerFn {
return func(es *epochState, in interface{}) (stateID, error) {
if e.err {
return es.state, errors.New("invalid")
}
return e.returnState, nil
}
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sm := newStateMachine()
expectHandlerError := false
for _, event := range tt.events {
sm.addHandler(event.state, event.event, fn(event))
if event.err {
expectHandlerError = true
}
}
for _, epoch := range tt.epochs {
sm.addEpochState(epoch)
}
err := sm.trigger(tt.args.name, tt.args.epoch, tt.args.data)
if tt.err != nil && (err == nil || tt.err.Error() != err.Error()) {
t.Errorf("unexpected error = '%v', want '%v'", err, tt.err)
}
if tt.err == nil {
if err != nil && !expectHandlerError {
t.Error(err)
}
ind, ok := sm.findEpochState(tt.args.epoch)
if !ok {
t.Errorf("expected epoch not found: %v", tt.args.epoch)
return
}
if sm.epochs[ind].state != tt.args.returnState {
t.Errorf("unexpected final state: %v, want: %v (%v)", sm.epochs[ind].state, tt.args.returnState, sm.epochs)
}
}
})
}
}
func TestStateMachine_findEpochState(t *testing.T) {
sm := newStateMachine()
if ind, ok := sm.findEpochState(12); ok || ind != 0 {
t.Errorf("unexpected index: %v, want: %v", ind, 0)
}
sm.addEpochState(12)
if ind, ok := sm.findEpochState(12); !ok || ind != 0 {
t.Errorf("unexpected index: %v, want: %v", ind, 0)
}
sm.addEpochState(13)
sm.addEpochState(14)
sm.addEpochState(15)
if ind, ok := sm.findEpochState(14); !ok || ind != 2 {
t.Errorf("unexpected index: %v, want: %v", ind, 2)
}
if ind, ok := sm.findEpochState(16); ok || ind != len(sm.epochs) {
t.Errorf("unexpected index: %v, want: %v", ind, len(sm.epochs))
}
}
func TestStateMachine_isLowestEpochState(t *testing.T) {
sm := newStateMachine()
sm.addEpochState(12)
sm.addEpochState(13)
sm.addEpochState(14)
if res := sm.isLowestEpochState(15); res {
t.Errorf("unexpected lowest state: %v", 15)
}
if res := sm.isLowestEpochState(13); res {
t.Errorf("unexpected lowest state: %v", 13)
}
if res := sm.isLowestEpochState(12); !res {
t.Errorf("expected lowest state not found: %v", 12)
}
if err := sm.removeEpochState(12); err != nil {
t.Error(err)
}
if res := sm.isLowestEpochState(12); res {
t.Errorf("unexpected lowest state: %v", 12)
}
if res := sm.isLowestEpochState(13); !res {
t.Errorf("expected lowest state not found: %v", 13)
}
}
func TestStateMachine_highestEpochSlot(t *testing.T) {
sm := newStateMachine()
if _, err := sm.highestEpochSlot(); err == nil {
t.Error("expected error")
}
sm.addEpochState(12)
sm.addEpochState(13)
sm.addEpochState(14)
slot, err := sm.highestEpochSlot()
if err != nil {
t.Error(err)
}
if slot != 14 {
t.Errorf("incorrect highest slot: %v, want: %v", slot, 14)
}
if err := sm.removeEpochState(14); err != nil {
t.Error(err)
}
slot, err = sm.highestEpochSlot()
if err != nil {
t.Error(err)
}
if slot != 13 {
t.Errorf("incorrect highest slot: %v, want: %v", slot, 13)
}
}

View File

@@ -21,7 +21,7 @@ import (
"github.com/sirupsen/logrus"
)
const blockBatchSize = 64
const blockBatchSize = 32
const counterSeconds = 20
const refreshTime = 6 * time.Second
@@ -66,10 +66,8 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
s.logSyncStatus(genesis, blk.Block, counter)
if err := s.processBlock(ctx, blk); err != nil {
log.WithError(err).Info("Block is invalid")
queue.state.scheduler.incrementCounter(failedBlock, 1)
continue
}
queue.state.scheduler.incrementCounter(validBlock, 1)
}
log.Debug("Synced to finalized epoch - now syncing blocks up to current head")
@@ -99,7 +97,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
req := &p2ppb.BeaconBlocksByRangeRequest{
HeadBlockRoot: root,
StartSlot: s.chain.HeadSlot() + 1,
Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, blockBatchSize),
Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, allowedBlocksPerSecond),
Step: 1,
}
@@ -109,7 +107,8 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
resp, err := s.requestBlocks(ctx, req, best)
if err != nil {
return err
log.WithError(err).Error("Failed to receive blocks, exiting init sync")
return nil
}
for _, blk := range resp {
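roundRobinSync and the queue constantly convert between slots and epochs and cap a single request at the per-second block allowance, as in the Count expression above. A standalone illustration of that arithmetic, assuming mainnet's 32 slots per epoch; the helpers and the sample numbers are local to the example, not Prysm's helpers/mathutil packages.

package main

import "fmt"

const slotsPerEpoch = 32 // mainnet config value

func slotToEpoch(slot uint64) uint64 { return slot / slotsPerEpoch }
func startSlot(epoch uint64) uint64  { return epoch * slotsPerEpoch }

func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	const allowedBlocksPerSecond = 64 // value chosen for the example; the real constant lives in the initialsync package
	headSlot, currentSlot := uint64(100), uint64(230)

	// Which epoch is the head in, and where does the next epoch begin?
	fmt.Println("head epoch:", slotToEpoch(headSlot))                            // 3
	fmt.Println("next epoch starts at slot:", startSlot(slotToEpoch(headSlot)+1)) // 128

	// How many blocks to ask for in one request: everything still missing,
	// but never more than the per-second allowance.
	count := min(currentSlot-headSlot+1, allowedBlocksPerSecond)
	fmt.Println("request count:", count) // 64
}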

View File

@@ -235,6 +235,26 @@ func TestRoundRobinSync(t *testing.T) {
finalizedEpoch: 4,
headSlot: 160,
},
{
blocks: makeSequence(1, 160),
finalizedEpoch: 4,
headSlot: 160,
},
{
blocks: makeSequence(1, 160),
finalizedEpoch: 4,
headSlot: 160,
},
{
blocks: makeSequence(1, 160),
finalizedEpoch: 4,
headSlot: 160,
},
{
blocks: makeSequence(1, 160),
finalizedEpoch: 4,
headSlot: 160,
},
},
},
}