From d9f9504dbde67019f8b5da3091bb8996dcb07e74 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Sun, 12 May 2024 12:38:34 +0100 Subject: [PATCH] chore: refactor `DefaultStages` to take `StageConfig` (#8173) --- bin/reth/src/commands/debug_cmd/execution.rs | 17 +-- bin/reth/src/commands/import.rs | 33 +---- bin/reth/src/commands/stage/run.rs | 60 ++++++-- bin/reth/src/commands/stage/unwind.rs | 29 +--- crates/config/src/config.rs | 13 ++ .../consensus/beacon/src/engine/test_utils.rs | 5 +- crates/node/builder/src/setup.rs | 59 +------- crates/stages/benches/criterion.rs | 8 +- crates/stages/src/lib.rs | 5 +- crates/stages/src/sets.rs | 130 ++++++++++++------ crates/stages/src/stages/execution.rs | 28 ++++ crates/stages/src/stages/hashing_account.rs | 16 +-- crates/stages/src/stages/hashing_storage.rs | 16 +-- .../src/stages/index_account_history.rs | 14 +- .../src/stages/index_storage_history.rs | 14 +- crates/stages/src/stages/sender_recovery.rs | 5 +- crates/stages/src/stages/tx_lookup.rs | 16 +-- 17 files changed, 245 insertions(+), 223 deletions(-) diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index 50e93dfbca..8fecc928ab 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -35,7 +35,7 @@ use reth_provider::{ }; use reth_stages::{ sets::DefaultStages, - stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage}, + stages::{ExecutionStage, ExecutionStageThresholds}, Pipeline, StageSet, }; use reth_static_file::StaticFileProducer; @@ -109,6 +109,7 @@ impl Command { .into_task_with(task_executor); let stage_conf = &config.stages; + let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default(); let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let executor = block_executor!(self.chain.clone()); @@ -124,11 +125,9 @@ impl Command { header_downloader, body_downloader, executor.clone(), - stage_conf.etl.clone(), + stage_conf.clone(), + prune_modes.clone(), ) - .set(SenderRecoveryStage { - commit_threshold: stage_conf.sender_recovery.commit_threshold, - }) .set(ExecutionStage::new( executor, ExecutionStageThresholds { @@ -137,12 +136,8 @@ impl Command { max_cumulative_gas: None, max_duration: None, }, - stage_conf - .merkle - .clean_threshold - .max(stage_conf.account_hashing.clean_threshold) - .max(stage_conf.storage_hashing.clean_threshold), - config.prune.clone().map(|prune| prune.segments).unwrap_or_default(), + stage_conf.execution_external_clean_threshold(), + prune_modes, ExExManagerHandle::empty(), )), ) diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index 354787f326..7d6b12fd8f 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -21,7 +21,6 @@ use reth_downloaders::{ file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE}, headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; -use reth_exex::ExExManagerHandle; use reth_interfaces::p2p::{ bodies::downloader::BodyDownloader, headers::downloader::{HeaderDownloader, SyncTarget}, @@ -33,11 +32,7 @@ use reth_provider::{ BlockNumReader, ChainSpecProvider, HeaderProvider, HeaderSyncMode, ProviderError, ProviderFactory, StageCheckpointReader, StaticFileProviderFactory, }; -use reth_stages::{ - prelude::*, - stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage}, - Pipeline, StageSet, -}; +use reth_stages::{prelude::*, Pipeline, StageSet}; use reth_static_file::StaticFileProducer; use std::{path::PathBuf, sync::Arc}; use tokio::sync::watch; @@ -273,29 +268,11 @@ where consensus.clone(), header_downloader, body_downloader, - executor.clone(), - config.stages.etl.clone(), - ) - .set(SenderRecoveryStage { - commit_threshold: config.stages.sender_recovery.commit_threshold, - }) - .set(ExecutionStage::new( executor, - ExecutionStageThresholds { - max_blocks: config.stages.execution.max_blocks, - max_changes: config.stages.execution.max_changes, - max_cumulative_gas: config.stages.execution.max_cumulative_gas, - max_duration: config.stages.execution.max_duration, - }, - config - .stages - .merkle - .clean_threshold - .max(config.stages.account_hashing.clean_threshold) - .max(config.stages.storage_hashing.clean_threshold), - config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default(), - ExExManagerHandle::empty(), - )) + config.stages.clone(), + PruneModes::default(), + ) + .builder() .disable_all_if(&StageId::STATE_REQUIRED, || should_exec), ) .build(provider_factory, static_file_producer); diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index 59d26fc293..d34b67db42 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -16,7 +16,10 @@ use crate::{ use clap::Parser; use reth_beacon_consensus::EthBeaconConsensus; use reth_cli_runner::CliContext; -use reth_config::{config::EtlConfig, Config}; +use reth_config::{ + config::{EtlConfig, HashingConfig, SenderRecoveryConfig, TransactionLookupConfig}, + Config, +}; use reth_db::init_db; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_exex::ExExManagerHandle; @@ -165,6 +168,7 @@ impl Command { Some(self.etl_dir.unwrap_or_else(|| EtlConfig::from_datadir(data_dir.data_dir()))), self.etl_file_size.unwrap_or(EtlConfig::default_file_size()), ); + let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default(); let (mut exec_stage, mut unwind_stage): (Box>, Option>>) = match self.stage { @@ -222,7 +226,12 @@ impl Command { ); (Box::new(stage), None) } - StageEnum::Senders => (Box::new(SenderRecoveryStage::new(batch_size)), None), + StageEnum::Senders => ( + Box::new(SenderRecoveryStage::new(SenderRecoveryConfig { + commit_threshold: batch_size, + })), + None, + ), StageEnum::Execution => { let executor = block_executor!(self.chain.clone()); ( @@ -235,31 +244,52 @@ impl Command { max_duration: None, }, config.stages.merkle.clean_threshold, - config.prune.map(|prune| prune.segments).unwrap_or_default(), + prune_modes, ExExManagerHandle::empty(), )), None, ) } - StageEnum::TxLookup => { - (Box::new(TransactionLookupStage::new(batch_size, etl_config, None)), None) - } - StageEnum::AccountHashing => { - (Box::new(AccountHashingStage::new(1, batch_size, etl_config)), None) - } - StageEnum::StorageHashing => { - (Box::new(StorageHashingStage::new(1, batch_size, etl_config)), None) - } + StageEnum::TxLookup => ( + Box::new(TransactionLookupStage::new( + TransactionLookupConfig { chunk_size: batch_size }, + etl_config, + prune_modes.transaction_lookup, + )), + None, + ), + StageEnum::AccountHashing => ( + Box::new(AccountHashingStage::new( + HashingConfig { clean_threshold: 1, commit_threshold: batch_size }, + etl_config, + )), + None, + ), + StageEnum::StorageHashing => ( + Box::new(StorageHashingStage::new( + HashingConfig { clean_threshold: 1, commit_threshold: batch_size }, + etl_config, + )), + None, + ), StageEnum::Merkle => ( - Box::new(MerkleStage::default_execution()), + Box::new(MerkleStage::new_execution(config.stages.merkle.clean_threshold)), Some(Box::new(MerkleStage::default_unwind())), ), StageEnum::AccountHistory => ( - Box::new(IndexAccountHistoryStage::default().with_etl_config(etl_config)), + Box::new(IndexAccountHistoryStage::new( + config.stages.index_account_history, + etl_config, + prune_modes.account_history, + )), None, ), StageEnum::StorageHistory => ( - Box::new(IndexStorageHistoryStage::default().with_etl_config(etl_config)), + Box::new(IndexStorageHistoryStage::new( + config.stages.index_storage_history, + etl_config, + prune_modes.storage_history, + )), None, ), _ => return Ok(()), diff --git a/bin/reth/src/commands/stage/unwind.rs b/bin/reth/src/commands/stage/unwind.rs index 1f0c7fc456..d2ebe70db1 100644 --- a/bin/reth/src/commands/stage/unwind.rs +++ b/bin/reth/src/commands/stage/unwind.rs @@ -15,11 +15,7 @@ use reth_provider::{ }; use reth_stages::{ sets::DefaultStages, - stages::{ - AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, - IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, - TransactionLookupStage, - }, + stages::{ExecutionStage, ExecutionStageThresholds}, Pipeline, StageSet, }; use reth_static_file::StaticFileProducer; @@ -133,6 +129,7 @@ impl Command { let consensus: Arc = Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec())); let stage_conf = &config.stages; + let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default(); let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let executor = block_executor!(provider_factory.chain_spec()); @@ -148,11 +145,9 @@ impl Command { NoopHeaderDownloader::default(), NoopBodiesDownloader::default(), executor.clone(), - stage_conf.etl.clone(), + stage_conf.clone(), + prune_modes.clone(), ) - .set(SenderRecoveryStage { - commit_threshold: stage_conf.sender_recovery.commit_threshold, - }) .set(ExecutionStage::new( executor, ExecutionStageThresholds { @@ -161,20 +156,10 @@ impl Command { max_cumulative_gas: None, max_duration: None, }, - stage_conf - .merkle - .clean_threshold - .max(stage_conf.account_hashing.clean_threshold) - .max(stage_conf.storage_hashing.clean_threshold), - config.prune.clone().map(|prune| prune.segments).unwrap_or_default(), + stage_conf.execution_external_clean_threshold(), + prune_modes, ExExManagerHandle::empty(), - )) - .set(AccountHashingStage::default()) - .set(StorageHashingStage::default()) - .set(MerkleStage::default_unwind()) - .set(TransactionLookupStage::default()) - .set(IndexAccountHistoryStage::default()) - .set(IndexStorageHistoryStage::default()), + )), ) .build( provider_factory.clone(), diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index f6537a04c7..aa8b7ee09a 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -96,6 +96,19 @@ pub struct StageConfig { pub etl: EtlConfig, } +impl StageConfig { + /// The highest threshold (in number of blocks) for switching between incremental and full + /// calculations across `MerkleStage`, `AccountHashingStage` and `StorageHashingStage`. This is + /// required to figure out if can prune or not changesets on subsequent pipeline runs during + /// `ExecutionStage` + pub fn execution_external_clean_threshold(&self) -> u64 { + self.merkle + .clean_threshold + .max(self.account_hashing.clean_threshold) + .max(self.storage_hashing.clean_threshold) + } +} + /// Header stage configuration. #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)] #[serde(default)] diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index d9fd67c130..13ffd0c4ff 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -6,7 +6,7 @@ use crate::{ use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; -use reth_config::config::EtlConfig; +use reth_config::config::StageConfig; use reth_consensus::{test_utils::TestConsensus, Consensus}; use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE}; use reth_downloaders::{ @@ -375,7 +375,8 @@ where header_downloader, body_downloader, executor_factory.clone(), - EtlConfig::default(), + StageConfig::default(), + PruneModes::default(), )) } }; diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 8033ab1c68..3314891fe9 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -18,15 +18,7 @@ use reth_node_core::{ primitives::{BlockNumber, B256}, }; use reth_provider::{HeaderSyncMode, ProviderFactory}; -use reth_stages::{ - prelude::DefaultStages, - stages::{ - AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, - IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, - TransactionLookupStage, - }, - Pipeline, StageSet, -}; +use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet}; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_tracing::tracing::debug; @@ -131,56 +123,19 @@ where header_downloader, body_downloader, executor.clone(), - stage_config.etl.clone(), + stage_config.clone(), + prune_modes.clone(), ) - .set(SenderRecoveryStage { - commit_threshold: stage_config.sender_recovery.commit_threshold, - }) .set( ExecutionStage::new( executor, - ExecutionStageThresholds { - max_blocks: stage_config.execution.max_blocks, - max_changes: stage_config.execution.max_changes, - max_cumulative_gas: stage_config.execution.max_cumulative_gas, - max_duration: stage_config.execution.max_duration, - }, - stage_config - .merkle - .clean_threshold - .max(stage_config.account_hashing.clean_threshold) - .max(stage_config.storage_hashing.clean_threshold), - prune_modes.clone(), + stage_config.execution.into(), + stage_config.execution_external_clean_threshold(), + prune_modes, exex_manager_handle, ) .with_metrics_tx(metrics_tx), - ) - .set(AccountHashingStage::new( - stage_config.account_hashing.clean_threshold, - stage_config.account_hashing.commit_threshold, - stage_config.etl.clone(), - )) - .set(StorageHashingStage::new( - stage_config.storage_hashing.clean_threshold, - stage_config.storage_hashing.commit_threshold, - stage_config.etl.clone(), - )) - .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) - .set(TransactionLookupStage::new( - stage_config.transaction_lookup.chunk_size, - stage_config.etl.clone(), - prune_modes.transaction_lookup, - )) - .set(IndexAccountHistoryStage::new( - stage_config.index_account_history.commit_threshold, - prune_modes.account_history, - stage_config.etl.clone(), - )) - .set(IndexStorageHistoryStage::new( - stage_config.index_storage_history.commit_threshold, - prune_modes.storage_history, - stage_config.etl.clone(), - )), + ), ) .build(provider_factory, static_file_producer); diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 98b97462b9..976723dcd2 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -2,7 +2,7 @@ use criterion::{criterion_main, measurement::WallTime, BenchmarkGroup, Criterion}; #[cfg(not(target_os = "windows"))] use pprof::criterion::{Output, PProfProfiler}; -use reth_config::config::EtlConfig; +use reth_config::config::{EtlConfig, TransactionLookupConfig}; use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_primitives::{stage::StageCheckpoint, BlockNumber}; @@ -87,7 +87,11 @@ fn transaction_lookup(c: &mut Criterion, runtime: &Runtime) { let mut group = c.benchmark_group("Stages"); // don't need to run each stage for that many times group.sample_size(10); - let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, EtlConfig::default(), None); + let stage = TransactionLookupStage::new( + TransactionLookupConfig { chunk_size: DEFAULT_NUM_BLOCKS }, + EtlConfig::default(), + None, + ); let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 2c6aaff251..370dd18aca 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -28,7 +28,7 @@ //! # use reth_provider::HeaderSyncMode; //! # use reth_provider::test_utils::create_test_provider_factory; //! # use reth_static_file::StaticFileProducer; -//! # use reth_config::config::EtlConfig; +//! # use reth_config::config::StageConfig; //! # use reth_consensus::Consensus; //! # use reth_consensus::test_utils::TestConsensus; //! # @@ -62,7 +62,8 @@ //! headers_downloader, //! bodies_downloader, //! executor_provider, -//! EtlConfig::default(), +//! StageConfig::default(), +//! PruneModes::default(), //! )) //! .build(provider_factory, static_file_producer); //! ``` diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index 7ec85170fc..e8257047e5 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -17,7 +17,7 @@ //! # use reth_provider::StaticFileProviderFactory; //! # use reth_provider::test_utils::create_test_provider_factory; //! # use reth_static_file::StaticFileProducer; -//! # use reth_config::config::EtlConfig; +//! # use reth_config::config::StageConfig; //! # use reth_evm::execute::BlockExecutorProvider; //! //! # fn create(exec: impl BlockExecutorProvider) { @@ -30,7 +30,7 @@ //! ); //! // Build a pipeline with all offline stages. //! let pipeline = Pipeline::builder() -//! .add_stages(OfflineStages::new(exec, EtlConfig::default())) +//! .add_stages(OfflineStages::new(exec, StageConfig::default(), PruneModes::default())) //! .build(provider_factory, static_file_producer); //! //! # } @@ -43,13 +43,14 @@ use crate::{ }, StageSet, StageSetBuilder, }; -use reth_config::config::EtlConfig; +use reth_config::config::StageConfig; use reth_consensus::Consensus; use reth_db::database::Database; use reth_evm::execute::BlockExecutorProvider; use reth_interfaces::p2p::{ bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, }; +use reth_primitives::PruneModes; use reth_provider::{HeaderSyncGapProvider, HeaderSyncMode}; use std::sync::Arc; @@ -80,12 +81,15 @@ pub struct DefaultStages { online: OnlineStages, /// Executor factory needs for execution stage executor_factory: EF, - /// ETL configuration - etl_config: EtlConfig, + /// Configuration for each stage in the pipeline + stages_config: StageConfig, + /// Prune configuration for every segment that can be pruned + prune_modes: PruneModes, } impl DefaultStages { /// Create a new set of default stages with default values. + #[allow(clippy::too_many_arguments)] pub fn new( provider: Provider, header_mode: HeaderSyncMode, @@ -93,7 +97,8 @@ impl DefaultStages { header_downloader: H, body_downloader: B, executor_factory: E, - etl_config: EtlConfig, + stages_config: StageConfig, + prune_modes: PruneModes, ) -> Self where E: BlockExecutorProvider, @@ -105,10 +110,11 @@ impl DefaultStages { consensus, header_downloader, body_downloader, - etl_config.clone(), + stages_config.clone(), ), executor_factory, - etl_config, + stages_config, + prune_modes, } } } @@ -121,11 +127,12 @@ where pub fn add_offline_stages( default_offline: StageSetBuilder, executor_factory: E, - etl_config: EtlConfig, + stages_config: StageConfig, + prune_modes: PruneModes, ) -> StageSetBuilder { StageSetBuilder::default() .add_set(default_offline) - .add_set(OfflineStages::new(executor_factory, etl_config)) + .add_set(OfflineStages::new(executor_factory, stages_config, prune_modes)) .add_stage(FinishStage) } } @@ -139,7 +146,12 @@ where DB: Database + 'static, { fn builder(self) -> StageSetBuilder { - Self::add_offline_stages(self.online.builder(), self.executor_factory, self.etl_config) + Self::add_offline_stages( + self.online.builder(), + self.executor_factory, + self.stages_config.clone(), + self.prune_modes, + ) } } @@ -159,8 +171,8 @@ pub struct OnlineStages { header_downloader: H, /// The block body downloader body_downloader: B, - /// ETL configuration - etl_config: EtlConfig, + /// Configuration for each stage in the pipeline + stages_config: StageConfig, } impl OnlineStages { @@ -171,9 +183,9 @@ impl OnlineStages { consensus: Arc, header_downloader: H, body_downloader: B, - etl_config: EtlConfig, + stages_config: StageConfig, ) -> Self { - Self { provider, header_mode, consensus, header_downloader, body_downloader, etl_config } + Self { provider, header_mode, consensus, header_downloader, body_downloader, stages_config } } } @@ -198,7 +210,7 @@ where mode: HeaderSyncMode, header_downloader: H, consensus: Arc, - etl_config: EtlConfig, + stages_config: StageConfig, ) -> StageSetBuilder { StageSetBuilder::default() .add_stage(HeaderStage::new( @@ -206,7 +218,7 @@ where header_downloader, mode, consensus.clone(), - etl_config, + stages_config.etl, )) .add_stage(bodies) } @@ -226,7 +238,7 @@ where self.header_downloader, self.header_mode, self.consensus.clone(), - self.etl_config.clone(), + self.stages_config.etl.clone(), )) .add_stage(BodyStage::new(self.body_downloader)) } @@ -244,14 +256,16 @@ where pub struct OfflineStages { /// Executor factory needs for execution stage pub executor_factory: EF, - /// ETL configuration - etl_config: EtlConfig, + /// Configuration for each stage in the pipeline + stages_config: StageConfig, + /// Prune configuration for every segment that can be pruned + prune_modes: PruneModes, } impl OfflineStages { /// Create a new set of offline stages with default values. - pub fn new(executor_factory: EF, etl_config: EtlConfig) -> Self { - Self { executor_factory, etl_config } + pub fn new(executor_factory: EF, stages_config: StageConfig, prune_modes: PruneModes) -> Self { + Self { executor_factory, stages_config, prune_modes } } } @@ -261,10 +275,17 @@ where DB: Database, { fn builder(self) -> StageSetBuilder { - ExecutionStages::new(self.executor_factory) - .builder() - .add_set(HashingStages { etl_config: self.etl_config.clone() }) - .add_set(HistoryIndexingStages { etl_config: self.etl_config }) + ExecutionStages::new( + self.executor_factory, + self.stages_config.clone(), + self.prune_modes.clone(), + ) + .builder() + .add_set(HashingStages { stages_config: self.stages_config.clone() }) + .add_set(HistoryIndexingStages { + stages_config: self.stages_config.clone(), + prune_modes: self.prune_modes, + }) } } @@ -274,12 +295,16 @@ where pub struct ExecutionStages { /// Executor factory that will create executors. executor_factory: E, + /// Configuration for each stage in the pipeline + stages_config: StageConfig, + /// Prune configuration for every segment that can be pruned + prune_modes: PruneModes, } impl ExecutionStages { /// Create a new set of execution stages with default values. - pub fn new(executor_factory: E) -> Self { - Self { executor_factory } + pub fn new(executor_factory: E, stages_config: StageConfig, prune_modes: PruneModes) -> Self { + Self { executor_factory, stages_config, prune_modes } } } @@ -290,8 +315,13 @@ where { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() - .add_stage(SenderRecoveryStage::default()) - .add_stage(ExecutionStage::new_with_executor(self.executor_factory)) + .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery)) + .add_stage(ExecutionStage::from_config( + self.executor_factory, + self.stages_config.execution, + self.stages_config.execution_external_clean_threshold(), + self.prune_modes, + )) } } @@ -299,17 +329,23 @@ where #[derive(Debug, Default)] #[non_exhaustive] pub struct HashingStages { - /// ETL configuration - etl_config: EtlConfig, + /// Configuration for each stage in the pipeline + stages_config: StageConfig, } impl StageSet for HashingStages { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() .add_stage(MerkleStage::default_unwind()) - .add_stage(AccountHashingStage::default().with_etl_config(self.etl_config.clone())) - .add_stage(StorageHashingStage::default().with_etl_config(self.etl_config)) - .add_stage(MerkleStage::default_execution()) + .add_stage(AccountHashingStage::new( + self.stages_config.account_hashing, + self.stages_config.etl.clone(), + )) + .add_stage(StorageHashingStage::new( + self.stages_config.storage_hashing, + self.stages_config.etl.clone(), + )) + .add_stage(MerkleStage::new_execution(self.stages_config.merkle.clean_threshold)) } } @@ -317,15 +353,29 @@ impl StageSet for HashingStages { #[derive(Debug, Default)] #[non_exhaustive] pub struct HistoryIndexingStages { - /// ETL configuration - etl_config: EtlConfig, + /// Configuration for each stage in the pipeline + stages_config: StageConfig, + /// Prune configuration for every segment that can be pruned + prune_modes: PruneModes, } impl StageSet for HistoryIndexingStages { fn builder(self) -> StageSetBuilder { StageSetBuilder::default() - .add_stage(TransactionLookupStage::default().with_etl_config(self.etl_config.clone())) - .add_stage(IndexStorageHistoryStage::default().with_etl_config(self.etl_config.clone())) - .add_stage(IndexAccountHistoryStage::default().with_etl_config(self.etl_config)) + .add_stage(TransactionLookupStage::new( + self.stages_config.transaction_lookup, + self.stages_config.etl.clone(), + self.prune_modes.transaction_lookup, + )) + .add_stage(IndexStorageHistoryStage::new( + self.stages_config.index_storage_history, + self.stages_config.etl.clone(), + self.prune_modes.account_history, + )) + .add_stage(IndexAccountHistoryStage::new( + self.stages_config.index_account_history, + self.stages_config.etl.clone(), + self.prune_modes.storage_history, + )) } } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 9d8cf6ac66..0f933cea78 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -1,5 +1,6 @@ use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD; use num_traits::Zero; +use reth_config::config::ExecutionConfig; use reth_db::{ cursor::DbCursorRO, database::Database, static_file::HeaderMask, tables, transaction::DbTx, }; @@ -111,6 +112,22 @@ impl ExecutionStage { ) } + /// Create new instance of [ExecutionStage] from configuration. + pub fn from_config( + executor_provider: E, + config: ExecutionConfig, + external_clean_threshold: u64, + prune_modes: PruneModes, + ) -> Self { + Self::new( + executor_provider, + config.into(), + external_clean_threshold, + prune_modes, + ExExManagerHandle::empty(), + ) + } + /// Set the metric events sender. pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { self.metrics_tx = Some(metrics_tx); @@ -540,6 +557,17 @@ impl ExecutionStageThresholds { } } +impl From for ExecutionStageThresholds { + fn from(config: ExecutionConfig) -> Self { + ExecutionStageThresholds { + max_blocks: config.max_blocks, + max_changes: config.max_changes, + max_cumulative_gas: config.max_cumulative_gas, + max_duration: config.max_duration, + } + } +} + /// Returns a `StaticFileProviderRWRefMut` static file producer after performing a consistency /// check. /// diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 051b6a85f9..1a63f6d893 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -1,5 +1,5 @@ use itertools::Itertools; -use reth_config::config::EtlConfig; +use reth_config::config::{EtlConfig, HashingConfig}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -44,14 +44,12 @@ pub struct AccountHashingStage { impl AccountHashingStage { /// Create new instance of [AccountHashingStage]. - pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self { - Self { clean_threshold, commit_threshold, etl_config } - } - - /// Set the ETL configuration to use. - pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { - self.etl_config = etl_config; - self + pub fn new(config: HashingConfig, etl_config: EtlConfig) -> Self { + Self { + clean_threshold: config.clean_threshold, + commit_threshold: config.commit_threshold, + etl_config, + } } } diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 97da1278d8..97f9154c38 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -1,5 +1,5 @@ use itertools::Itertools; -use reth_config::config::EtlConfig; +use reth_config::config::{EtlConfig, HashingConfig}; use reth_db::{ codecs::CompactU256, cursor::{DbCursorRO, DbDupCursorRW}, @@ -45,14 +45,12 @@ pub struct StorageHashingStage { impl StorageHashingStage { /// Create new instance of [StorageHashingStage]. - pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self { - Self { clean_threshold, commit_threshold, etl_config } - } - - /// Set the ETL configuration to use. - pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { - self.etl_config = etl_config; - self + pub fn new(config: HashingConfig, etl_config: EtlConfig) -> Self { + Self { + clean_threshold: config.clean_threshold, + commit_threshold: config.commit_threshold, + etl_config, + } } } diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index d452406512..6c313f0d3e 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,5 +1,5 @@ use super::{collect_history_indices, load_history_indices}; -use reth_config::config::EtlConfig; +use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db::{ database::Database, models::ShardedKey, table::Decode, tables, transaction::DbTxMut, }; @@ -31,17 +31,11 @@ pub struct IndexAccountHistoryStage { impl IndexAccountHistoryStage { /// Create new instance of [IndexAccountHistoryStage]. pub fn new( - commit_threshold: u64, - prune_mode: Option, + config: IndexHistoryConfig, etl_config: EtlConfig, + prune_mode: Option, ) -> Self { - Self { commit_threshold, prune_mode, etl_config } - } - - /// Set the ETL configuration to use. - pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { - self.etl_config = etl_config; - self + Self { commit_threshold: config.commit_threshold, etl_config, prune_mode } } } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 6d5b6e2ade..51fc92f18b 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,5 +1,5 @@ use super::{collect_history_indices, load_history_indices}; -use reth_config::config::EtlConfig; +use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db::{ database::Database, models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress}, @@ -35,17 +35,11 @@ pub struct IndexStorageHistoryStage { impl IndexStorageHistoryStage { /// Create new instance of [IndexStorageHistoryStage]. pub fn new( - commit_threshold: u64, - prune_mode: Option, + config: IndexHistoryConfig, etl_config: EtlConfig, + prune_mode: Option, ) -> Self { - Self { commit_threshold, prune_mode, etl_config } - } - - /// Set the ETL configuration to use. - pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { - self.etl_config = etl_config; - self + Self { commit_threshold: config.commit_threshold, prune_mode, etl_config } } } diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index e078fd9542..0bb05e0c40 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,3 +1,4 @@ +use reth_config::config::SenderRecoveryConfig; use reth_consensus::ConsensusError; use reth_db::{ cursor::DbCursorRW, @@ -42,8 +43,8 @@ pub struct SenderRecoveryStage { impl SenderRecoveryStage { /// Create new instance of [SenderRecoveryStage]. - pub fn new(commit_threshold: u64) -> Self { - Self { commit_threshold } + pub fn new(config: SenderRecoveryConfig) -> Self { + Self { commit_threshold: config.commit_threshold } } } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 342183905b..fae08e854d 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -1,5 +1,5 @@ use num_traits::Zero; -use reth_config::config::EtlConfig; +use reth_config::config::{EtlConfig, TransactionLookupConfig}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -45,14 +45,12 @@ impl Default for TransactionLookupStage { impl TransactionLookupStage { /// Create new instance of [TransactionLookupStage]. - pub fn new(chunk_size: u64, etl_config: EtlConfig, prune_mode: Option) -> Self { - Self { chunk_size, etl_config, prune_mode } - } - - /// Set the ETL configuration to use. - pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self { - self.etl_config = etl_config; - self + pub fn new( + config: TransactionLookupConfig, + etl_config: EtlConfig, + prune_mode: Option, + ) -> Self { + Self { chunk_size: config.chunk_size, etl_config, prune_mode } } }