diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index db54bbb0df..99edd2ef25 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -13,7 +13,7 @@ use crate::{ use clap::Parser; use futures::{stream::select as stream_select, StreamExt}; use reth_beacon_consensus::BeaconConsensus; -use reth_config::Config; +use reth_config::{config::EtlConfig, Config}; use reth_db::{database::Database, init_db, DatabaseEnv}; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -200,10 +200,16 @@ impl Command { /// Execute `execution-debug` command pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> { - let config = Config::default(); + let mut config = Config::default(); let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let db_path = data_dir.db_path(); + + // Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to + if config.stages.etl.dir.is_none() { + config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path())); + } + fs::create_dir_all(&db_path)?; let db = Arc::new(init_db(db_path, self.db.database_args())?); let provider_factory = diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index d87328e33a..ce9cd3efe7 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -12,7 +12,7 @@ use clap::Parser; use eyre::Context; use futures::{Stream, StreamExt}; use reth_beacon_consensus::BeaconConsensus; -use reth_config::Config; +use reth_config::{config::EtlConfig, Config}; use reth_db::{database::Database, init_db}; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, @@ -115,9 +115,14 @@ impl ImportCommand { let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain); let config_path = self.config.clone().unwrap_or_else(|| data_dir.config_path()); - let config: Config = self.load_config(config_path.clone())?; + let mut config: Config = self.load_config(config_path.clone())?; info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); + // Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to + if config.stages.etl.dir.is_none() { + config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path())); + } + let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index 47e70ae7c0..b8f9bc5278 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -256,8 +256,14 @@ impl Command { Box::new(MerkleStage::default_execution()), Some(Box::new(MerkleStage::default_unwind())), ), - StageEnum::AccountHistory => (Box::::default(), None), - StageEnum::StorageHistory => (Box::::default(), None), + StageEnum::AccountHistory => ( + Box::new(IndexAccountHistoryStage::default().with_etl_config(etl_config)), + None, + ), + StageEnum::StorageHistory => ( + Box::new(IndexStorageHistoryStage::default().with_etl_config(etl_config)), + None, + ), _ => return Ok(()), }; if let Some(unwind_stage) = &unwind_stage { diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index 4e24e9a923..1185de25f0 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -17,6 +17,7 @@ //! # use reth_node_ethereum::EthEvmConfig; //! # use reth_provider::test_utils::create_test_provider_factory; //! # use reth_static_file::StaticFileProducer; +//! # use reth_config::config::EtlConfig; //! //! # let executor_factory = EvmProcessorFactory::new(MAINNET.clone(), EthEvmConfig::default()); //! # let provider_factory = create_test_provider_factory(); @@ -27,7 +28,7 @@ //! ); //! // Build a pipeline with all offline stages. //! # let pipeline = Pipeline::builder() -//! .add_stages(OfflineStages::new(executor_factory)) +//! .add_stages(OfflineStages::new(executor_factory, EtlConfig::default())) //! .build(provider_factory, static_file_producer); //! ``` //! @@ -37,11 +38,13 @@ //! # use reth_revm::EvmProcessorFactory; //! # use reth_node_ethereum::EthEvmConfig; //! # use reth_primitives::MAINNET; +//! # use reth_config::config::EtlConfig; +//! //! // Build a pipeline with all offline stages and a custom stage at the end. //! # let executor_factory = EvmProcessorFactory::new(MAINNET.clone(), EthEvmConfig::default()); //! Pipeline::builder() //! .add_stages( -//! OfflineStages::new(executor_factory).builder().add_stage(MyCustomStage) +//! OfflineStages::new(executor_factory, EtlConfig::default()).builder().add_stage(MyCustomStage) //! ) //! .build(); //! ``` @@ -90,6 +93,8 @@ pub struct DefaultStages { online: OnlineStages, /// Executor factory needs for execution stage executor_factory: EF, + /// ETL configuration + etl_config: EtlConfig, } impl DefaultStages { @@ -113,9 +118,10 @@ impl DefaultStages { consensus, header_downloader, body_downloader, - etl_config, + etl_config.clone(), ), executor_factory, + etl_config, } } } @@ -128,10 +134,11 @@ where pub fn add_offline_stages( default_offline: StageSetBuilder, executor_factory: EF, + etl_config: EtlConfig, ) -> StageSetBuilder { StageSetBuilder::default() .add_set(default_offline) - .add_set(OfflineStages::new(executor_factory)) + .add_set(OfflineStages::new(executor_factory, etl_config)) .add_stage(FinishStage) } } @@ -145,7 +152,7 @@ where DB: Database + 'static, { fn builder(self) -> StageSetBuilder { - Self::add_offline_stages(self.online.builder(), self.executor_factory) + Self::add_offline_stages(self.online.builder(), self.executor_factory, self.etl_config) } } @@ -250,12 +257,14 @@ where pub struct OfflineStages { /// Executor factory needs for execution stage pub executor_factory: EF, + /// ETL configuration + etl_config: EtlConfig, } impl OfflineStages { /// Create a new set of offline stages with default values. - pub fn new(executor_factory: EF) -> Self { - Self { executor_factory } + pub fn new(executor_factory: EF, etl_config: EtlConfig) -> Self { + Self { executor_factory, etl_config } } } @@ -263,8 +272,8 @@ impl StageSet for OfflineStages { fn builder(self) -> StageSetBuilder { ExecutionStages::new(self.executor_factory) .builder() - .add_set(HashingStages) - .add_set(HistoryIndexingStages) + .add_set(HashingStages { etl_config: self.etl_config.clone() }) + .add_set(HistoryIndexingStages { etl_config: self.etl_config }) } } @@ -294,14 +303,17 @@ impl StageSet for ExecutionStages { /// A set containing all stages that hash account state. #[derive(Debug, Default)] #[non_exhaustive] -pub struct HashingStages; +pub struct HashingStages { + /// ETL configuration + etl_config: EtlConfig, +} impl StageSet for HashingStages { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() .add_stage(MerkleStage::default_unwind()) - .add_stage(AccountHashingStage::default()) - .add_stage(StorageHashingStage::default()) + .add_stage(AccountHashingStage::default().with_etl_config(self.etl_config.clone())) + .add_stage(StorageHashingStage::default().with_etl_config(self.etl_config)) .add_stage(MerkleStage::default_execution()) } } @@ -309,13 +321,16 @@ impl StageSet for HashingStages { /// A set containing all stages that do additional indexing for historical state. #[derive(Debug, Default)] #[non_exhaustive] -pub struct HistoryIndexingStages; +pub struct HistoryIndexingStages { + /// ETL configuration + etl_config: EtlConfig, +} impl StageSet for HistoryIndexingStages { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() - .add_stage(TransactionLookupStage::default()) - .add_stage(IndexStorageHistoryStage::default()) - .add_stage(IndexAccountHistoryStage::default()) + .add_stage(TransactionLookupStage::default().with_etl_config(self.etl_config.clone())) + .add_stage(IndexStorageHistoryStage::default().with_etl_config(self.etl_config.clone())) + .add_stage(IndexAccountHistoryStage::default().with_etl_config(self.etl_config)) } } diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 372ce46eab..4afccc77f8 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -47,6 +47,12 @@ impl AccountHashingStage { pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self { Self { clean_threshold, commit_threshold, etl_config } } + + /// Set the ETL configuration to use. + pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { + self.etl_config = etl_config; + self + } } impl Default for AccountHashingStage { diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 059fdf0fca..54f4b9520e 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -48,6 +48,12 @@ impl StorageHashingStage { pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self { Self { clean_threshold, commit_threshold, etl_config } } + + /// Set the ETL configuration to use. + pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { + self.etl_config = etl_config; + self + } } impl Default for StorageHashingStage { diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 76d01dbdd6..89c77d6e1b 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -37,6 +37,12 @@ impl IndexAccountHistoryStage { ) -> Self { Self { commit_threshold, prune_mode, etl_config } } + + /// Set the ETL configuration to use. + pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { + self.etl_config = etl_config; + self + } } impl Default for IndexAccountHistoryStage { diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index c00de1632e..b321f1c562 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -41,6 +41,12 @@ impl IndexStorageHistoryStage { ) -> Self { Self { commit_threshold, prune_mode, etl_config } } + + /// Set the ETL configuration to use. + pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { + self.etl_config = etl_config; + self + } } impl Default for IndexStorageHistoryStage { diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 03a8aa2ece..7bdeb4e1a5 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -48,6 +48,12 @@ impl TransactionLookupStage { pub fn new(chunk_size: u64, etl_config: EtlConfig, prune_mode: Option) -> Self { Self { chunk_size, etl_config, prune_mode } } + + /// Set the ETL configuration to use. + pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { + self.etl_config = etl_config; + self + } } impl Stage for TransactionLookupStage {