Downloader: refactor phase loops. (#117)

Extrac the logic to DownloaderStageLoop.
This commit is contained in:
battlmonstr
2022-01-10 19:21:00 +01:00
committed by GitHub
parent e06913a51c
commit 10de70545e
5 changed files with 98 additions and 143 deletions

View File

@@ -1,15 +1,13 @@
use super::{
fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage, header_slices,
header_slices::HeaderSlices, penalize_stage::PenalizeStage, retry_stage::RetryStage,
save_stage::SaveStage, verify_stage_forky_link::VerifyStageForkyLink,
verify_stage_linear::VerifyStageLinear, HeaderSlicesView,
downloader_stage_loop::DownloaderStageLoop, fetch_receive_stage::FetchReceiveStage,
fetch_request_stage::FetchRequestStage, header_slices, header_slices::HeaderSlices,
penalize_stage::PenalizeStage, retry_stage::RetryStage, save_stage::SaveStage,
verify_stage_forky_link::VerifyStageForkyLink, verify_stage_linear::VerifyStageLinear,
HeaderSlicesView,
};
use crate::{
downloader::{
headers::{
header_slices::align_block_num_to_slice_start,
stage_stream::{make_stage_stream, StageStream},
},
headers::header_slices::align_block_num_to_slice_start,
ui_system::{UISystemShared, UISystemViewScope},
},
kv,
@@ -17,8 +15,6 @@ use crate::{
sentry::{chain_config::ChainConfig, messages::BlockHashAndNumber, sentry_client_reactor::*},
};
use std::sync::Arc;
use tokio_stream::{StreamExt, StreamMap};
use tracing::*;
#[derive(Debug)]
pub struct DownloaderForky {
@@ -81,13 +77,6 @@ impl DownloaderForky {
let _header_slices_view_scope =
UISystemViewScope::new(&ui_system, Box::new(header_slices_view));
// Downloading happens with several stages where
// each of the stages processes blocks in one status,
// and updates them to proceed to the next status.
// All stages runs in parallel,
// although most of the time only one of the stages is actively running,
// while the others are waiting for the status updates or timeouts.
let fetch_request_stage = FetchRequestStage::new(
header_slices.clone(),
sentry.clone(),
@@ -111,36 +100,16 @@ impl DownloaderForky {
let can_proceed = fetch_receive_stage.can_proceed_check();
let mut stream = StreamMap::<&str, StageStream>::new();
stream.insert(
"fetch_request_stage",
make_stage_stream(fetch_request_stage),
);
stream.insert(
"fetch_receive_stage",
make_stage_stream(fetch_receive_stage),
);
stream.insert("retry_stage", make_stage_stream(retry_stage));
stream.insert("verify_stage", make_stage_stream(verify_stage));
stream.insert("verify_link_stage", make_stage_stream(verify_link_stage));
stream.insert("penalize_stage", make_stage_stream(penalize_stage));
stream.insert("save_stage", make_stage_stream(save_stage));
let mut stages = DownloaderStageLoop::new(&header_slices);
stages.insert(fetch_request_stage);
stages.insert(fetch_receive_stage);
stages.insert(retry_stage);
stages.insert(verify_stage);
stages.insert(verify_link_stage);
stages.insert(penalize_stage);
stages.insert(save_stage);
while let Some((key, result)) = stream.next().await {
if result.is_err() {
error!("Downloader headers {} failure: {:?}", key, result);
break;
}
if !can_proceed() {
break;
}
if header_slices.is_empty_at_final_position() {
break;
}
header_slices.notify_status_watchers();
}
stages.run(can_proceed).await;
let report = DownloaderForkyReport {
loaded_count: (header_slices.min_block_num().0 - start_block_num.0) as usize,

View File

@@ -1,16 +1,14 @@
use super::{
fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage, header_slices,
header_slices::HeaderSlices, penalize_stage::PenalizeStage, refill_stage::RefillStage,
retry_stage::RetryStage, save_stage::SaveStage,
top_block_estimate_stage::TopBlockEstimateStage, verify_stage_linear::VerifyStageLinear,
verify_stage_linear_link::VerifyStageLinearLink, HeaderSlicesView,
downloader_stage_loop::DownloaderStageLoop, fetch_receive_stage::FetchReceiveStage,
fetch_request_stage::FetchRequestStage, header_slices, header_slices::HeaderSlices,
penalize_stage::PenalizeStage, refill_stage::RefillStage, retry_stage::RetryStage,
save_stage::SaveStage, top_block_estimate_stage::TopBlockEstimateStage,
verify_stage_linear::VerifyStageLinear, verify_stage_linear_link::VerifyStageLinearLink,
HeaderSlicesView,
};
use crate::{
downloader::{
headers::{
header_slices::{align_block_num_to_slice_start, HeaderSliceStatus},
stage_stream::{make_stage_stream, StageStream},
},
headers::header_slices::{align_block_num_to_slice_start, HeaderSliceStatus},
ui_system::{UISystemShared, UISystemViewScope},
},
kv,
@@ -18,7 +16,6 @@ use crate::{
sentry::{chain_config::ChainConfig, messages::BlockHashAndNumber, sentry_client_reactor::*},
};
use std::sync::Arc;
use tokio_stream::{StreamExt, StreamMap};
use tracing::*;
#[derive(Debug)]
@@ -115,13 +112,6 @@ impl DownloaderLinear {
let _header_slices_view_scope =
UISystemViewScope::new(&ui_system, Box::new(header_slices_view));
// Downloading happens with several stages where
// each of the stages processes blocks in one status,
// and updates them to proceed to the next status.
// All stages runs in parallel,
// although most of the time only one of the stages is actively running,
// while the others are waiting for the status updates or timeouts.
let fetch_request_stage = FetchRequestStage::new(
header_slices.clone(),
sentry.clone(),
@@ -147,37 +137,17 @@ impl DownloaderLinear {
let can_proceed = fetch_receive_stage.can_proceed_check();
let mut stream = StreamMap::<&str, StageStream>::new();
stream.insert(
"fetch_request_stage",
make_stage_stream(fetch_request_stage),
);
stream.insert(
"fetch_receive_stage",
make_stage_stream(fetch_receive_stage),
);
stream.insert("retry_stage", make_stage_stream(retry_stage));
stream.insert("verify_stage", make_stage_stream(verify_stage));
stream.insert("verify_link_stage", make_stage_stream(verify_link_stage));
stream.insert("penalize_stage", make_stage_stream(penalize_stage));
stream.insert("save_stage", make_stage_stream(save_stage));
stream.insert("refill_stage", make_stage_stream(refill_stage));
let mut stages = DownloaderStageLoop::new(&header_slices);
stages.insert(fetch_request_stage);
stages.insert(fetch_receive_stage);
stages.insert(retry_stage);
stages.insert(verify_stage);
stages.insert(verify_link_stage);
stages.insert(penalize_stage);
stages.insert(save_stage);
stages.insert(refill_stage);
while let Some((key, result)) = stream.next().await {
if result.is_err() {
error!("Downloader headers {} failure: {:?}", key, result);
break;
}
if !can_proceed() {
break;
}
if header_slices.is_empty_at_final_position() {
break;
}
header_slices.notify_status_watchers();
}
stages.run(can_proceed).await;
let report = DownloaderLinearReport {
loaded_count: (header_slices.min_block_num().0 - start_block_num.0) as usize,

View File

@@ -1,17 +1,14 @@
use super::{
fetch_receive_stage::FetchReceiveStage, fetch_request_stage::FetchRequestStage, header_slices,
header_slices::HeaderSlices, penalize_stage::PenalizeStage,
preverified_hashes_config::PreverifiedHashesConfig, refill_stage::RefillStage,
retry_stage::RetryStage, save_stage::SaveStage,
downloader_stage_loop::DownloaderStageLoop, fetch_receive_stage::FetchReceiveStage,
fetch_request_stage::FetchRequestStage, header_slices, header_slices::HeaderSlices,
penalize_stage::PenalizeStage, preverified_hashes_config::PreverifiedHashesConfig,
refill_stage::RefillStage, retry_stage::RetryStage, save_stage::SaveStage,
top_block_estimate_stage::TopBlockEstimateStage,
verify_stage_preverified::VerifyStagePreverified, HeaderSlicesView,
};
use crate::{
downloader::{
headers::{
header_slices::align_block_num_to_slice_start,
stage_stream::{make_stage_stream, StageStream},
},
headers::header_slices::align_block_num_to_slice_start,
ui_system::{UISystemShared, UISystemViewScope},
},
kv,
@@ -19,8 +16,6 @@ use crate::{
sentry::sentry_client_reactor::*,
};
use std::sync::Arc;
use tokio_stream::{StreamExt, StreamMap};
use tracing::*;
#[derive(Debug)]
pub struct DownloaderPreverified {
@@ -95,13 +90,6 @@ impl DownloaderPreverified {
let _header_slices_view_scope =
UISystemViewScope::new(&ui_system, Box::new(header_slices_view));
// Downloading happens with several stages where
// each of the stages processes blocks in one status,
// and updates them to proceed to the next status.
// All stages runs in parallel,
// although most of the time only one of the stages is actively running,
// while the others are waiting for the status updates or timeouts.
let fetch_request_stage = FetchRequestStage::new(
header_slices.clone(),
sentry.clone(),
@@ -122,40 +110,17 @@ impl DownloaderPreverified {
let estimated_top_block_num_provider =
top_block_estimate_stage.estimated_top_block_num_provider();
let mut stream = StreamMap::<&str, StageStream>::new();
stream.insert(
"fetch_request_stage",
make_stage_stream(fetch_request_stage),
);
stream.insert(
"fetch_receive_stage",
make_stage_stream(fetch_receive_stage),
);
stream.insert("retry_stage", make_stage_stream(retry_stage));
stream.insert("verify_stage", make_stage_stream(verify_stage));
stream.insert("penalize_stage", make_stage_stream(penalize_stage));
stream.insert("save_stage", make_stage_stream(save_stage));
stream.insert("refill_stage", make_stage_stream(refill_stage));
stream.insert(
"top_block_estimate_stage",
make_stage_stream(top_block_estimate_stage),
);
let mut stages = DownloaderStageLoop::new(&header_slices);
stages.insert(fetch_request_stage);
stages.insert(fetch_receive_stage);
stages.insert(retry_stage);
stages.insert(verify_stage);
stages.insert(penalize_stage);
stages.insert(save_stage);
stages.insert(refill_stage);
stages.insert(top_block_estimate_stage);
while let Some((key, result)) = stream.next().await {
if result.is_err() {
error!("Downloader headers {} failure: {:?}", key, result);
break;
}
if !can_proceed() {
break;
}
if header_slices.is_empty_at_final_position() {
break;
}
header_slices.notify_status_watchers();
}
stages.run(can_proceed).await;
let report = DownloaderPreverifiedReport {
loaded_count: (header_slices.min_block_num().0 - start_block_num.0) as usize,

View File

@@ -0,0 +1,50 @@
use super::{
header_slices::HeaderSlices,
stage_stream::{make_stage_stream, StageStream},
};
use std::sync::Arc;
use tokio_stream::{StreamExt, StreamMap};
use tracing::*;
// Downloading happens with several stages where
// each of the stages processes blocks in one status,
// and updates them to proceed to the next status.
// All stages run in parallel,
// although most of the time only one of the stages is actively running,
// while the others are waiting for the status updates, IO or timeouts.
pub struct DownloaderStageLoop<'s> {
header_slices: Arc<HeaderSlices>,
stream: StreamMap<String, StageStream<'s>>,
}
impl<'s> DownloaderStageLoop<'s> {
pub fn new(header_slices: &Arc<HeaderSlices>) -> Self {
Self {
header_slices: header_slices.clone(),
stream: StreamMap::<String, StageStream>::new(),
}
}
pub fn insert<Stage: super::stage::Stage + 's>(&mut self, stage: Stage) {
let name = String::from(std::any::type_name::<Stage>());
self.stream.insert(name, make_stage_stream(stage));
}
pub async fn run(mut self, can_proceed: impl Fn() -> bool) {
while let Some((key, result)) = self.stream.next().await {
if result.is_err() {
error!("Downloader headers {} failure: {:?}", key, result);
break;
}
if !can_proceed() {
break;
}
if self.header_slices.is_empty_at_final_position() {
break;
}
self.header_slices.notify_status_watchers();
}
}
}

View File

@@ -10,6 +10,7 @@ mod parallel;
pub mod stage;
mod stage_stream;
mod downloader_stage_loop;
mod fetch_receive_stage;
mod fetch_request_stage;
mod fork_mode_stage;