fix: set ETL directory inside datadir on reth run and reth import (#7722)

This commit is contained in:
joshieDo
2024-04-18 18:26:29 +01:00
committed by GitHub
parent e401c4848a
commit 528f1e9047
9 changed files with 84 additions and 22 deletions

View File

@@ -13,7 +13,7 @@ use crate::{
use clap::Parser;
use futures::{stream::select as stream_select, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_config::{config::EtlConfig, Config};
use reth_db::{database::Database, init_db, DatabaseEnv};
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
@@ -200,10 +200,16 @@ impl Command {
/// Execute `execution-debug` command
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
let config = Config::default();
let mut config = Config::default();
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let db_path = data_dir.db_path();
// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
if config.stages.etl.dir.is_none() {
config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path()));
}
fs::create_dir_all(&db_path)?;
let db = Arc::new(init_db(db_path, self.db.database_args())?);
let provider_factory =

View File

@@ -12,7 +12,7 @@ use clap::Parser;
use eyre::Context;
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config;
use reth_config::{config::EtlConfig, Config};
use reth_db::{database::Database, init_db};
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
@@ -115,9 +115,14 @@ impl ImportCommand {
let data_dir = self.datadir.unwrap_or_chain_default(self.chain.chain);
let config_path = self.config.clone().unwrap_or_else(|| data_dir.config_path());
let config: Config = self.load_config(config_path.clone())?;
let mut config: Config = self.load_config(config_path.clone())?;
info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
if config.stages.etl.dir.is_none() {
config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path()));
}
let db_path = data_dir.db_path();
info!(target: "reth::cli", path = ?db_path, "Opening database");

View File

@@ -256,8 +256,14 @@ impl Command {
Box::new(MerkleStage::default_execution()),
Some(Box::new(MerkleStage::default_unwind())),
),
StageEnum::AccountHistory => (Box::<IndexAccountHistoryStage>::default(), None),
StageEnum::StorageHistory => (Box::<IndexStorageHistoryStage>::default(), None),
StageEnum::AccountHistory => (
Box::new(IndexAccountHistoryStage::default().with_etl_config(etl_config)),
None,
),
StageEnum::StorageHistory => (
Box::new(IndexStorageHistoryStage::default().with_etl_config(etl_config)),
None,
),
_ => return Ok(()),
};
if let Some(unwind_stage) = &unwind_stage {

View File

@@ -17,6 +17,7 @@
//! # use reth_node_ethereum::EthEvmConfig;
//! # use reth_provider::test_utils::create_test_provider_factory;
//! # use reth_static_file::StaticFileProducer;
//! # use reth_config::config::EtlConfig;
//!
//! # let executor_factory = EvmProcessorFactory::new(MAINNET.clone(), EthEvmConfig::default());
//! # let provider_factory = create_test_provider_factory();
@@ -27,7 +28,7 @@
//! );
//! // Build a pipeline with all offline stages.
//! # let pipeline = Pipeline::builder()
//! .add_stages(OfflineStages::new(executor_factory))
//! .add_stages(OfflineStages::new(executor_factory, EtlConfig::default()))
//! .build(provider_factory, static_file_producer);
//! ```
//!
@@ -37,11 +38,13 @@
//! # use reth_revm::EvmProcessorFactory;
//! # use reth_node_ethereum::EthEvmConfig;
//! # use reth_primitives::MAINNET;
//! # use reth_config::config::EtlConfig;
//!
//! // Build a pipeline with all offline stages and a custom stage at the end.
//! # let executor_factory = EvmProcessorFactory::new(MAINNET.clone(), EthEvmConfig::default());
//! Pipeline::builder()
//! .add_stages(
//! OfflineStages::new(executor_factory).builder().add_stage(MyCustomStage)
//! OfflineStages::new(executor_factory, EtlConfig::default()).builder().add_stage(MyCustomStage)
//! )
//! .build();
//! ```
@@ -90,6 +93,8 @@ pub struct DefaultStages<Provider, H, B, EF> {
online: OnlineStages<Provider, H, B>,
/// Executor factory needed for the execution stage
executor_factory: EF,
/// ETL configuration
etl_config: EtlConfig,
}
impl<Provider, H, B, EF> DefaultStages<Provider, H, B, EF> {
@@ -113,9 +118,10 @@ impl<Provider, H, B, EF> DefaultStages<Provider, H, B, EF> {
consensus,
header_downloader,
body_downloader,
etl_config,
etl_config.clone(),
),
executor_factory,
etl_config,
}
}
}
@@ -128,10 +134,11 @@ where
pub fn add_offline_stages<DB: Database>(
default_offline: StageSetBuilder<DB>,
executor_factory: EF,
etl_config: EtlConfig,
) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_set(default_offline)
.add_set(OfflineStages::new(executor_factory))
.add_set(OfflineStages::new(executor_factory, etl_config))
.add_stage(FinishStage)
}
}
@@ -145,7 +152,7 @@ where
DB: Database + 'static,
{
fn builder(self) -> StageSetBuilder<DB> {
Self::add_offline_stages(self.online.builder(), self.executor_factory)
Self::add_offline_stages(self.online.builder(), self.executor_factory, self.etl_config)
}
}
@@ -250,12 +257,14 @@ where
pub struct OfflineStages<EF: ExecutorFactory> {
/// Executor factory needed for the execution stage
pub executor_factory: EF,
/// ETL configuration passed on to the hashing and history-indexing stage sets
etl_config: EtlConfig,
}
impl<EF: ExecutorFactory> OfflineStages<EF> {
/// Create a new set of offline stages with default values.
pub fn new(executor_factory: EF) -> Self {
Self { executor_factory }
pub fn new(executor_factory: EF, etl_config: EtlConfig) -> Self {
Self { executor_factory, etl_config }
}
}
@@ -263,8 +272,8 @@ impl<EF: ExecutorFactory, DB: Database> StageSet<DB> for OfflineStages<EF> {
fn builder(self) -> StageSetBuilder<DB> {
ExecutionStages::new(self.executor_factory)
.builder()
.add_set(HashingStages)
.add_set(HistoryIndexingStages)
.add_set(HashingStages { etl_config: self.etl_config.clone() })
.add_set(HistoryIndexingStages { etl_config: self.etl_config })
}
}
@@ -294,14 +303,17 @@ impl<EF: ExecutorFactory, DB: Database> StageSet<DB> for ExecutionStages<EF> {
/// A set containing all stages that hash account state.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HashingStages;
pub struct HashingStages {
/// ETL configuration handed to the account and storage hashing stages of this set
etl_config: EtlConfig,
}
impl<DB: Database> StageSet<DB> for HashingStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(MerkleStage::default_unwind())
.add_stage(AccountHashingStage::default())
.add_stage(StorageHashingStage::default())
.add_stage(AccountHashingStage::default().with_etl_config(self.etl_config.clone()))
.add_stage(StorageHashingStage::default().with_etl_config(self.etl_config))
.add_stage(MerkleStage::default_execution())
}
}
@@ -309,13 +321,16 @@ impl<DB: Database> StageSet<DB> for HashingStages {
/// A set containing all stages that do additional indexing for historical state.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HistoryIndexingStages;
pub struct HistoryIndexingStages {
/// ETL configuration handed to the transaction lookup and history index stages of this set
etl_config: EtlConfig,
}
impl<DB: Database> StageSet<DB> for HistoryIndexingStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(TransactionLookupStage::default())
.add_stage(IndexStorageHistoryStage::default())
.add_stage(IndexAccountHistoryStage::default())
.add_stage(TransactionLookupStage::default().with_etl_config(self.etl_config.clone()))
.add_stage(IndexStorageHistoryStage::default().with_etl_config(self.etl_config.clone()))
.add_stage(IndexAccountHistoryStage::default().with_etl_config(self.etl_config))
}
}

View File

@@ -47,6 +47,12 @@ impl AccountHashingStage {
pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self {
Self { clean_threshold, commit_threshold, etl_config }
}
/// Set the ETL configuration to use.
///
/// Takes the stage by value, overwrites its `etl_config` field with the given
/// configuration and returns the updated stage, so the call can be chained directly
/// onto a constructor such as `AccountHashingStage::default()`.
#[must_use = "this returns the reconfigured stage; discarding it drops the stage"]
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self {
    self.etl_config = etl_config;
    self
}
}
impl Default for AccountHashingStage {

View File

@@ -48,6 +48,12 @@ impl StorageHashingStage {
pub fn new(clean_threshold: u64, commit_threshold: u64, etl_config: EtlConfig) -> Self {
Self { clean_threshold, commit_threshold, etl_config }
}
/// Set the ETL configuration to use.
///
/// Takes the stage by value, overwrites its `etl_config` field with the given
/// configuration and returns the updated stage, so the call can be chained directly
/// onto a constructor such as `StorageHashingStage::default()`.
#[must_use = "this returns the reconfigured stage; discarding it drops the stage"]
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self {
    self.etl_config = etl_config;
    self
}
}
impl Default for StorageHashingStage {

View File

@@ -37,6 +37,12 @@ impl IndexAccountHistoryStage {
) -> Self {
Self { commit_threshold, prune_mode, etl_config }
}
/// Set the ETL configuration to use.
///
/// Takes the stage by value, overwrites its `etl_config` field with the given
/// configuration and returns the updated stage, so the call can be chained directly
/// onto a constructor such as `IndexAccountHistoryStage::default()`.
#[must_use = "this returns the reconfigured stage; discarding it drops the stage"]
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self {
    self.etl_config = etl_config;
    self
}
}
impl Default for IndexAccountHistoryStage {

View File

@@ -41,6 +41,12 @@ impl IndexStorageHistoryStage {
) -> Self {
Self { commit_threshold, prune_mode, etl_config }
}
/// Set the ETL configuration to use.
///
/// Takes the stage by value, overwrites its `etl_config` field with the given
/// configuration and returns the updated stage, so the call can be chained directly
/// onto a constructor such as `IndexStorageHistoryStage::default()`.
#[must_use = "this returns the reconfigured stage; discarding it drops the stage"]
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self {
    self.etl_config = etl_config;
    self
}
}
impl Default for IndexStorageHistoryStage {

View File

@@ -48,6 +48,12 @@ impl TransactionLookupStage {
pub fn new(chunk_size: u64, etl_config: EtlConfig, prune_mode: Option<PruneMode>) -> Self {
Self { chunk_size, etl_config, prune_mode }
}
/// Set the ETL configuration to use.
///
/// Takes the stage by value, overwrites its `etl_config` field with the given
/// configuration and returns the updated stage, so the call can be chained directly
/// onto a constructor such as `TransactionLookupStage::default()`.
#[must_use = "this returns the reconfigured stage; discarding it drops the stage"]
pub fn with_etl_config(mut self, etl_config: EtlConfig) -> Self {
    self.etl_config = etl_config;
    self
}
}
impl<DB: Database> Stage<DB> for TransactionLookupStage {