From e98acdc8d9483ea72b6a2d6a00a58a4d1be057e4 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Sat, 3 Aug 2024 10:07:04 +0200 Subject: [PATCH] feat: improve download observability (#10039) --- crates/engine/tree/src/chain.rs | 2 +- crates/engine/tree/src/download.rs | 47 ++++++++++++++++++++++++++++-- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/crates/engine/tree/src/chain.rs b/crates/engine/tree/src/chain.rs index e77139d3e3..fb34db6b14 100644 --- a/crates/engine/tree/src/chain.rs +++ b/crates/engine/tree/src/chain.rs @@ -195,7 +195,7 @@ pub enum HandlerEvent { BackfillAction(BackfillAction), /// Other event emitted by the handler Event(T), - // Fatal error + /// Fatal error FatalError, } diff --git a/crates/engine/tree/src/download.rs b/crates/engine/tree/src/download.rs index de588c2c93..1218eb844d 100644 --- a/crates/engine/tree/src/download.rs +++ b/crates/engine/tree/src/download.rs @@ -11,7 +11,7 @@ use reth_network_p2p::{ use reth_primitives::{SealedBlock, SealedBlockWithSenders, B256}; use std::{ cmp::{Ordering, Reverse}, - collections::{binary_heap::PeekMut, BinaryHeap, HashSet}, + collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque}, sync::Arc, task::{Context, Poll}, }; @@ -40,6 +40,8 @@ pub enum DownloadAction { pub enum DownloadOutcome { /// Downloaded blocks. Blocks(Vec), + /// New download started. + NewDownloadStarted { remaining_blocks: u64, target: B256 }, } /// Basic [`BlockDownloader`]. @@ -58,6 +60,8 @@ where set_buffered_blocks: BinaryHeap>, /// Engine download metrics. metrics: BlockDownloaderMetrics, + /// Pending events to be emitted. + pending_events: VecDeque, } impl BasicBlockDownloader @@ -72,6 +76,7 @@ where inflight_block_range_requests: Vec::new(), set_buffered_blocks: BinaryHeap::new(), metrics: BlockDownloaderMetrics::default(), + pending_events: Default::default(), } } @@ -111,6 +116,10 @@ where ); let request = self.full_block_client.get_full_block_range(hash, count); + self.push_pending_event(DownloadOutcome::NewDownloadStarted { + remaining_blocks: request.count(), + target: request.start_hash(), + }); self.inflight_block_range_requests.push(request); } } @@ -123,6 +132,11 @@ where if self.is_inflight_request(hash) { return false } + self.push_pending_event(DownloadOutcome::NewDownloadStarted { + remaining_blocks: 1, + target: hash, + }); + trace!( target: "consensus::engine::sync", ?hash, @@ -147,6 +161,16 @@ where self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64); // TODO: full block range metrics } + + /// Adds a pending event to the FIFO queue. + fn push_pending_event(&mut self, pending_event: DownloadOutcome) { + self.pending_events.push_back(pending_event); + } + + /// Removes a pending event from the FIFO queue. + fn pop_pending_event(&mut self) -> Option { + self.pending_events.pop_front() + } } impl BlockDownloader for BasicBlockDownloader @@ -163,6 +187,10 @@ where /// Advances the download process. fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + if let Some(pending_event) = self.pop_pending_event() { + return Poll::Ready(pending_event); + } + // advance all full block requests for idx in (0..self.inflight_full_block_requests.len()).rev() { let mut request = self.inflight_full_block_requests.swap_remove(idx); @@ -332,6 +360,13 @@ mod tests { let sync_future = poll_fn(|cx| block_downloader.poll(cx)); let next_ready = sync_future.await; + assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => { + assert_eq!(remaining_blocks, TOTAL_BLOCKS as u64); + }); + + let sync_future = poll_fn(|cx| block_downloader.poll(cx)); + let next_ready = sync_future.await; + assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => { // ensure all blocks were obtained assert_eq!(blocks.len(), TOTAL_BLOCKS); @@ -359,9 +394,17 @@ mod tests { assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS); // poll downloader + for i in 0..TOTAL_BLOCKS { + let sync_future = poll_fn(|cx| block_downloader.poll(cx)); + let next_ready = sync_future.await; + + assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, target } => { + assert_eq!(remaining_blocks, 1); + }); + } + let sync_future = poll_fn(|cx| block_downloader.poll(cx)); let next_ready = sync_future.await; - assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => { // ensure all blocks were obtained assert_eq!(blocks.len(), TOTAL_BLOCKS);