feat(sync): set max block if debug.tip provided (#1522)

Author: Roman Krasiuk
Date: 2023-02-23 13:05:37 +02:00
Committer: GitHub
Parent: 3c77dd9e2f
Commit: 22d20d568d
6 changed files with 126 additions and 70 deletions


@@ -20,3 +20,4 @@ pub mod runner;
pub mod stage;
pub mod test_eth_chain;
pub mod test_vectors;
pub mod utils;


@@ -6,6 +6,7 @@ use crate::{
dirs::{ConfigPath, DbPath, PlatformPath},
prometheus_exporter,
runner::CliContext,
utils::get_single_header,
};
use clap::{crate_version, Parser};
use eyre::Context;
@@ -31,10 +32,10 @@ use reth_interfaces::{
sync::SyncStateUpdater,
};
use reth_network::{
error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
error::NetworkError, FetchClient, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, Head, H256};
use reth_primitives::{BlockHashOrNumber, BlockNumber, ChainSpec, Head, H256};
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_rpc_engine_api::{EngineApi, EngineApiHandle};
use reth_staged_sync::{
@@ -205,9 +206,18 @@ impl Command {
task_executor: &TaskExecutor,
) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)>
{
// building network downloaders using the fetch client
let fetch_client = Arc::new(network.fetch_client().await?);
let fetch_client = network.fetch_client().await?;
let max_block = if let Some(block) = self.max_block {
Some(block)
} else if let Some(tip) = self.tip {
Some(self.lookup_or_fetch_tip(db.clone(), fetch_client.clone(), tip).await?)
} else {
None
};
// TODO: remove Arc requirement from downloader builders.
// building network downloaders using the fetch client
let fetch_client = Arc::new(fetch_client);
let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
.build(fetch_client.clone(), consensus.clone())
.into_task_with(task_executor);
@@ -217,7 +227,14 @@ impl Command {
.into_task_with(task_executor);
let mut pipeline = self
.build_pipeline(config, header_downloader, body_downloader, network.clone(), consensus)
.build_pipeline(
config,
header_downloader,
body_downloader,
network.clone(),
consensus,
max_block,
)
.await?;
let events = stream_select(
@@ -316,7 +333,7 @@ impl Command {
Ok(handle)
}
fn fetch_head(&self, db: Arc<Env<WriteMap>>) -> Result<Head, reth_interfaces::db::Error> {
fn lookup_head(&self, db: Arc<Env<WriteMap>>) -> Result<Head, reth_interfaces::db::Error> {
db.view(|tx| {
let head = FINISH.get_progress(tx)?.unwrap_or_default();
let header = tx
@@ -339,13 +356,42 @@ impl Command {
.map_err(Into::into)
}
/// Attempt to look up the block number for the tip hash in the database.
/// If it doesn't exist, download the header and return the block number.
///
/// NOTE: The download is attempted with infinite retries.
async fn lookup_or_fetch_tip(
&self,
db: Arc<Env<WriteMap>>,
fetch_client: FetchClient,
tip: H256,
) -> Result<u64, reth_interfaces::Error> {
if let Some(number) = db.view(|tx| tx.get::<tables::HeaderNumbers>(tip))?? {
debug!(target: "reth::cli", ?tip, number, "Successfully looked up tip in the database");
return Ok(number)
}
debug!(target: "reth::cli", ?tip, "Fetching tip header from the network.");
loop {
match get_single_header(fetch_client.clone(), BlockHashOrNumber::Hash(tip)).await {
Ok(tip_header) => {
debug!(target: "reth::cli", ?tip, number = tip_header.number, "Successfully fetched tip");
return Ok(tip_header.number)
}
Err(error) => {
error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying...");
}
}
}
}
fn load_network_config(
&self,
config: &Config,
db: Arc<Env<WriteMap>>,
executor: TaskExecutor,
) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> {
let head = self.fetch_head(Arc::clone(&db)).expect("the head block is missing");
let head = self.lookup_head(Arc::clone(&db)).expect("the head block is missing");
self.network
.network_config(config, self.chain.clone())
@@ -361,6 +407,7 @@ impl Command {
body_downloader: B,
updater: U,
consensus: &Arc<dyn Consensus>,
max_block: Option<u64>,
) -> eyre::Result<Pipeline<Env<WriteMap>, U>>
where
H: HeaderDownloader + 'static,
@@ -371,7 +418,7 @@ impl Command {
let mut builder = Pipeline::builder();
if let Some(max_block) = self.max_block {
if let Some(max_block) = max_block {
debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
builder = builder.with_max_block(max_block)
}
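
The node command change reduces to one resolution step: an explicit self.max_block wins; otherwise a --debug.tip hash is turned into a block number, first via the HeaderNumbers table and, failing that, by fetching the header from the network with unbounded retries. The sketch below restates that flow in isolation; it is synchronous for brevity, and the HeaderSource trait is a hypothetical stand-in for the database plus FetchClient, not part of the diff.

use std::{thread, time::Duration};

/// Hypothetical stand-in for the database lookup plus network fetch.
trait HeaderSource {
    /// Local lookup, e.g. the HeaderNumbers table.
    fn number_in_db(&self, tip: [u8; 32]) -> Option<u64>;
    /// Single-header network fetch; may fail transiently.
    fn fetch_number(&self, tip: [u8; 32]) -> Result<u64, String>;
}

/// Simplified, synchronous restatement of the max-block precedence.
fn resolve_max_block<S: HeaderSource>(
    source: &S,
    max_block_arg: Option<u64>,
    tip: Option<[u8; 32]>,
) -> Option<u64> {
    if let Some(block) = max_block_arg {
        return Some(block) // an explicit max block takes precedence
    }
    let tip = tip?; // no tip configured, so no max block either
    if let Some(number) = source.number_in_db(tip) {
        return Some(number) // tip already known locally
    }
    loop {
        // The real code also retries forever; the sleep only paces the sketch.
        match source.fetch_number(tip) {
            Ok(number) => return Some(number),
            Err(_) => thread::sleep(Duration::from_secs(1)),
        }
    }
}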


@@ -2,17 +2,14 @@
use crate::{
args::DiscoveryArgs,
dirs::{ConfigPath, PlatformPath},
utils::get_single_header,
};
use backon::{ConstantBuilder, Retryable};
use clap::{Parser, Subcommand};
use reth_db::mdbx::{Env, EnvKind, WriteMap};
use reth_discv4::NatResolver;
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
headers::client::{HeadersClient, HeadersRequest},
};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader};
use reth_interfaces::p2p::bodies::client::BodiesClient;
use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord};
use reth_provider::ShareableDatabase;
use reth_staged_sync::{
utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser},
@@ -117,7 +114,7 @@ impl Command {
match self.command {
Subcommands::Header { id } => {
let header = (move || self.get_single_header(fetch_client.clone(), id))
let header = (move || get_single_header(fetch_client.clone(), id))
.retry(&backoff)
.notify(|err, _| println!("Error requesting header: {err}. Retrying..."))
.await?;
@@ -130,10 +127,7 @@ impl Command {
println!("Block number provided. Downloading header first...");
let client = fetch_client.clone();
let header = (move || {
self.get_single_header(
client.clone(),
BlockHashOrNumber::Number(number),
)
get_single_header(client.clone(), BlockHashOrNumber::Number(number))
})
.retry(&backoff)
.notify(|err, _| println!("Error requesting header: {err}. Retrying..."))
@@ -162,43 +156,4 @@ impl Command {
Ok(())
}
/// Get a single header from network
pub async fn get_single_header(
&self,
client: FetchClient,
id: BlockHashOrNumber,
) -> eyre::Result<SealedHeader> {
let request = HeadersRequest {
direction: reth_primitives::HeadersDirection::Rising,
limit: 1,
start: id,
};
let (_, response) = client.get_headers(request).await?.split();
if response.len() != 1 {
eyre::bail!(
"Invalid number of headers received. Expected: 1. Received: {}",
response.len()
)
}
let header = response.into_iter().next().unwrap().seal_slow();
let valid = match id {
BlockHashOrNumber::Hash(hash) => header.hash() == hash,
BlockHashOrNumber::Number(number) => header.number == number,
};
if !valid {
eyre::bail!(
"Received invalid header. Received: {:?}. Expected: {:?}",
header.num_hash(),
id
);
}
Ok(header)
}
}

bin/reth/src/utils.rs (new file)

@@ -0,0 +1,43 @@
//! Common CLI utility functions.
use reth_interfaces::p2p::{
download::DownloadClient,
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, HeadersDirection, SealedHeader};
/// Get a single header from network
pub async fn get_single_header(
client: FetchClient,
id: BlockHashOrNumber,
) -> eyre::Result<SealedHeader> {
let request = HeadersRequest { direction: HeadersDirection::Rising, limit: 1, start: id };
let (peer_id, response) =
client.get_headers_with_priority(request, Priority::High).await?.split();
if response.len() != 1 {
client.report_bad_message(peer_id);
eyre::bail!("Invalid number of headers received. Expected: 1. Received: {}", response.len())
}
let header = response.into_iter().next().unwrap().seal_slow();
let valid = match id {
BlockHashOrNumber::Hash(hash) => header.hash() == hash,
BlockHashOrNumber::Number(number) => header.number == number,
};
if !valid {
client.report_bad_message(peer_id);
eyre::bail!(
"Received invalid header. Received: {:?}. Expected: {:?}",
header.num_hash(),
id
);
}
Ok(header)
}
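
Two call sites now share this helper with different retry policies: lookup_or_fetch_tip loops until it succeeds, while the p2p command wraps the call in a bounded backon retry. A minimal sketch of the bounded pattern, mirroring the p2p command above; header_with_retries is a hypothetical wrapper and the default ConstantBuilder backoff is only illustrative.

use backon::{ConstantBuilder, Retryable};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, SealedHeader};

/// Illustrative wrapper: retry get_single_header with a constant backoff.
async fn header_with_retries(
    client: FetchClient,
    id: BlockHashOrNumber,
) -> eyre::Result<SealedHeader> {
    let backoff = ConstantBuilder::default();
    (move || get_single_header(client.clone(), id))
        .retry(&backoff)
        .notify(|err, _| println!("Error requesting header: {err}. Retrying..."))
        .await
}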


@@ -15,11 +15,14 @@ pub(crate) enum ControlFlow {
/// The progress of the last stage
progress: BlockNumber,
},
NoProgress,
NoProgress {
/// The current stage progress.
stage_progress: Option<BlockNumber>,
},
}
impl ControlFlow {
pub(crate) fn should_continue(&self) -> bool {
matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress)
matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress { .. })
}
}
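
Carrying stage_progress inside NoProgress means a stage that is skipped (for example at the max block) can still report where it previously stopped, so the pipeline's progress tracking and the max-block termination check keep working. A self-contained miniature of the new variant, simplified from the diff; the progress() helper is invented for the sketch.

enum ControlFlow {
    Continue { progress: u64 },
    NoProgress { stage_progress: Option<u64> },
}

impl ControlFlow {
    /// Progress to record after a stage run, if any.
    fn progress(&self) -> Option<u64> {
        match self {
            ControlFlow::Continue { progress } => Some(*progress),
            ControlFlow::NoProgress { stage_progress } => *stage_progress,
        }
    }
}

fn main() {
    // A stage that advanced reports its new progress.
    assert_eq!(ControlFlow::Continue { progress: 7 }.progress(), Some(7));
    // A stage skipped because the max block was reached still reports
    // its previous progress instead of discarding it.
    let skipped = ControlFlow::NoProgress { stage_progress: Some(42) };
    assert_eq!(skipped.progress(), Some(42));
    // A stage with no prior progress reports nothing, as before.
    let fresh = ControlFlow::NoProgress { stage_progress: None };
    assert_eq!(fresh.progress(), None);
}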


@@ -145,6 +145,13 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
.zip(self.max_block)
.map_or(false, |(progress, target)| progress >= target)
{
trace!(
target: "sync::pipeline",
?next_action,
minimum_progress = ?self.progress.minimum_progress,
max_block = ?self.max_block,
"Terminating pipeline."
);
return Ok(())
}
}
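
The new trace fires just before the early return, and the predicate guarding it is easy to misread: Option::zip yields a pair only when both the minimum progress and the max block are set, and map_or(false, ..) treats either being absent as "not reached". A standalone restatement (the function name is invented for the sketch):

fn reached_max_block(minimum_progress: Option<u64>, max_block: Option<u64>) -> bool {
    minimum_progress
        .zip(max_block)
        .map_or(false, |(progress, target)| progress >= target)
}

fn main() {
    assert!(reached_max_block(Some(10), Some(10))); // target reached
    assert!(!reached_max_block(Some(9), Some(10))); // still behind the target
    assert!(!reached_max_block(None, Some(10)));    // no progress recorded yet
    assert!(!reached_max_block(Some(10), None));    // no max block configured
}
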
@@ -168,18 +175,18 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
updater.update_sync_state(state);
}
trace!(
target: "sync::pipeline",
stage = %stage_id,
"Executing stage"
);
trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
let next = self
.execute_stage_to_completion(db, previous_stage, stage_index)
.instrument(info_span!("execute", stage = %stage_id))
.await?;
match next {
ControlFlow::NoProgress => {} // noop
ControlFlow::NoProgress { stage_progress } => {
if let Some(progress) = stage_progress {
self.progress.update(progress);
}
}
ControlFlow::Continue { progress } => self.progress.update(progress),
ControlFlow::Unwind { target, bad_block } => {
// reset the sync state
@@ -277,7 +284,7 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
self.listeners.notify(PipelineEvent::Skipped { stage_id });
// We reached the maximum block, so we skip the stage
return Ok(ControlFlow::NoProgress)
return Ok(ControlFlow::NoProgress { stage_progress: prev_progress })
}
self.listeners
@@ -308,7 +315,7 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
return Ok(if made_progress {
ControlFlow::Continue { progress: stage_progress }
} else {
ControlFlow::NoProgress
ControlFlow::NoProgress { stage_progress: Some(stage_progress) }
})
}
}
@@ -405,7 +412,7 @@ impl PipelineProgress {
fn next_ctrl(&self) -> ControlFlow {
match self.progress {
Some(progress) => ControlFlow::Continue { progress },
None => ControlFlow::NoProgress,
None => ControlFlow::NoProgress { stage_progress: None },
}
}
}
@@ -458,7 +465,7 @@ mod tests {
fn progress_ctrl_flow() {
let mut progress = PipelineProgress::default();
assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress);
assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { stage_progress: None });
progress.update(1);
assert_eq!(progress.next_ctrl(), ControlFlow::Continue { progress: 1 });