mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
chore: move pipeline setup to node-builder (#7588)
This commit is contained in:
@@ -34,8 +34,6 @@ reth-tasks.workspace = true
|
||||
reth-consensus-common.workspace = true
|
||||
reth-auto-seal-consensus.workspace = true
|
||||
reth-beacon-consensus.workspace = true
|
||||
reth-downloaders.workspace = true
|
||||
reth-revm.workspace = true
|
||||
reth-stages.workspace = true
|
||||
reth-prune.workspace = true
|
||||
reth-static-file.workspace = true
|
||||
|
||||
@@ -14,47 +14,22 @@ use metrics_exporter_prometheus::PrometheusHandle;
|
||||
use once_cell::sync::Lazy;
|
||||
use reth_auto_seal_consensus::{AutoSealConsensus, MiningMode};
|
||||
use reth_beacon_consensus::BeaconConsensus;
|
||||
use reth_config::{
|
||||
config::{PruneConfig, StageConfig},
|
||||
Config,
|
||||
};
|
||||
use reth_config::{config::PruneConfig, Config};
|
||||
use reth_db::{database::Database, database_metrics::DatabaseMetrics};
|
||||
use reth_downloaders::{
|
||||
bodies::bodies::BodiesDownloaderBuilder,
|
||||
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
|
||||
};
|
||||
use reth_interfaces::{
|
||||
consensus::Consensus,
|
||||
p2p::{
|
||||
bodies::{client::BodiesClient, downloader::BodyDownloader},
|
||||
headers::{client::HeadersClient, downloader::HeaderDownloader},
|
||||
},
|
||||
RethResult,
|
||||
};
|
||||
use reth_interfaces::{consensus::Consensus, p2p::headers::client::HeadersClient, RethResult};
|
||||
use reth_network::{NetworkBuilder, NetworkConfig, NetworkManager};
|
||||
use reth_node_api::ConfigureEvm;
|
||||
use reth_primitives::{
|
||||
constants::eip4844::MAINNET_KZG_TRUSTED_SETUP, kzg::KzgSettings, stage::StageId,
|
||||
BlockHashOrNumber, BlockNumber, ChainSpec, Head, SealedHeader, TxHash, B256, MAINNET,
|
||||
};
|
||||
use reth_provider::{
|
||||
providers::StaticFileProvider, BlockHashReader, BlockNumReader, HeaderProvider, HeaderSyncMode,
|
||||
providers::StaticFileProvider, BlockHashReader, BlockNumReader, HeaderProvider,
|
||||
ProviderFactory, StageCheckpointReader,
|
||||
};
|
||||
use reth_revm::stack::{Hook, InspectorStackConfig};
|
||||
use reth_stages::{
|
||||
prelude::*,
|
||||
stages::{
|
||||
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
|
||||
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
|
||||
TransactionLookupStage,
|
||||
},
|
||||
};
|
||||
use reth_static_file::StaticFileProducer;
|
||||
use reth_tasks::TaskExecutor;
|
||||
use secp256k1::SecretKey;
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
|
||||
use tokio::sync::{mpsc::Receiver, watch};
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tracing::*;
|
||||
|
||||
/// The default prometheus recorder handle. We use a global static to ensure that it is only
|
||||
@@ -374,54 +349,6 @@ impl NodeConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs a [Pipeline] that's wired to the network
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn build_networked_pipeline<DB, Client, EvmConfig>(
|
||||
&self,
|
||||
config: &StageConfig,
|
||||
client: Client,
|
||||
consensus: Arc<dyn Consensus>,
|
||||
provider_factory: ProviderFactory<DB>,
|
||||
task_executor: &TaskExecutor,
|
||||
metrics_tx: reth_stages::MetricEventsSender,
|
||||
prune_config: Option<PruneConfig>,
|
||||
max_block: Option<BlockNumber>,
|
||||
static_file_producer: StaticFileProducer<DB>,
|
||||
evm_config: EvmConfig,
|
||||
) -> eyre::Result<Pipeline<DB>>
|
||||
where
|
||||
DB: Database + Unpin + Clone + 'static,
|
||||
Client: HeadersClient + BodiesClient + Clone + 'static,
|
||||
EvmConfig: ConfigureEvm + Clone + 'static,
|
||||
{
|
||||
// building network downloaders using the fetch client
|
||||
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
|
||||
.build(client.clone(), Arc::clone(&consensus))
|
||||
.into_task_with(task_executor);
|
||||
|
||||
let body_downloader = BodiesDownloaderBuilder::new(config.bodies)
|
||||
.build(client, Arc::clone(&consensus), provider_factory.clone())
|
||||
.into_task_with(task_executor);
|
||||
|
||||
let pipeline = self
|
||||
.build_pipeline(
|
||||
provider_factory,
|
||||
config,
|
||||
header_downloader,
|
||||
body_downloader,
|
||||
consensus,
|
||||
max_block,
|
||||
self.debug.continuous,
|
||||
metrics_tx,
|
||||
prune_config,
|
||||
static_file_producer,
|
||||
evm_config,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(pipeline)
|
||||
}
|
||||
|
||||
/// Loads 'MAINNET_KZG_TRUSTED_SETUP'
|
||||
pub fn kzg_settings(&self) -> eyre::Result<Arc<KzgSettings>> {
|
||||
Ok(Arc::clone(&MAINNET_KZG_TRUSTED_SETUP))
|
||||
@@ -583,123 +510,6 @@ impl NodeConfig {
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds the [Pipeline] with the given [ProviderFactory] and downloaders.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn build_pipeline<DB, H, B, EvmConfig>(
|
||||
&self,
|
||||
provider_factory: ProviderFactory<DB>,
|
||||
stage_config: &StageConfig,
|
||||
header_downloader: H,
|
||||
body_downloader: B,
|
||||
consensus: Arc<dyn Consensus>,
|
||||
max_block: Option<u64>,
|
||||
continuous: bool,
|
||||
metrics_tx: reth_stages::MetricEventsSender,
|
||||
prune_config: Option<PruneConfig>,
|
||||
static_file_producer: StaticFileProducer<DB>,
|
||||
evm_config: EvmConfig,
|
||||
) -> eyre::Result<Pipeline<DB>>
|
||||
where
|
||||
DB: Database + Clone + 'static,
|
||||
H: HeaderDownloader + 'static,
|
||||
B: BodyDownloader + 'static,
|
||||
EvmConfig: ConfigureEvm + Clone + 'static,
|
||||
{
|
||||
let mut builder = Pipeline::builder();
|
||||
|
||||
if let Some(max_block) = max_block {
|
||||
debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
|
||||
builder = builder.with_max_block(max_block)
|
||||
}
|
||||
|
||||
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
|
||||
let factory = reth_revm::EvmProcessorFactory::new(self.chain.clone(), evm_config);
|
||||
|
||||
let stack_config = InspectorStackConfig {
|
||||
use_printer_tracer: self.debug.print_inspector,
|
||||
hook: if let Some(hook_block) = self.debug.hook_block {
|
||||
Hook::Block(hook_block)
|
||||
} else if let Some(tx) = self.debug.hook_transaction {
|
||||
Hook::Transaction(tx)
|
||||
} else if self.debug.hook_all {
|
||||
Hook::All
|
||||
} else {
|
||||
Hook::None
|
||||
},
|
||||
};
|
||||
|
||||
let factory = factory.with_stack_config(stack_config);
|
||||
|
||||
let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();
|
||||
|
||||
let header_mode =
|
||||
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };
|
||||
let pipeline = builder
|
||||
.with_tip_sender(tip_tx)
|
||||
.with_metrics_tx(metrics_tx.clone())
|
||||
.add_stages(
|
||||
DefaultStages::new(
|
||||
provider_factory.clone(),
|
||||
header_mode,
|
||||
Arc::clone(&consensus),
|
||||
header_downloader,
|
||||
body_downloader,
|
||||
factory.clone(),
|
||||
stage_config.etl.clone(),
|
||||
)
|
||||
.set(SenderRecoveryStage {
|
||||
commit_threshold: stage_config.sender_recovery.commit_threshold,
|
||||
})
|
||||
.set(
|
||||
ExecutionStage::new(
|
||||
factory,
|
||||
ExecutionStageThresholds {
|
||||
max_blocks: stage_config.execution.max_blocks,
|
||||
max_changes: stage_config.execution.max_changes,
|
||||
max_cumulative_gas: stage_config.execution.max_cumulative_gas,
|
||||
max_duration: stage_config.execution.max_duration,
|
||||
},
|
||||
stage_config
|
||||
.merkle
|
||||
.clean_threshold
|
||||
.max(stage_config.account_hashing.clean_threshold)
|
||||
.max(stage_config.storage_hashing.clean_threshold),
|
||||
prune_modes.clone(),
|
||||
)
|
||||
.with_metrics_tx(metrics_tx),
|
||||
)
|
||||
.set(AccountHashingStage::new(
|
||||
stage_config.account_hashing.clean_threshold,
|
||||
stage_config.account_hashing.commit_threshold,
|
||||
stage_config.etl.clone(),
|
||||
))
|
||||
.set(StorageHashingStage::new(
|
||||
stage_config.storage_hashing.clean_threshold,
|
||||
stage_config.storage_hashing.commit_threshold,
|
||||
stage_config.etl.clone(),
|
||||
))
|
||||
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
|
||||
.set(TransactionLookupStage::new(
|
||||
stage_config.transaction_lookup.chunk_size,
|
||||
stage_config.etl.clone(),
|
||||
prune_modes.transaction_lookup,
|
||||
))
|
||||
.set(IndexAccountHistoryStage::new(
|
||||
stage_config.index_account_history.commit_threshold,
|
||||
prune_modes.account_history,
|
||||
stage_config.etl.clone(),
|
||||
))
|
||||
.set(IndexStorageHistoryStage::new(
|
||||
stage_config.index_storage_history.commit_threshold,
|
||||
prune_modes.storage_history,
|
||||
stage_config.etl.clone(),
|
||||
)),
|
||||
)
|
||||
.build(provider_factory, static_file_producer);
|
||||
|
||||
Ok(pipeline)
|
||||
}
|
||||
|
||||
/// Change rpc port numbers based on the instance number, using the inner
|
||||
/// [RpcServerArgs::adjust_instance_ports] method.
|
||||
pub fn adjust_instance_ports(&mut self) {
|
||||
|
||||
Reference in New Issue
Block a user