From e70f6871b8447e2f0931ec0d919259eaebf4d7c8 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 7 Jul 2025 15:26:20 +0200 Subject: [PATCH] refactor: extract import functionality to separate module (#17253) --- crates/cli/commands/src/import.rs | 212 +++------------------- crates/cli/commands/src/import_op.rs | 254 +++++++++++++++++++++++++++ crates/cli/commands/src/lib.rs | 1 + 3 files changed, 277 insertions(+), 190 deletions(-) create mode 100644 crates/cli/commands/src/import_op.rs diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index eef6711706..05434de4c2 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -1,36 +1,16 @@ //! Command that initializes the node by importing a chain from a file. -use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs}; -use alloy_primitives::B256; +use crate::{ + common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs}, + import_op::{import_blocks_from_file, ImportConfig}, +}; use clap::Parser; -use futures::{Stream, StreamExt}; -use reth_chainspec::{EthChainSpec, EthereumHardforks}; +use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks}; use reth_cli::chainspec::ChainSpecParser; -use reth_config::Config; -use reth_consensus::{ConsensusError, FullConsensus}; -use reth_db_api::{tables, transaction::DbTx}; -use reth_downloaders::{ - bodies::bodies::BodiesDownloaderBuilder, - file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE}, - headers::reverse_headers::ReverseHeadersDownloaderBuilder, -}; -use reth_evm::ConfigureEvm; -use reth_network_p2p::{ - bodies::downloader::BodyDownloader, - headers::downloader::{HeaderDownloader, SyncTarget}, -}; -use reth_node_api::BlockTy; use reth_node_core::version::SHORT_VERSION; -use reth_node_events::node::NodeEvent; -use reth_provider::{ - providers::ProviderNodeTypes, BlockNumReader, ChainSpecProvider, HeaderProvider, ProviderError, - ProviderFactory, StageCheckpointReader, -}; -use reth_prune::PruneModes; -use reth_stages::{prelude::*, Pipeline, StageId, StageSet}; -use reth_static_file::StaticFileProducer; use std::{path::PathBuf, sync::Arc}; -use tokio::sync::watch; -use tracing::{debug, error, info}; +use tracing::info; + +pub use crate::import_op::build_import_pipeline_impl as build_import_pipeline; /// Syncs RLP encoded blocks from a file. #[derive(Debug, Parser)] @@ -66,101 +46,29 @@ impl> ImportComm { info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); - if self.no_state { - info!(target: "reth::cli", "Disabled stages requiring state"); - } - - debug!(target: "reth::cli", - chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE), - "Chunking chain import" - ); - let Environment { provider_factory, config, .. } = self.env.init::(AccessRights::RW)?; let components = components(provider_factory.chain_spec()); + + let import_config = ImportConfig { no_state: self.no_state, chunk_len: self.chunk_len }; + let executor = components.evm_config().clone(); let consensus = Arc::new(components.consensus().clone()); - info!(target: "reth::cli", "Consensus engine initialized"); - // open file - let mut reader = ChunkedFileReader::new(&self.path, self.chunk_len).await?; + let result = import_blocks_from_file( + &self.path, + import_config, + provider_factory, + &config, + executor, + consensus, + ) + .await?; - let mut total_decoded_blocks = 0; - let mut total_decoded_txns = 0; - - let mut sealed_header = provider_factory - .sealed_header(provider_factory.last_block_number()?)? - .expect("should have genesis"); - - while let Some(file_client) = - reader.next_chunk::>(consensus.clone(), Some(sealed_header)).await? - { - // create a new FileClient from chunk read from file - info!(target: "reth::cli", - "Importing chain file chunk" - ); - - let tip = file_client.tip().ok_or(eyre::eyre!("file client has no tip"))?; - info!(target: "reth::cli", "Chain file chunk read"); - - total_decoded_blocks += file_client.headers_len(); - total_decoded_txns += file_client.total_transactions(); - - let (mut pipeline, events) = build_import_pipeline( - &config, - provider_factory.clone(), - &consensus, - Arc::new(file_client), - StaticFileProducer::new(provider_factory.clone(), PruneModes::default()), - self.no_state, - executor.clone(), - )?; - - // override the tip - pipeline.set_tip(tip); - debug!(target: "reth::cli", ?tip, "Tip manually set"); - - let provider = provider_factory.provider()?; - - let latest_block_number = - provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number); - tokio::spawn(reth_node_events::node::handle_events(None, latest_block_number, events)); - - // Run pipeline - info!(target: "reth::cli", "Starting sync pipeline"); - tokio::select! { - res = pipeline.run() => res?, - _ = tokio::signal::ctrl_c() => {}, - } - - sealed_header = provider_factory - .sealed_header(provider_factory.last_block_number()?)? - .expect("should have genesis"); + if !result.is_complete() { + return Err(eyre::eyre!("Chain was partially imported")); } - let provider = provider_factory.provider()?; - - let total_imported_blocks = provider.tx_ref().entries::()?; - let total_imported_txns = provider.tx_ref().entries::()?; - - if total_decoded_blocks != total_imported_blocks || - total_decoded_txns != total_imported_txns - { - error!(target: "reth::cli", - total_decoded_blocks, - total_imported_blocks, - total_decoded_txns, - total_imported_txns, - "Chain was partially imported" - ); - } - - info!(target: "reth::cli", - total_imported_blocks, - total_imported_txns, - "Chain file imported" - ); - Ok(()) } } @@ -172,82 +80,6 @@ impl ImportCommand { } } -/// Builds import pipeline. -/// -/// If configured to execute, all stages will run. Otherwise, only stages that don't require state -/// will run. -pub fn build_import_pipeline( - config: &Config, - provider_factory: ProviderFactory, - consensus: &Arc, - file_client: Arc>>, - static_file_producer: StaticFileProducer>, - disable_exec: bool, - evm_config: E, -) -> eyre::Result<(Pipeline, impl Stream>)> -where - N: ProviderNodeTypes, - C: FullConsensus + 'static, - E: ConfigureEvm + 'static, -{ - if !file_client.has_canonical_blocks() { - eyre::bail!("unable to import non canonical blocks"); - } - - // Retrieve latest header found in the database. - let last_block_number = provider_factory.last_block_number()?; - let local_head = provider_factory - .sealed_header(last_block_number)? - .ok_or_else(|| ProviderError::HeaderNotFound(last_block_number.into()))?; - - let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) - .build(file_client.clone(), consensus.clone()) - .into_task(); - // TODO: The pipeline should correctly configure the downloader on its own. - // Find the possibility to remove unnecessary pre-configuration. - header_downloader.update_local_head(local_head); - header_downloader.update_sync_target(SyncTarget::Tip(file_client.tip().unwrap())); - - let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies) - .build(file_client.clone(), consensus.clone(), provider_factory.clone()) - .into_task(); - // TODO: The pipeline should correctly configure the downloader on its own. - // Find the possibility to remove unnecessary pre-configuration. - body_downloader - .set_download_range(file_client.min_block().unwrap()..=file_client.max_block().unwrap()) - .expect("failed to set download range"); - - let (tip_tx, tip_rx) = watch::channel(B256::ZERO); - - let max_block = file_client.max_block().unwrap_or(0); - - let pipeline = Pipeline::builder() - .with_tip_sender(tip_tx) - // we want to sync all blocks the file client provides or 0 if empty - .with_max_block(max_block) - .with_fail_on_unwind(true) - .add_stages( - DefaultStages::new( - provider_factory.clone(), - tip_rx, - consensus.clone(), - header_downloader, - body_downloader, - evm_config, - config.stages.clone(), - PruneModes::default(), - None, - ) - .builder() - .disable_all_if(&StageId::STATE_REQUIRED, || disable_exec), - ) - .build(provider_factory, static_file_producer); - - let events = pipeline.events().map(Into::into); - - Ok((pipeline, events)) -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/cli/commands/src/import_op.rs b/crates/cli/commands/src/import_op.rs new file mode 100644 index 0000000000..c3adec1020 --- /dev/null +++ b/crates/cli/commands/src/import_op.rs @@ -0,0 +1,254 @@ +//! Core import functionality without CLI dependencies. + +use alloy_primitives::B256; +use futures::StreamExt; +use reth_config::Config; +use reth_consensus::FullConsensus; +use reth_db_api::{tables, transaction::DbTx}; +use reth_downloaders::{ + bodies::bodies::BodiesDownloaderBuilder, + file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE}, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, +}; +use reth_evm::ConfigureEvm; +use reth_network_p2p::{ + bodies::downloader::BodyDownloader, + headers::downloader::{HeaderDownloader, SyncTarget}, +}; +use reth_node_api::BlockTy; +use reth_node_events::node::NodeEvent; +use reth_provider::{ + providers::ProviderNodeTypes, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory, + StageCheckpointReader, +}; +use reth_prune::PruneModes; +use reth_stages::{prelude::*, Pipeline, StageId, StageSet}; +use reth_static_file::StaticFileProducer; +use std::{path::Path, sync::Arc}; +use tokio::sync::watch; +use tracing::{debug, error, info}; + +/// Configuration for importing blocks from RLP files. +#[derive(Debug, Clone, Default)] +pub struct ImportConfig { + /// Disables stages that require state. + pub no_state: bool, + /// Chunk byte length to read from file. + pub chunk_len: Option, +} + +/// Result of an import operation. +#[derive(Debug)] +pub struct ImportResult { + /// Total number of blocks decoded from the file. + pub total_decoded_blocks: usize, + /// Total number of transactions decoded from the file. + pub total_decoded_txns: usize, + /// Total number of blocks imported into the database. + pub total_imported_blocks: usize, + /// Total number of transactions imported into the database. + pub total_imported_txns: usize, +} + +impl ImportResult { + /// Returns true if all blocks and transactions were imported successfully. + pub fn is_complete(&self) -> bool { + self.total_decoded_blocks == self.total_imported_blocks && + self.total_decoded_txns == self.total_imported_txns + } +} + +/// Imports blocks from an RLP-encoded file into the database. +/// +/// This function reads RLP-encoded blocks from a file in chunks and imports them +/// using the pipeline infrastructure. It's designed to be used both from the CLI +/// and from test code. +pub async fn import_blocks_from_file( + path: &Path, + import_config: ImportConfig, + provider_factory: ProviderFactory, + config: &Config, + executor: impl ConfigureEvm + 'static, + consensus: Arc< + impl FullConsensus + 'static, + >, +) -> eyre::Result +where + N: ProviderNodeTypes, +{ + if import_config.no_state { + info!(target: "reth::import", "Disabled stages requiring state"); + } + + debug!(target: "reth::import", + chunk_byte_len=import_config.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE), + "Chunking chain import" + ); + + info!(target: "reth::import", "Consensus engine initialized"); + + // open file + let mut reader = ChunkedFileReader::new(path, import_config.chunk_len).await?; + + let mut total_decoded_blocks = 0; + let mut total_decoded_txns = 0; + + let mut sealed_header = provider_factory + .sealed_header(provider_factory.last_block_number()?)? + .expect("should have genesis"); + + while let Some(file_client) = + reader.next_chunk::>(consensus.clone(), Some(sealed_header)).await? + { + // create a new FileClient from chunk read from file + info!(target: "reth::import", + "Importing chain file chunk" + ); + + let tip = file_client.tip().ok_or(eyre::eyre!("file client has no tip"))?; + info!(target: "reth::import", "Chain file chunk read"); + + total_decoded_blocks += file_client.headers_len(); + total_decoded_txns += file_client.total_transactions(); + + let (mut pipeline, events) = build_import_pipeline_impl( + config, + provider_factory.clone(), + &consensus, + Arc::new(file_client), + StaticFileProducer::new(provider_factory.clone(), PruneModes::default()), + import_config.no_state, + executor.clone(), + )?; + + // override the tip + pipeline.set_tip(tip); + debug!(target: "reth::import", ?tip, "Tip manually set"); + + let provider = provider_factory.provider()?; + + let latest_block_number = + provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number); + tokio::spawn(reth_node_events::node::handle_events(None, latest_block_number, events)); + + // Run pipeline + info!(target: "reth::import", "Starting sync pipeline"); + tokio::select! { + res = pipeline.run() => res?, + _ = tokio::signal::ctrl_c() => { + info!(target: "reth::import", "Import interrupted by user"); + break; + }, + } + + sealed_header = provider_factory + .sealed_header(provider_factory.last_block_number()?)? + .expect("should have genesis"); + } + + let provider = provider_factory.provider()?; + + let total_imported_blocks = provider.tx_ref().entries::()?; + let total_imported_txns = provider.tx_ref().entries::()?; + + let result = ImportResult { + total_decoded_blocks, + total_decoded_txns, + total_imported_blocks, + total_imported_txns, + }; + + if !result.is_complete() { + error!(target: "reth::import", + total_decoded_blocks, + total_imported_blocks, + total_decoded_txns, + total_imported_txns, + "Chain was partially imported" + ); + } else { + info!(target: "reth::import", + total_imported_blocks, + total_imported_txns, + "Chain file imported" + ); + } + + Ok(result) +} + +/// Builds import pipeline. +/// +/// If configured to execute, all stages will run. Otherwise, only stages that don't require state +/// will run. +pub fn build_import_pipeline_impl( + config: &Config, + provider_factory: ProviderFactory, + consensus: &Arc, + file_client: Arc>>, + static_file_producer: StaticFileProducer>, + disable_exec: bool, + evm_config: E, +) -> eyre::Result<(Pipeline, impl futures::Stream>)> +where + N: ProviderNodeTypes, + C: FullConsensus + 'static, + E: ConfigureEvm + 'static, +{ + if !file_client.has_canonical_blocks() { + eyre::bail!("unable to import non canonical blocks"); + } + + // Retrieve latest header found in the database. + let last_block_number = provider_factory.last_block_number()?; + let local_head = provider_factory + .sealed_header(last_block_number)? + .ok_or_else(|| ProviderError::HeaderNotFound(last_block_number.into()))?; + + let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) + .build(file_client.clone(), consensus.clone()) + .into_task(); + // TODO: The pipeline should correctly configure the downloader on its own. + // Find the possibility to remove unnecessary pre-configuration. + header_downloader.update_local_head(local_head); + header_downloader.update_sync_target(SyncTarget::Tip(file_client.tip().unwrap())); + + let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies) + .build(file_client.clone(), consensus.clone(), provider_factory.clone()) + .into_task(); + // TODO: The pipeline should correctly configure the downloader on its own. + // Find the possibility to remove unnecessary pre-configuration. + body_downloader + .set_download_range(file_client.min_block().unwrap()..=file_client.max_block().unwrap()) + .expect("failed to set download range"); + + let (tip_tx, tip_rx) = watch::channel(B256::ZERO); + + let max_block = file_client.max_block().unwrap_or(0); + + let pipeline = Pipeline::builder() + .with_tip_sender(tip_tx) + // we want to sync all blocks the file client provides or 0 if empty + .with_max_block(max_block) + .with_fail_on_unwind(true) + .add_stages( + DefaultStages::new( + provider_factory.clone(), + tip_rx, + consensus.clone(), + header_downloader, + body_downloader, + evm_config, + config.stages.clone(), + PruneModes::default(), + None, + ) + .builder() + .disable_all_if(&StageId::STATE_REQUIRED, || disable_exec), + ) + .build(provider_factory, static_file_producer); + + let events = pipeline.events().map(Into::into); + + Ok((pipeline, events)) +} diff --git a/crates/cli/commands/src/lib.rs b/crates/cli/commands/src/lib.rs index 778f284028..e602fac820 100644 --- a/crates/cli/commands/src/lib.rs +++ b/crates/cli/commands/src/lib.rs @@ -15,6 +15,7 @@ pub mod download; pub mod dump_genesis; pub mod import; pub mod import_era; +pub mod import_op; pub mod init_cmd; pub mod init_state; pub mod launcher;