diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index c8cb1c25fc..ec0833e7a2 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "blocks_fetcher.go", + "blocks_queue.go", "log.go", "round_robin.go", "service.go", @@ -41,6 +42,7 @@ go_test( name = "go_default_test", srcs = [ "blocks_fetcher_test.go", + "blocks_queue_test.go", "round_robin_test.go", ], embed = [":go_default_library"], @@ -48,6 +50,8 @@ go_test( tags = ["race_on"], deps = [ "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/testing:go_default_library", @@ -56,6 +60,8 @@ go_test( "//beacon-chain/state:go_default_library", "//beacon-chain/sync:go_default_library", "//proto/beacon/p2p/v1:go_default_library", + "//shared/bytesutil:go_default_library", + "//shared/featureconfig:go_default_library", "//shared/hashutil:go_default_library", "//shared/params:go_default_library", "//shared/roughtime:go_default_library", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index a527a032af..8bdcd87906 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -24,8 +24,6 @@ import ( "go.opencensus.io/trace" ) -const fetchRequestsBuffer = 8 // number of pending fetch requests - var ( errNoPeersAvailable = errors.New("no peers available, waiting for reconnect") errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize") @@ -82,8 +80,8 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc headFetcher: cfg.headFetcher, p2p: cfg.p2p, rateLimiter: rateLimiter, - fetchRequests: make(chan *fetchRequestParams, fetchRequestsBuffer), - fetchResponses: make(chan *fetchRequestResponse, fetchRequestsBuffer), + fetchRequests: make(chan *fetchRequestParams, queueMaxPendingRequests), + fetchResponses: make(chan *fetchRequestResponse, queueMaxPendingRequests), quit: make(chan struct{}), } } diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go new file mode 100644 index 0000000000..a17e984e62 --- /dev/null +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -0,0 +1,413 @@ +package initialsync + +import ( + "context" + "errors" + "sync" + "time" + + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/shared/mathutil" +) + +const ( + // queueMaxPendingRequests limits how many concurrent fetch requests the queue can initiate. + queueMaxPendingRequests = 8 + // queueFetchRequestTimeout caps the maximum amount of time before a fetch request is cancelled. + queueFetchRequestTimeout = 60 * time.Second + // queueMaxCachedBlocks is a hard limit on how many queue items to cache before forcing a dequeue. + queueMaxCachedBlocks = 8 * queueMaxPendingRequests * blockBatchSize + // queueStopCallTimeout is the time allowed for the queue to release resources when quitting.
+ queueStopCallTimeout = 1 * time.Second +) + +var ( + errQueueCtxIsDone = errors.New("queue's context is done, reinitialize") + errQueueTakesTooLongToStop = errors.New("queue takes too long to stop") +) + +// blocksProvider exposes enough methods for the queue to fetch incoming blocks. +type blocksProvider interface { + requestResponses() <-chan *fetchRequestResponse + scheduleRequest(ctx context.Context, start, count uint64) error + start() error + stop() +} + +// blocksQueueConfig is a config to set up the block queue service. +type blocksQueueConfig struct { + blocksFetcher blocksProvider + headFetcher blockchain.HeadFetcher + startSlot uint64 + highestExpectedSlot uint64 + p2p p2p.P2P +} + +// blocksQueueState holds internal queue state (for easier management of state transitions). +type blocksQueueState struct { + scheduler *schedulerState + sender *senderState + cachedBlocks map[uint64]*cachedBlock +} + +// blockState enumerates possible queue block states. +type blockState uint8 + +const ( + // pendingBlock is the default status of a block that has just been added to the queue. + pendingBlock = iota + // validBlock represents a block that can be processed. + validBlock + // skippedBlock is a block for a slot that is not found on any available peer. + skippedBlock + // failedBlock represents a block that cannot be processed at the moment. + failedBlock + // blockStateLen is a sentinel marking the number of possible block states. + blockStateLen +) + +// schedulerState is the state of the scheduling process. +type schedulerState struct { + sync.Mutex + currentSlot uint64 + blockBatchSize uint64 + requestedBlocks map[blockState]uint64 +} + +// senderState is the state of the block sending process. +type senderState struct { + sync.Mutex +} + +// cachedBlock is a container for a signed beacon block. +type cachedBlock struct { + *eth.SignedBeaconBlock +} + +// blocksQueue is a priority queue that serves as an intermediary between block fetchers (producers) +// and the block processing goroutine (consumer). Consumers can rely on the order of incoming blocks. +type blocksQueue struct { + ctx context.Context + cancel context.CancelFunc + highestExpectedSlot uint64 + state *blocksQueueState + blocksFetcher blocksProvider + headFetcher blockchain.HeadFetcher + fetchedBlocks chan *eth.SignedBeaconBlock // output channel for ready blocks + pendingFetchRequests chan struct{} // pending requests semaphore + pendingFetchedBlocks chan struct{} // notifier, pings block sending handler + quit chan struct{} // termination notifier +} + +// newBlocksQueue creates an initialized priority queue.
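// For instance (an illustrative sketch, not part of this change; the caller, the exact start slot
// and the target slot are up to the integrating service):
//
//	queue := newBlocksQueue(ctx, &blocksQueueConfig{
//		headFetcher:         chain,      // some blockchain.HeadFetcher implementation
//		p2p:                 p2pService, // some p2p.P2P implementation
//		startSlot:           chain.HeadSlot(),
//		highestExpectedSlot: targetSlot, // e.g. derived from peers' finalized epoch
//	})
//	// blocksFetcher may be left nil: newBlocksQueue then builds a default blocksFetcher.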
+func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { + ctx, cancel := context.WithCancel(ctx) + + blocksFetcher := cfg.blocksFetcher + if blocksFetcher == nil { + blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{ + headFetcher: cfg.headFetcher, + p2p: cfg.p2p, + }) + } + + return &blocksQueue{ + ctx: ctx, + cancel: cancel, + highestExpectedSlot: cfg.highestExpectedSlot, + state: &blocksQueueState{ + scheduler: &schedulerState{ + currentSlot: cfg.startSlot, + blockBatchSize: blockBatchSize, + requestedBlocks: make(map[blockState]uint64, blockStateLen), + }, + sender: &senderState{}, + cachedBlocks: make(map[uint64]*cachedBlock, queueMaxCachedBlocks), + }, + blocksFetcher: blocksFetcher, + headFetcher: cfg.headFetcher, + fetchedBlocks: make(chan *eth.SignedBeaconBlock, blockBatchSize), + pendingFetchRequests: make(chan struct{}, queueMaxPendingRequests), + pendingFetchedBlocks: make(chan struct{}, queueMaxPendingRequests), + quit: make(chan struct{}), + } +} + +// start boots up the queue processing. +func (q *blocksQueue) start() error { + select { + case <-q.ctx.Done(): + return errQueueCtxIsDone + default: + go q.loop() + return nil + } +} + +// stop terminates all queue operations. +func (q *blocksQueue) stop() error { + q.cancel() + select { + case <-q.quit: + return nil + case <-time.After(queueStopCallTimeout): + return errQueueTakesTooLongToStop + } +} + +// loop is a main queue loop. +func (q *blocksQueue) loop() { + defer close(q.quit) + + // Wait for all goroutines to wrap up (forced by cancelled context), and do a cleanup. + wg := &sync.WaitGroup{} + defer func() { + wg.Wait() + q.blocksFetcher.stop() + close(q.fetchedBlocks) + }() + + if err := q.blocksFetcher.start(); err != nil { + log.WithError(err).Debug("Can not start blocks provider") + } + + // Reads from semaphore channel, thus allowing next goroutine to grab it and schedule next request. + releaseTicket := func() { + select { + case <-q.ctx.Done(): + case <-q.pendingFetchRequests: + } + } + + for { + if q.headFetcher.HeadSlot() >= q.highestExpectedSlot { + log.Debug("Highest expected slot reached") + q.cancel() + } + + select { + case <-q.ctx.Done(): + log.Debug("Context closed, exiting goroutine (blocks queue)") + return + case q.pendingFetchRequests <- struct{}{}: + wg.Add(1) + go func() { + defer wg.Done() + + // Schedule request. + if err := q.scheduleFetchRequests(q.ctx); err != nil { + q.state.scheduler.incrementCounter(failedBlock, blockBatchSize) + releaseTicket() + } + }() + case response, ok := <-q.blocksFetcher.requestResponses(): + if !ok { + log.Debug("Fetcher closed output channel") + q.cancel() + return + } + + // Release semaphore ticket. + go releaseTicket() + + // Process incoming response into blocks. + wg.Add(1) + go func() { + defer func() { + select { + case <-q.ctx.Done(): + case q.pendingFetchedBlocks <- struct{}{}: // notify sender of data availability + } + wg.Done() + }() + + skippedBlocks, err := q.parseFetchResponse(q.ctx, response) + if err != nil { + q.state.scheduler.incrementCounter(failedBlock, response.count) + return + } + q.state.scheduler.incrementCounter(skippedBlock, skippedBlocks) + }() + case <-q.pendingFetchedBlocks: + wg.Add(1) + go func() { + defer wg.Done() + if err := q.sendFetchedBlocks(q.ctx); err != nil { + log.WithError(err).Debug("Error sending received blocks") + } + }() + } + } +} + +// scheduleFetchRequests enqueues block fetch requests to block fetcher. 
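// Roughly, the window for the next request is derived from the schedulerState counters: every slot
// scheduled since the last reset is still accounted for in requestedBlocks, so the next request
// starts right after the last accounted slot and is capped by both the current batch size and the
// highest expected slot. An illustrative trace (mirroring the arithmetic below):
//
//	offset := pending + skipped + failed + valid
//	start := currentSlot + offset + 1
//	count := mathutil.Min(batchSize, highestExpectedSlot-start+1)
//	// e.g. currentSlot=0, offset=128, batchSize=64, highestExpectedSlot=241 -> start=129, count=64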
+func (q *blocksQueue) scheduleFetchRequests(ctx context.Context) error { + q.state.scheduler.Lock() + defer q.state.scheduler.Unlock() + + if ctx.Err() != nil { + return ctx.Err() + } + + s := q.state.scheduler + blocks := q.state.scheduler.requestedBlocks + + func() { + resetStateCounters := func() { + for i := 0; i < blockStateLen; i++ { + blocks[blockState(i)] = 0 + } + s.currentSlot = q.headFetcher.HeadSlot() + } + + // Update state's current slot pointer. + count := blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock] + if count == 0 { + s.currentSlot = q.headFetcher.HeadSlot() + return + } + + // Too many failures (blocks that can't be processed at this time). + if blocks[failedBlock] >= s.blockBatchSize/2 { + s.blockBatchSize *= 2 + resetStateCounters() + return + } + + // Given enough valid blocks, we can set back block batch size. + if blocks[validBlock] >= blockBatchSize && s.blockBatchSize != blockBatchSize { + blocks[skippedBlock], blocks[validBlock] = blocks[skippedBlock]+blocks[validBlock], 0 + s.blockBatchSize = blockBatchSize + } + + // Too many items in scheduler, time to update current slot to point to current head's slot. + count = blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock] + if count >= queueMaxCachedBlocks { + s.blockBatchSize = blockBatchSize + resetStateCounters() + return + } + + // All blocks processed, no pending blocks. + count = blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock] + if count > 0 && blocks[pendingBlock] == 0 { + s.blockBatchSize = blockBatchSize + resetStateCounters() + return + } + }() + + offset := blocks[pendingBlock] + blocks[skippedBlock] + blocks[failedBlock] + blocks[validBlock] + start := q.state.scheduler.currentSlot + offset + 1 + count := mathutil.Min(q.state.scheduler.blockBatchSize, q.highestExpectedSlot-start+1) + if count <= 0 { + return errStartSlotIsTooHigh + } + + ctx, _ = context.WithTimeout(ctx, queueFetchRequestTimeout) + if err := q.blocksFetcher.scheduleRequest(ctx, start, count); err != nil { + return err + } + q.state.scheduler.requestedBlocks[pendingBlock] += count + + return nil +} + +// parseFetchResponse processes incoming responses. +func (q *blocksQueue) parseFetchResponse(ctx context.Context, response *fetchRequestResponse) (uint64, error) { + q.state.sender.Lock() + defer q.state.sender.Unlock() + + if ctx.Err() != nil { + return 0, ctx.Err() + } + + if response.err != nil { + return 0, response.err + } + + // Extract beacon blocks. + responseBlocks := make(map[uint64]*eth.SignedBeaconBlock, len(response.blocks)) + for _, blk := range response.blocks { + responseBlocks[blk.Block.Slot] = blk + } + + // Cache blocks in [start, start + count) range, include skipped blocks. + var skippedBlocks uint64 + end := response.start + mathutil.Max(response.count, uint64(len(response.blocks))) + for slot := response.start; slot < end; slot++ { + if block, ok := responseBlocks[slot]; ok { + q.state.cachedBlocks[slot] = &cachedBlock{ + SignedBeaconBlock: block, + } + delete(responseBlocks, slot) + continue + } + q.state.cachedBlocks[slot] = &cachedBlock{} + skippedBlocks++ + } + + // If there are any items left in incoming response, cache them too. + for slot, block := range responseBlocks { + q.state.cachedBlocks[slot] = &cachedBlock{ + SignedBeaconBlock: block, + } + } + + return skippedBlocks, nil +} + +// sendFetchedBlocks analyses available blocks, and sends them downstream in a correct slot order. 
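// Slots for which no peer returned a block are still cached by parseFetchResponse above (as empty
// cachedBlock entries), so they do not stall the walk; only a slot that has not been fetched at all
// stops it.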
+// Blocks are checked starting from the current head slot, and up until next consecutive block is available. +func (q *blocksQueue) sendFetchedBlocks(ctx context.Context) error { + q.state.sender.Lock() + defer q.state.sender.Unlock() + + startSlot := q.headFetcher.HeadSlot() + 1 + nonSkippedSlot := uint64(0) + for slot := startSlot; slot <= q.highestExpectedSlot; slot++ { + if ctx.Err() != nil { + return ctx.Err() + } + blockData, ok := q.state.cachedBlocks[slot] + if !ok { + break + } + if blockData.SignedBeaconBlock != nil && blockData.Block != nil { + select { + case <-ctx.Done(): + return ctx.Err() + case q.fetchedBlocks <- blockData.SignedBeaconBlock: + } + nonSkippedSlot = slot + } + } + + // Remove processed blocks. + if nonSkippedSlot > 0 { + for slot := range q.state.cachedBlocks { + if slot <= nonSkippedSlot { + delete(q.state.cachedBlocks, slot) + } + } + } + + return nil +} + +// incrementCounter increments particular scheduler counter. +func (s *schedulerState) incrementCounter(counter blockState, n uint64) { + s.Lock() + defer s.Unlock() + + // Assert that counter is within acceptable boundaries. + if counter < 1 || counter >= blockStateLen { + return + } + + n = mathutil.Min(s.requestedBlocks[pendingBlock], n) + s.requestedBlocks[counter] += n + s.requestedBlocks[pendingBlock] -= n +} diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go new file mode 100644 index 0000000000..0d6023d6d3 --- /dev/null +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -0,0 +1,980 @@ +package initialsync + +import ( + "context" + "fmt" + "testing" + + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" + dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + "github.com/prysmaticlabs/prysm/shared/sliceutil" +) + +type blocksProviderMock struct { +} + +func (f *blocksProviderMock) start() error { + return nil +} + +func (f *blocksProviderMock) stop() { +} + +func (f *blocksProviderMock) scheduleRequest(ctx context.Context, start, count uint64) error { + return nil +} + +func (f *blocksProviderMock) requestResponses() <-chan *fetchRequestResponse { + return nil +} + +func TestBlocksQueueInitStartStop(t *testing.T) { + mc, p2p, beaconDB := initializeTestServices(t, []uint64{}, []*peerData{}) + defer dbtest.TeardownDB(t, beaconDB) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ + headFetcher: mc, + p2p: p2p, + }) + + t.Run("stop without start", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + headFetcher: mc, + highestExpectedSlot: blockBatchSize, + }) + + if err := queue.stop(); err == nil { + t.Errorf("expected error: %v", errQueueTakesTooLongToStop) + } + }) + + t.Run("use default fetcher", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + headFetcher: mc, + highestExpectedSlot: blockBatchSize, + }) + if err := queue.start(); err != nil { + t.Error(err) + } + }) + + t.Run("stop timeout", func(t *testing.T) { + ctx, 
cancel := context.WithCancel(ctx) + defer cancel() + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + headFetcher: mc, + highestExpectedSlot: blockBatchSize, + }) + if err := queue.start(); err != nil { + t.Error(err) + } + if err := queue.stop(); err == nil { + t.Errorf("expected error: %v", errQueueTakesTooLongToStop) + } + }) + + t.Run("check for leaked goroutines", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + highestExpectedSlot: blockBatchSize, + }) + + if err := queue.start(); err != nil { + t.Error(err) + } + // Blocks up until all resources are reclaimed (or timeout is called) + if err := queue.stop(); err != nil { + t.Error(err) + } + select { + case <-queue.fetchedBlocks: + default: + t.Error("queue.fetchedBlocks channel is leaked") + } + select { + case <-fetcher.fetchResponses: + default: + t.Error("fetcher.fetchResponses channel is leaked") + } + }) + + t.Run("re-starting of stopped queue", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + highestExpectedSlot: blockBatchSize, + }) + if err := queue.start(); err != nil { + t.Error(err) + } + if err := queue.stop(); err != nil { + t.Error(err) + } + if err := queue.start(); err == nil { + t.Errorf("expected error not returned: %v", errQueueCtxIsDone) + } + }) + + t.Run("multiple stopping attempts", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + highestExpectedSlot: blockBatchSize, + }) + if err := queue.start(); err != nil { + t.Error(err) + } + + if err := queue.stop(); err != nil { + t.Error(err) + } + if err := queue.stop(); err != nil { + t.Error(err) + } + }) + + t.Run("cancellation", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + highestExpectedSlot: blockBatchSize, + }) + if err := queue.start(); err != nil { + t.Error(err) + } + + cancel() + if err := queue.stop(); err != nil { + t.Error(err) + } + }) +} + +func TestBlocksQueueUpdateSchedulerState(t *testing.T) { + chainConfig := struct { + expectedBlockSlots []uint64 + peers []*peerData + }{ + expectedBlockSlots: makeSequence(1, 241), + peers: []*peerData{}, + } + + mc, _, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers) + defer dbtest.TeardownDB(t, beaconDB) + + setupQueue := func(ctx context.Context) *blocksQueue { + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: &blocksProviderMock{}, + headFetcher: mc, + highestExpectedSlot: uint64(len(chainConfig.expectedBlockSlots)), + }) + + return queue + } + + t.Run("cancelled context", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + queue := setupQueue(ctx) + cancel() + if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil { + t.Error(err) + } + if err := queue.scheduleFetchRequests(ctx); err != ctx.Err() { + t.Errorf("expected error: %v", ctx.Err()) + } + }) + + t.Run("empty state on pristine node", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + if err := assertState(queue.state.scheduler, 0, 0, 
0, 0); err != nil { + t.Error(err) + } + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if state.currentSlot != 0 { + t.Errorf("invalid current slot, want: %v, got: %v", 0, state.currentSlot) + } + }) + + t.Run("empty state on pre-synced node", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + syncToSlot := uint64(7) + setBlocksFromCache(ctx, t, mc, syncToSlot) + if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil { + t.Error(err) + } + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if state.currentSlot != syncToSlot { + t.Errorf("invalid current slot, want: %v, got: %v", syncToSlot, state.currentSlot) + } + }) + + t.Run("reset block batch size to default", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil { + t.Error(err) + } + + // On enough valid blocks, batch size should get back to default value. + state.blockBatchSize *= 2 + state.requestedBlocks[validBlock] = blockBatchSize + state.requestedBlocks[pendingBlock] = 13 + state.requestedBlocks[skippedBlock] = 17 + state.requestedBlocks[failedBlock] = 19 + if err := assertState(queue.state.scheduler, 13, blockBatchSize, 17, 19); err != nil { + t.Error(err) + } + if state.blockBatchSize != 2*blockBatchSize { + t.Errorf("unexpected batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize) + } + + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(queue.state.scheduler, 13+state.blockBatchSize, 0, 17+blockBatchSize, 19); err != nil { + t.Error(err) + } + if state.blockBatchSize != blockBatchSize { + t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize) + } + }) + + t.Run("increase block batch size on too many failures", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil { + t.Error(err) + } + + // On too many failures, batch size should get doubled and counters reset. 
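// (Expected arithmetic: failedBlock reaches blockBatchSize, which is at least half of the current
// batch size, so scheduleFetchRequests doubles the batch size and zeroes all counters; the same
// call then schedules a fresh request, leaving only pendingBlock set to the new, doubled size.)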
+ state.requestedBlocks[validBlock] = 19 + state.requestedBlocks[pendingBlock] = 13 + state.requestedBlocks[skippedBlock] = 17 + state.requestedBlocks[failedBlock] = blockBatchSize + if err := assertState(queue.state.scheduler, 13, 19, 17, blockBatchSize); err != nil { + t.Error(err) + } + if state.blockBatchSize != blockBatchSize { + t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize) + } + + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if state.blockBatchSize != 2*blockBatchSize { + t.Errorf("unexpected batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize) + } + if err := assertState(queue.state.scheduler, state.blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + }) + + t.Run("reset counters and block batch size on too many cached items", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil { + t.Error(err) + } + + // On too many cached items, batch size and counters should reset. + state.requestedBlocks[validBlock] = queueMaxCachedBlocks + state.requestedBlocks[pendingBlock] = 13 + state.requestedBlocks[skippedBlock] = 17 + state.requestedBlocks[failedBlock] = 19 + if err := assertState(queue.state.scheduler, 13, queueMaxCachedBlocks, 17, 19); err != nil { + t.Error(err) + } + if state.blockBatchSize != blockBatchSize { + t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize) + } + + // This call should trigger resetting. + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if state.blockBatchSize != blockBatchSize { + t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize) + } + if err := assertState(queue.state.scheduler, state.blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + }) + + t.Run("no pending blocks left", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + if err := assertState(queue.state.scheduler, 0, 0, 0, 0); err != nil { + t.Error(err) + } + + // On too many cached items, batch size and counters should reset. + state.blockBatchSize = 2 * blockBatchSize + state.requestedBlocks[pendingBlock] = 0 + state.requestedBlocks[validBlock] = 1 + state.requestedBlocks[skippedBlock] = 1 + state.requestedBlocks[failedBlock] = 1 + if err := assertState(queue.state.scheduler, 0, 1, 1, 1); err != nil { + t.Error(err) + } + if state.blockBatchSize != 2*blockBatchSize { + t.Errorf("unexpected batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize) + } + + // This call should trigger resetting. 
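// (Expected arithmetic: skipped+failed+valid > 0 while pendingBlock == 0, so the scheduler restores
// the default batch size, resets all counters, and the same call schedules a new batch of
// blockBatchSize pending slots.)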
+ if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if state.blockBatchSize != blockBatchSize { + t.Errorf("unexpected batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize) + } + if err := assertState(queue.state.scheduler, state.blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + }) +} + +func TestBlocksQueueScheduleFetchRequests(t *testing.T) { + chainConfig := struct { + expectedBlockSlots []uint64 + peers []*peerData + }{ + expectedBlockSlots: makeSequence(1, 241), + peers: []*peerData{ + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + }, + } + + mc, _, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers) + defer dbtest.TeardownDB(t, beaconDB) + + setupQueue := func(ctx context.Context) *blocksQueue { + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: &blocksProviderMock{}, + headFetcher: mc, + highestExpectedSlot: uint64(len(chainConfig.expectedBlockSlots)), + }) + + return queue + } + + t.Run("check start/count boundaries", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + // Move sliding window normally. + if err := assertState(state, 0, 0, 0, 0); err != nil { + t.Error(err) + } + end := queue.highestExpectedSlot / state.blockBatchSize + for i := uint64(0); i < end; i++ { + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, (i+1)*blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + } + + // Make sure that the last request is up to highest expected slot. + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, queue.highestExpectedSlot, 0, 0, 0); err != nil { + t.Error(err) + } + + // Try schedule beyond the highest slot. + if err := queue.scheduleFetchRequests(ctx); err == nil { + t.Errorf("expected error: %v", errStartSlotIsTooHigh) + } + if err := assertState(state, queue.highestExpectedSlot, 0, 0, 0); err != nil { + t.Error(err) + } + }) + + t.Run("too many failures", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + // Schedule enough items. + if err := assertState(state, 0, 0, 0, 0); err != nil { + t.Error(err) + } + end := queue.highestExpectedSlot / state.blockBatchSize + for i := uint64(0); i < end; i++ { + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, (i+1)*blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + } + + // "Process" some items and reschedule. + if err := assertState(state, end*blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + state.incrementCounter(failedBlock, 25) + if err := assertState(state, end*blockBatchSize-25, 0, 0, 25); err != nil { + t.Error(err) + } + state.incrementCounter(failedBlock, 500) // too high value shouldn't cause issues + if err := assertState(state, 0, 0, 0, end*blockBatchSize); err != nil { + t.Error(err) + } + + // Due to failures, resetting is expected. 
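// (Expected arithmetic: the oversized incrementCounter call above is clamped to what is still
// pending, so all end*blockBatchSize requested slots are now counted as failed; the next
// scheduleFetchRequests call therefore doubles the batch size, resets the counters, and schedules
// 2*blockBatchSize new pending slots.)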
+ if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, 2*blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + if state.blockBatchSize != 2*blockBatchSize { + t.Errorf("unexpeced block batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize) + } + }) + + t.Run("too many skipped", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + // Schedule enough items. + if err := assertState(state, 0, 0, 0, 0); err != nil { + t.Error(err) + } + end := queue.highestExpectedSlot / state.blockBatchSize + for i := uint64(0); i < end; i++ { + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, (i+1)*blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + } + + // "Process" some items and reschedule. + if err := assertState(state, end*blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + state.incrementCounter(skippedBlock, 25) + if err := assertState(state, end*blockBatchSize-25, 0, 25, 0); err != nil { + t.Error(err) + } + state.incrementCounter(skippedBlock, 500) // too high value shouldn't cause issues + if err := assertState(state, 0, 0, end*blockBatchSize, 0); err != nil { + t.Error(err) + } + + // No pending items, resetting is expected (both counters and block batch size). + state.blockBatchSize = 2 * blockBatchSize + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + if state.blockBatchSize != blockBatchSize { + t.Errorf("unexpeced block batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize) + } + }) + + t.Run("reset block batch size", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + state.requestedBlocks[failedBlock] = blockBatchSize + + // Increase block batch size. + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, 2*blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + if state.blockBatchSize != 2*blockBatchSize { + t.Errorf("unexpeced block batch size, want: %v, got: %v", 2*blockBatchSize, state.blockBatchSize) + } + + // Reset block batch size. 
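// (Expected arithmetic: with at least blockBatchSize valid blocks and an inflated batch size, the
// scheduler folds the valid counter into skipped and restores the default batch size without a
// full reset, so after the next scheduleFetchRequests call the state is pending=blockBatchSize+1,
// valid=0, skipped=blockBatchSize+1, failed=1.)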
+ state.requestedBlocks[validBlock] = blockBatchSize + state.requestedBlocks[pendingBlock] = 1 + state.requestedBlocks[failedBlock] = 1 + state.requestedBlocks[skippedBlock] = 1 + if err := assertState(state, 1, blockBatchSize, 1, 1); err != nil { + t.Error(err) + } + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, blockBatchSize+1, 0, blockBatchSize+1, 1); err != nil { + t.Error(err) + } + if state.blockBatchSize != blockBatchSize { + t.Errorf("unexpeced block batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize) + } + }) + + t.Run("overcrowded scheduler", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + state := queue.state.scheduler + + state.requestedBlocks[pendingBlock] = queueMaxCachedBlocks + if err := queue.scheduleFetchRequests(ctx); err != nil { + t.Error(err) + } + if err := assertState(state, blockBatchSize, 0, 0, 0); err != nil { + t.Error(err) + } + if state.blockBatchSize != blockBatchSize { + t.Errorf("unexpeced block batch size, want: %v, got: %v", blockBatchSize, state.blockBatchSize) + } + }) +} + +func TestBlocksQueueParseFetchResponse(t *testing.T) { + chainConfig := struct { + expectedBlockSlots []uint64 + peers []*peerData + }{ + expectedBlockSlots: makeSequence(1, 2*blockBatchSize*queueMaxPendingRequests+31), + peers: []*peerData{ + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + }, + } + + mc, _, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers) + defer dbtest.TeardownDB(t, beaconDB) + + setupQueue := func(ctx context.Context) *blocksQueue { + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: &blocksProviderMock{}, + headFetcher: mc, + highestExpectedSlot: uint64(len(chainConfig.expectedBlockSlots)), + }) + + return queue + } + + var blocks []*eth.SignedBeaconBlock + for i := 1; i <= blockBatchSize; i++ { + blocks = append(blocks, ð.SignedBeaconBlock{ + Block: ð.BeaconBlock{ + Slot: uint64(i), + }, + }) + } + + t.Run("response error", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + + response := &fetchRequestResponse{ + start: 1, + count: blockBatchSize, + blocks: blocks, + err: errStartSlotIsTooHigh, + } + if _, err := queue.parseFetchResponse(ctx, response); err != errStartSlotIsTooHigh { + t.Errorf("expected error not thrown, want: %v, got: %v", errStartSlotIsTooHigh, err) + } + }) + + t.Run("context error", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + queue := setupQueue(ctx) + + cancel() + response := &fetchRequestResponse{ + start: 1, + count: blockBatchSize, + blocks: blocks, + err: ctx.Err(), + } + if _, err := queue.parseFetchResponse(ctx, response); err != ctx.Err() { + t.Errorf("expected error not thrown, want: %v, got: %v", ctx.Err(), err) + } + }) + + t.Run("no skipped blocks", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + + for i := uint64(1); i <= blockBatchSize; i++ { + if _, ok := queue.state.cachedBlocks[i]; ok { + t.Errorf("unexpeced block found: %v", i) + } + } + + response := &fetchRequestResponse{ + start: 1, + count: blockBatchSize, + blocks: blocks, + } + if _, err := queue.parseFetchResponse(ctx, response); err != nil { + t.Error(err) 
+ } + + // All blocks should be saved at this point. + for i := uint64(1); i <= blockBatchSize; i++ { + block, ok := queue.state.cachedBlocks[i] + if !ok { + t.Errorf("expeced block not found: %v", i) + } + if block.SignedBeaconBlock == nil { + t.Errorf("unexpectedly marked as skipped: %v", i) + } + } + }) + + t.Run("with skipped blocks", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + queue := setupQueue(ctx) + + for i := uint64(1); i <= blockBatchSize; i++ { + if _, ok := queue.state.cachedBlocks[i]; ok { + t.Errorf("unexpeced block found: %v", i) + } + } + + response := &fetchRequestResponse{ + start: 1, + count: blockBatchSize, + blocks: blocks, + } + skipStart, skipEnd := uint64(5), uint64(15) + response.blocks = append(response.blocks[:skipStart], response.blocks[skipEnd:]...) + if _, err := queue.parseFetchResponse(ctx, response); err != nil { + t.Error(err) + } + + for i := skipStart + 1; i <= skipEnd; i++ { + block, ok := queue.state.cachedBlocks[i] + if !ok { + t.Errorf("expeced block not found: %v", i) + } + if block.SignedBeaconBlock != nil { + t.Errorf("unexpectedly marked as not skipped: %v", i) + } + } + for i := uint64(1); i <= skipStart; i++ { + block, ok := queue.state.cachedBlocks[i] + if !ok { + t.Errorf("expeced block not found: %v", i) + } + if block.SignedBeaconBlock == nil { + t.Errorf("unexpectedly marked as skipped: %v", i) + } + } + for i := skipEnd + 1; i <= blockBatchSize; i++ { + block, ok := queue.state.cachedBlocks[i] + if !ok { + t.Errorf("expeced block not found: %v", i) + } + if block.SignedBeaconBlock == nil { + t.Errorf("unexpectedly marked as skipped: %v", i) + } + } + }) +} + +func TestBlocksQueueLoop(t *testing.T) { + tests := []struct { + name string + highestExpectedSlot uint64 + expectedBlockSlots []uint64 + peers []*peerData + }{ + { + name: "Single peer with all blocks", + highestExpectedSlot: 251, + expectedBlockSlots: makeSequence(1, 251), + peers: []*peerData{ + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + }, + }, + { + name: "Multiple peers with all blocks", + highestExpectedSlot: 251, + expectedBlockSlots: makeSequence(1, 251), + peers: []*peerData{ + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + }, + }, + { + name: "Multiple peers with skipped slots", + highestExpectedSlot: 576, + expectedBlockSlots: append(makeSequence(1, 64), makeSequence(500, 576)...), // up to 18th epoch + peers: []*peerData{ + { + blocks: append(makeSequence(1, 64), makeSequence(500, 640)...), + finalizedEpoch: 18, + headSlot: 640, + }, + { + blocks: append(makeSequence(1, 64), makeSequence(500, 640)...), + finalizedEpoch: 18, + headSlot: 640, + }, + { + blocks: append(makeSequence(1, 64), makeSequence(500, 640)...), + finalizedEpoch: 18, + headSlot: 640, + }, + }, + }, + { + name: "Multiple peers with failures", + highestExpectedSlot: 128, + expectedBlockSlots: makeSequence(1, 128), + peers: []*peerData{ + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + failureSlots: makeSequence(32*3+1, 32*3+32), + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + failureSlots: makeSequence(1, 32*3), + 
}, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mc, p2p, beaconDB := initializeTestServices(t, tt.expectedBlockSlots, tt.peers) + defer dbtest.TeardownDB(t, beaconDB) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ + headFetcher: mc, + p2p: p2p, + }) + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + highestExpectedSlot: tt.highestExpectedSlot, + }) + if err := queue.start(); err != nil { + t.Error(err) + } + processBlock := func(block *eth.SignedBeaconBlock) error { + if !beaconDB.HasBlock(ctx, bytesutil.ToBytes32(block.Block.ParentRoot)) { + return fmt.Errorf("beacon node doesn't have a block in db with root %#x", block.Block.ParentRoot) + } + if featureconfig.Get().InitSyncNoVerify { + if err := mc.ReceiveBlockNoVerify(ctx, block); err != nil { + return err + } + } else { + if err := mc.ReceiveBlockNoPubsubForkchoice(ctx, block); err != nil { + return err + } + } + + return nil + } + + var blocks []*eth.SignedBeaconBlock + for block := range queue.fetchedBlocks { + if err := processBlock(block); err != nil { + queue.state.scheduler.incrementCounter(failedBlock, 1) + continue + } + blocks = append(blocks, block) + queue.state.scheduler.incrementCounter(validBlock, 1) + } + + if err := queue.stop(); err != nil { + t.Error(err) + } + + if queue.headFetcher.HeadSlot() < uint64(len(tt.expectedBlockSlots)) { + t.Errorf("Not enough slots synced, want: %v, got: %v", + len(tt.expectedBlockSlots), queue.headFetcher.HeadSlot()) + } + if len(blocks) != len(tt.expectedBlockSlots) { + t.Errorf("Processes wrong number of blocks. Wanted %d got %d", len(tt.expectedBlockSlots), len(blocks)) + } + var receivedBlockSlots []uint64 + for _, blk := range blocks { + receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot) + } + if missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots); len(missing) > 0 { + t.Errorf("Missing blocks at slots %v", missing) + } + }) + } +} + +func setBlocksFromCache(ctx context.Context, t *testing.T, mc *mock.ChainService, highestSlot uint64) { + cache.RLock() + parentRoot := cache.rootCache[0] + cache.RUnlock() + for slot := uint64(0); slot <= highestSlot; slot++ { + blk := ð.SignedBeaconBlock{ + Block: ð.BeaconBlock{ + Slot: slot, + ParentRoot: parentRoot[:], + }, + } + mc.BlockNotifier().BlockFeed().Send(&feed.Event{ + Type: blockfeed.ReceivedBlock, + Data: blockfeed.ReceivedBlockData{ + SignedBlock: blk, + }, + }) + + if err := mc.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil { + t.Error(err) + } + + currRoot, _ := ssz.HashTreeRoot(blk.Block) + parentRoot = currRoot + } +} +func assertState(state *schedulerState, pending, valid, skipped, failed uint64) error { + s := state.requestedBlocks + res := s[pendingBlock] != pending || s[validBlock] != valid || + s[skippedBlock] != skipped || s[failedBlock] != failed + if res { + b := struct{ pending, valid, skipped, failed uint64 }{pending, valid, skipped, failed,} + return fmt.Errorf("invalid state, want: %+v, got: %+v", b, state.requestedBlocks) + } + return nil +}
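The intended consumption pattern (mirrored by TestBlocksQueueLoop above) is small: start the queue, drain fetchedBlocks in slot order, classify each block via incrementCounter, then stop. A minimal sketch, assuming a caller-supplied processBlock callback (consumeQueue and processBlock are illustrative names, not part of this change):

func consumeQueue(queue *blocksQueue, processBlock func(*eth.SignedBeaconBlock) error) error {
	if err := queue.start(); err != nil {
		return err
	}
	// fetchedBlocks delivers blocks strictly ordered by slot; skipped slots simply never appear here.
	for block := range queue.fetchedBlocks {
		if err := processBlock(block); err != nil {
			queue.state.scheduler.incrementCounter(failedBlock, 1)
			continue
		}
		queue.state.scheduler.incrementCounter(validBlock, 1)
	}
	// The channel is closed by the queue's loop once the highest expected slot is reached or the
	// context is cancelled; stop then releases the remaining resources.
	return queue.stop()
}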