Downloader refactoring: stages trait. (#57)

Add a common protocol implemented by every header downloading stage.
This commit is contained in:
battlmonstr
2021-11-08 19:33:22 +01:00
committed by GitHub
parent 824831c8c7
commit f8bec8d1be
11 changed files with 94 additions and 59 deletions

View File

@@ -45,7 +45,7 @@ modular-bitfield = "0.11"
num-bigint = "0.4"
num-traits = "0.2"
once_cell = "1"
parking_lot = "0.11"
parking_lot = { version = "0.11", features = ["send_guard"] }
pin-utils = "0.1"
rand = "0.8"
rayon = "1"

View File

@@ -25,13 +25,22 @@ use tracing::*;
type StageStream = Pin<Box<dyn Stream<Item = anyhow::Result<()>>>>;
pub struct Downloader<DB: kv::traits::MutableKV> {
fn make_stage_stream(mut stage: Box<dyn crate::downloader::headers::stage::Stage>) -> StageStream {
let stream = async_stream::stream! {
loop {
yield stage.execute().await;
}
};
Box::pin(stream)
}
pub struct Downloader<DB: kv::traits::MutableKV + Sync> {
opts: Opts,
chain_config: ChainConfig,
db: Arc<DB>,
}
impl<DB: kv::traits::MutableKV> Downloader<DB> {
impl<DB: kv::traits::MutableKV + Sync> Downloader<DB> {
pub fn new(opts: Opts, chains_config: ChainsConfig, db: Arc<DB>) -> Self {
let chain_config = chains_config.0[&opts.chain_name].clone();
@@ -91,53 +100,28 @@ impl<DB: kv::traits::MutableKV> Downloader<DB> {
// although most of the time only one of the stages is actively running,
// while the others are waiting for the status updates or timeouts.
let mut fetch_request_stage = FetchRequestStage::new(header_slices.clone(), sentry.clone());
let mut fetch_receive_stage = FetchReceiveStage::new(header_slices.clone(), sentry.clone());
let mut retry_stage = RetryStage::new(header_slices.clone());
let mut verify_stage = VerifyStage::new(header_slices.clone(), preverified_hashes_config);
let mut save_stage = SaveStage::new(header_slices.clone(), self.db.clone());
let mut refill_stage = RefillStage::new(header_slices.clone());
let fetch_request_stage = FetchRequestStage::new(header_slices.clone(), sentry.clone());
let fetch_receive_stage = FetchReceiveStage::new(header_slices.clone(), sentry.clone());
let retry_stage = RetryStage::new(header_slices.clone());
let verify_stage = VerifyStage::new(header_slices.clone(), preverified_hashes_config);
let save_stage = SaveStage::new(header_slices.clone(), self.db.clone());
let refill_stage = RefillStage::new(header_slices.clone());
let can_proceed = fetch_receive_stage.can_proceed_checker();
let fetch_request_stage_stream: StageStream = Box::pin(async_stream::stream! {
loop {
yield fetch_request_stage.execute().await;
}
});
let fetch_receive_stage_stream: StageStream = Box::pin(async_stream::stream! {
loop {
yield fetch_receive_stage.execute().await;
}
});
let retry_stage_stage_stream: StageStream = Box::pin(async_stream::stream! {
loop {
yield retry_stage.execute().await;
}
});
let verify_stage_stage_stream: StageStream = Box::pin(async_stream::stream! {
loop {
yield verify_stage.execute().await;
}
});
let save_stage_stage_stream: StageStream = Box::pin(async_stream::stream! {
loop {
yield save_stage.execute().await;
}
});
let refill_stage_stage_stream: StageStream = Box::pin(async_stream::stream! {
loop {
yield refill_stage.execute().await;
}
});
let mut stream = StreamMap::<&str, StageStream>::new();
stream.insert("fetch_request_stage_stream", fetch_request_stage_stream);
stream.insert("fetch_receive_stage_stream", fetch_receive_stage_stream);
stream.insert("retry_stage_stage_stream", retry_stage_stage_stream);
stream.insert("verify_stage_stage_stream", verify_stage_stage_stream);
stream.insert("save_stage_stage_stream", save_stage_stage_stream);
stream.insert("refill_stage_stage_stream", refill_stage_stage_stream);
stream.insert(
"fetch_request_stage",
make_stage_stream(Box::new(fetch_request_stage)),
);
stream.insert(
"fetch_receive_stage",
make_stage_stream(Box::new(fetch_receive_stage)),
);
stream.insert("retry_stage", make_stage_stream(Box::new(retry_stage)));
stream.insert("verify_stage", make_stage_stream(Box::new(verify_stage)));
stream.insert("save_stage", make_stage_stream(Box::new(save_stage)));
stream.insert("refill_stage", make_stage_stream(Box::new(refill_stage)));
while let Some((key, result)) = stream.next().await {
if result.is_err() {

View File

@@ -16,6 +16,7 @@ use std::{
pin::Pin,
sync::{atomic::*, Arc},
};
use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use tracing::*;
@@ -26,7 +27,7 @@ pub struct FetchReceiveStage {
header_slices: Arc<HeaderSlices>,
sentry: Arc<RwLock<SentryClientReactor>>,
is_over: Arc<AtomicBool>,
message_stream: Option<BlockHeadersMessageStream>,
message_stream: Mutex<Option<BlockHeadersMessageStream>>,
}
pub struct CanProceed {
@@ -50,17 +51,18 @@ impl FetchReceiveStage {
header_slices,
sentry,
is_over: Arc::new(false.into()),
message_stream: None,
message_stream: Mutex::new(None),
}
}
pub async fn execute(&mut self) -> anyhow::Result<()> {
pub async fn execute(&self) -> anyhow::Result<()> {
debug!("FetchReceiveStage: start");
if self.message_stream.is_none() {
self.message_stream = Some(self.receive_headers()?);
let mut message_stream = self.message_stream.try_lock()?;
if message_stream.is_none() {
*message_stream = Some(self.receive_headers()?);
}
let message_result = self.message_stream.as_mut().unwrap().next().await;
let message_result = message_stream.as_mut().unwrap().next().await;
match message_result {
Some(message) => self.on_headers(message.headers),
None => self.is_over.store(true, Ordering::SeqCst),
@@ -135,3 +137,10 @@ impl FetchReceiveStage {
Ok(Box::pin(out_stream))
}
}
#[async_trait::async_trait]
impl super::stage::Stage for FetchReceiveStage {
async fn execute(&mut self) -> anyhow::Result<()> {
FetchReceiveStage::execute(self).await
}
}

View File

@@ -12,7 +12,7 @@ use crate::{
},
models::BlockNumber,
};
use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock};
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use std::{
ops::DerefMut,
sync::{atomic::*, Arc},
@@ -112,3 +112,10 @@ impl FetchRequestStage {
.try_send_message(Message::GetBlockHeaders(message), PeerFilter::Random(1))
}
}
#[async_trait::async_trait]
impl super::stage::Stage for FetchRequestStage {
async fn execute(&mut self) -> anyhow::Result<()> {
FetchRequestStage::execute(self).await
}
}

View File

@@ -1,5 +1,6 @@
mod header_slice_status_watch;
pub mod header_slices;
pub mod stage;
pub mod fetch_receive_stage;
pub mod fetch_request_stage;

View File

@@ -42,3 +42,10 @@ impl RefillStage {
self.header_slices.refill();
}
}
#[async_trait::async_trait]
impl super::stage::Stage for RefillStage {
async fn execute(&mut self) -> anyhow::Result<()> {
RefillStage::execute(self).await
}
}

View File

@@ -2,7 +2,7 @@ use crate::downloader::headers::{
header_slice_status_watch::HeaderSliceStatusWatch,
header_slices::{HeaderSlice, HeaderSliceStatus, HeaderSlices},
};
use parking_lot::lock_api::RwLockUpgradableReadGuard;
use parking_lot::RwLockUpgradableReadGuard;
use std::{ops::DerefMut, sync::Arc, time, time::Duration};
use tracing::*;
@@ -79,3 +79,10 @@ impl RetryStage {
}
}
}
#[async_trait::async_trait]
impl super::stage::Stage for RetryStage {
async fn execute(&mut self) -> anyhow::Result<()> {
RetryStage::execute(self).await
}
}

View File

@@ -6,18 +6,18 @@ use crate::{
kv,
kv::traits::MutableTransaction,
};
use parking_lot::lock_api::RwLockUpgradableReadGuard;
use parking_lot::RwLockUpgradableReadGuard;
use std::{ops::DerefMut, sync::Arc};
use tracing::*;
/// Saves slices into the database, and sets Saved status.
pub struct SaveStage<DB: kv::traits::MutableKV> {
pub struct SaveStage<DB: kv::traits::MutableKV + Sync> {
header_slices: Arc<HeaderSlices>,
pending_watch: HeaderSliceStatusWatch,
db: Arc<DB>,
}
impl<DB: kv::traits::MutableKV> SaveStage<DB> {
impl<DB: kv::traits::MutableKV + Sync> SaveStage<DB> {
pub fn new(header_slices: Arc<HeaderSlices>, db: Arc<DB>) -> Self {
Self {
header_slices: header_slices.clone(),
@@ -72,3 +72,10 @@ impl<DB: kv::traits::MutableKV> SaveStage<DB> {
tx.commit().await
}
}
#[async_trait::async_trait]
impl<DB: kv::traits::MutableKV + Sync> super::stage::Stage for SaveStage<DB> {
async fn execute(&mut self) -> anyhow::Result<()> {
SaveStage::<DB>::execute(self).await
}
}

View File

@@ -0,0 +1,6 @@
use async_trait::async_trait;
#[async_trait]
pub trait Stage {
async fn execute(&mut self) -> anyhow::Result<()>;
}

View File

@@ -4,7 +4,7 @@ use crate::downloader::headers::{
header_slices::{HeaderSlice, HeaderSliceStatus, HeaderSlices},
preverified_hashes_config::PreverifiedHashesConfig,
};
use parking_lot::lock_api::RwLockUpgradableReadGuard;
use parking_lot::RwLockUpgradableReadGuard;
use std::{ops::DerefMut, sync::Arc};
use tracing::*;
@@ -124,3 +124,10 @@ impl VerifyStage {
self.preverified_hashes.hashes.get(index as usize)
}
}
#[async_trait::async_trait]
impl super::stage::Stage for VerifyStage {
async fn execute(&mut self) -> anyhow::Result<()> {
VerifyStage::execute(self).await
}
}

View File

@@ -152,7 +152,7 @@ impl SentryClientReactor {
pub fn reserve_capacity_in_send_queue(
&self,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>>>> {
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
let sender = self.send_message_sender.clone();
Box::pin(sender.reserve_owned().map(|result| match result {
Ok(_) => Ok(()),