feat: improve download observability (#10039)

This commit is contained in:
Federico Gimenez
2024-08-03 10:07:04 +02:00
committed by GitHub
parent 2bfa2defc4
commit e98acdc8d9
2 changed files with 46 additions and 3 deletions

View File

@@ -195,7 +195,7 @@ pub enum HandlerEvent<T> {
BackfillAction(BackfillAction),
/// Other event emitted by the handler
Event(T),
// Fatal error
/// Fatal error
FatalError,
}

View File

@@ -11,7 +11,7 @@ use reth_network_p2p::{
use reth_primitives::{SealedBlock, SealedBlockWithSenders, B256};
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap, HashSet},
collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
sync::Arc,
task::{Context, Poll},
};
@@ -40,6 +40,8 @@ pub enum DownloadAction {
pub enum DownloadOutcome {
/// Downloaded blocks.
Blocks(Vec<SealedBlockWithSenders>),
/// New download started.
NewDownloadStarted { remaining_blocks: u64, target: B256 },
}
/// Basic [`BlockDownloader`].
@@ -58,6 +60,8 @@ where
set_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlockWithSenders>>,
/// Engine download metrics.
metrics: BlockDownloaderMetrics,
/// Pending events to be emitted.
pending_events: VecDeque<DownloadOutcome>,
}
impl<Client> BasicBlockDownloader<Client>
@@ -72,6 +76,7 @@ where
inflight_block_range_requests: Vec::new(),
set_buffered_blocks: BinaryHeap::new(),
metrics: BlockDownloaderMetrics::default(),
pending_events: Default::default(),
}
}
@@ -111,6 +116,10 @@ where
);
let request = self.full_block_client.get_full_block_range(hash, count);
self.push_pending_event(DownloadOutcome::NewDownloadStarted {
remaining_blocks: request.count(),
target: request.start_hash(),
});
self.inflight_block_range_requests.push(request);
}
}
@@ -123,6 +132,11 @@ where
if self.is_inflight_request(hash) {
return false
}
self.push_pending_event(DownloadOutcome::NewDownloadStarted {
remaining_blocks: 1,
target: hash,
});
trace!(
target: "consensus::engine::sync",
?hash,
@@ -147,6 +161,16 @@ where
self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
// TODO: full block range metrics
}
/// Adds a pending event to the FIFO queue.
fn push_pending_event(&mut self, pending_event: DownloadOutcome) {
self.pending_events.push_back(pending_event);
}
/// Removes a pending event from the FIFO queue.
fn pop_pending_event(&mut self) -> Option<DownloadOutcome> {
self.pending_events.pop_front()
}
}
impl<Client> BlockDownloader for BasicBlockDownloader<Client>
@@ -163,6 +187,10 @@ where
/// Advances the download process.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome> {
if let Some(pending_event) = self.pop_pending_event() {
return Poll::Ready(pending_event);
}
// advance all full block requests
for idx in (0..self.inflight_full_block_requests.len()).rev() {
let mut request = self.inflight_full_block_requests.swap_remove(idx);
@@ -332,6 +360,13 @@ mod tests {
let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
assert_eq!(remaining_blocks, TOTAL_BLOCKS as u64);
});
let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
// ensure all blocks were obtained
assert_eq!(blocks.len(), TOTAL_BLOCKS);
@@ -359,9 +394,17 @@ mod tests {
assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS);
// poll downloader
for i in 0..TOTAL_BLOCKS {
let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, target } => {
assert_eq!(remaining_blocks, 1);
});
}
let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
// ensure all blocks were obtained
assert_eq!(blocks.len(), TOTAL_BLOCKS);