chore: more LaunchContext helpers (#7894)

This commit is contained in:
Matthias Seitz
2024-04-25 20:18:46 +02:00
committed by GitHub
parent d312dbbea4
commit 663a7185e6
5 changed files with 143 additions and 88 deletions

View File

@@ -62,6 +62,17 @@ impl MiningMode {
}
}
impl fmt::Display for MiningMode {
    /// Writes the variant name only, discarding any payload carried by the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            MiningMode::None => "None",
            MiningMode::Auto(_) => "Auto",
            MiningMode::FixedBlockTime(_) => "FixedBlockTime",
        })
    }
}
/// A miner that's supposed to create a new block every `interval`, mining all transactions that are
/// ready at that time.
///

View File

@@ -1,20 +1,28 @@
//! Helper types that can be used by launchers.
use std::{cmp::max, sync::Arc, thread::available_parallelism};
use eyre::Context;
use rayon::ThreadPoolBuilder;
use reth_config::PruneConfig;
use tokio::sync::mpsc::Receiver;
use reth_auto_seal_consensus::MiningMode;
use reth_config::{config::EtlConfig, PruneConfig};
use reth_db::{database::Database, database_metrics::DatabaseMetrics};
use reth_interfaces::p2p::headers::client::HeadersClient;
use reth_node_core::{
cli::config::RethRpcConfig,
dirs::{ChainPath, DataDirPath},
init::{init_genesis, InitDatabaseError},
node_config::NodeConfig,
};
use reth_primitives::{Chain, ChainSpec, Head, B256};
use reth_primitives::{BlockNumber, Chain, ChainSpec, Head, PruneModes, B256};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_prune::PrunerBuilder;
use reth_rpc::JwtSecret;
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{error, info};
use std::{cmp::max, sync::Arc, thread::available_parallelism};
/// Reusable setup for launching a node.
///
@@ -73,6 +81,12 @@ impl LaunchContext {
Ok(toml_config)
}
/// Convenience function to [`Self::configure_globals`] that returns `self` so it can be
/// chained in a builder-style launch sequence.
pub fn with_configured_globals(self) -> Self {
    // configure_globals applies process-wide settings as side effects only; ownership of
    // the context is handed straight back to the caller.
    self.configure_globals();
    self
}
/// Configure global settings this includes:
///
/// - Raising the file descriptor limit
@@ -155,6 +169,31 @@ impl<L, R> LaunchContextWith<Attached<L, R>> {
}
}
impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
/// Adjust certain settings in the config to make sure they are set correctly
///
/// This includes:
/// - Making sure the ETL dir is set to the datadir
/// - RPC settings are adjusted to the correct port
pub fn with_adjusted_configs(self) -> Self {
    // Apply each adjustment in turn; both helpers consume and return the context.
    let ctx = self.ensure_etl_datadir();
    ctx.with_adjusted_rpc_instance_ports()
}
/// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to.
///
/// Leaves an explicitly configured ETL dir untouched.
pub fn ensure_etl_datadir(mut self) -> Self {
    if self.toml_config_mut().stages.etl.dir.is_none() {
        // Only derive the datadir-based ETL path when no dir was configured.
        let dir = EtlConfig::from_datadir(&self.data_dir().data_dir_path());
        self.toml_config_mut().stages.etl.dir = Some(dir);
    }
    self
}
/// Change rpc port numbers based on the instance number, so multiple instances can run on
/// the same host without their RPC ports colliding.
pub fn with_adjusted_rpc_instance_ports(mut self) -> Self {
    // Mutates the node config in place; the actual port arithmetic lives in NodeConfig.
    self.node_config_mut().adjust_instance_ports();
    self
}
/// Returns the attached [NodeConfig].
pub const fn node_config(&self) -> &NodeConfig {
&self.left().config
@@ -196,8 +235,20 @@ impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
}
/// Returns the configured [PruneConfig]
pub fn prune_config(&self) -> eyre::Result<Option<PruneConfig>> {
Ok(self.node_config().prune_config()?.or_else(|| self.toml_config().prune.clone()))
/// Returns the configured [PruneConfig].
///
/// The node config (CLI) takes precedence; the toml config's `prune` section is only used
/// as a fallback when the CLI provides none.
pub fn prune_config(&self) -> Option<PruneConfig> {
    let from_node_config = self.node_config().prune_config();
    if from_node_config.is_some() {
        return from_node_config;
    }
    // Fall back to the (cloned) prune section of the toml config, if any.
    self.toml_config().prune.clone()
}
/// Returns the configured [PruneModes], i.e. the segments of the [PruneConfig], if any
/// pruning is configured.
pub fn prune_modes(&self) -> Option<PruneModes> {
    // No prune config means no prune modes; `?` propagates the None.
    let config = self.prune_config()?;
    Some(config.segments)
}
/// Returns an initialized [PrunerBuilder] based on the configured [PruneConfig].
///
/// Falls back to the default [PruneConfig] when none is configured.
pub fn pruner_builder(&self) -> PrunerBuilder {
    let config = self.prune_config().unwrap_or_default();
    // The delete limit comes from the chain spec rather than the prune config.
    let delete_limit = self.chain_spec().prune_delete_limit;
    PrunerBuilder::new(config)
        .prune_delete_limit(delete_limit)
        .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
}
/// Returns the initial pipeline target, based on whether or not the node is running in
@@ -216,6 +267,17 @@ impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
Ok(secret)
}
/// Returns the [MiningMode] intended for --dev mode.
///
/// A configured block time takes precedence over a max-transactions threshold; with neither
/// set, blocks are mined as soon as a single transaction is ready.
pub fn dev_mining_mode(&self, pending_transactions_listener: Receiver<B256>) -> MiningMode {
    let dev = &self.node_config().dev;
    match (dev.block_time, dev.block_max_transactions) {
        // Fixed interval mining wins over transaction-count based mining.
        (Some(interval), _) => MiningMode::interval(interval),
        (None, Some(max_transactions)) => {
            MiningMode::instant(max_transactions, pending_transactions_listener)
        }
        // Default: mine as soon as one pending transaction arrives.
        (None, None) => MiningMode::instant(1, pending_transactions_listener),
    }
}
}
impl<DB> LaunchContextWith<Attached<WithConfigs, DB>>
@@ -267,6 +329,29 @@ where
self.right().static_file_provider()
}
/// Creates a new [StaticFileProducer] with the attached database.
///
/// Uses the configured prune modes, or the default modes when no pruning is configured.
pub fn static_file_producer(&self) -> StaticFileProducer<DB> {
    let prune_modes = self.prune_modes().unwrap_or_default();
    let factory = self.provider_factory().clone();
    StaticFileProducer::new(factory, self.static_file_provider(), prune_modes)
}
/// Write the genesis block and state if it has not already been written.
///
/// Returns the genesis hash on success, or an [InitDatabaseError] if the database could not
/// be initialized.
pub fn init_genesis(&self) -> Result<B256, InitDatabaseError> {
    // Delegates to the free `init_genesis` function with a clone of the provider factory.
    init_genesis(self.provider_factory().clone())
}
/// Returns the max block that the node should run to, looking it up from the network if
/// necessary
///
/// Returns `Ok(None)` when no max block is configured or resolvable; delegates the actual
/// resolution to [NodeConfig::max_block].
pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
where
    C: HeadersClient,
{
    // The provider factory is cloned so the node config can query local headers as well.
    self.node_config().max_block(client, self.provider_factory().clone()).await
}
/// Starts the prometheus endpoint.
pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
let prometheus_handle = self.node_config().install_prometheus_recorder()?;

View File

@@ -8,7 +8,7 @@ use crate::{
BuilderContext, NodeBuilderWithComponents, NodeHandle, RethFullAdapter,
};
use futures::{future, future::Either, stream, stream_select, StreamExt};
use reth_auto_seal_consensus::{AutoSealConsensus, MiningMode};
use reth_auto_seal_consensus::AutoSealConsensus;
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook},
BeaconConsensus, BeaconConsensusEngine,
@@ -16,7 +16,6 @@ use reth_beacon_consensus::{
use reth_blockchain_tree::{
BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
};
use reth_config::config::EtlConfig;
use reth_consensus::Consensus;
use reth_db::{
database::Database,
@@ -31,15 +30,12 @@ use reth_node_core::{
engine_api_store::EngineApiStore,
engine_skip_fcu::EngineApiSkipFcu,
exit::NodeExitFuture,
init::init_genesis,
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_primitives::format_ether;
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions};
use reth_prune::PrunerBuilder;
use reth_revm::EvmProcessorFactory;
use reth_rpc_engine_api::EngineApi;
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::TransactionPool;
@@ -99,14 +95,14 @@ where
config,
} = target;
// configure globals
ctx.configure_globals();
let mut ctx = ctx
let ctx = ctx
.with_configured_globals()
// load the toml config
.with_loaded_toml_config(config)?
// attach the database
.attach(database.clone())
// ensure certain settings take effect
.with_adjusted_configs()
// Create the provider factory
.with_provider_factory()?;
@@ -115,8 +111,7 @@ where
ctx.start_prometheus_endpoint().await?;
debug!(target: "reth::cli", chain=%ctx.chain_id(), genesis=?ctx.genesis_hash(), "Initializing genesis");
init_genesis(ctx.provider_factory().clone())?;
ctx.init_genesis()?;
info!(target: "reth::cli", "\n{}", ctx.chain_spec().display_hardforks());
@@ -132,8 +127,6 @@ where
let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx);
ctx.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
let prune_config = ctx.prune_config()?;
// Configure the blockchain tree for the node
let evm_config = types.evm_config();
let tree_config = BlockchainTreeConfig::default();
@@ -142,12 +135,8 @@ where
consensus.clone(),
EvmProcessorFactory::new(ctx.chain_spec(), evm_config.clone()),
);
let tree = BlockchainTree::new(
tree_externals,
tree_config,
prune_config.as_ref().map(|prune| prune.segments.clone()),
)?
.with_sync_metrics_tx(sync_metrics_tx.clone());
let tree = BlockchainTree::new(tree_externals, tree_config, ctx.prune_modes())?
.with_sync_metrics_tx(sync_metrics_tx.clone());
let canon_state_notification_sender = tree.canon_state_notification_sender();
let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
@@ -286,17 +275,10 @@ manager",
consensus_engine_rx = engine_intercept_rx;
};
let max_block = ctx
.node_config()
.max_block(network_client.clone(), ctx.provider_factory().clone())
.await?;
let max_block = ctx.max_block(network_client.clone()).await?;
let mut hooks = EngineHooks::new();
let static_file_producer = StaticFileProducer::new(
ctx.provider_factory().clone(),
ctx.static_file_provider(),
prune_config.clone().unwrap_or_default().segments,
);
let static_file_producer = ctx.static_file_producer();
let static_file_producer_events = static_file_producer.lock().events();
hooks.add(StaticFileHook::new(
static_file_producer.clone(),
@@ -304,12 +286,6 @@ manager",
));
info!(target: "reth::cli", "StaticFileProducer initialized");
// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
if ctx.toml_config_mut().stages.etl.dir.is_none() {
ctx.toml_config_mut().stages.etl.dir =
Some(EtlConfig::from_datadir(&ctx.data_dir().data_dir_path()));
}
// Configure the pipeline
let pipeline_exex_handle =
exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
@@ -322,18 +298,9 @@ address.to_string(), format_ether(alloc.balance));
}
// install auto-seal
let pending_transactions_listener =
node_adapter.components.pool().pending_transactions_listener();
let mining_mode = if let Some(interval) = ctx.node_config().dev.block_time {
MiningMode::interval(interval)
} else if let Some(max_transactions) = ctx.node_config().dev.block_max_transactions {
MiningMode::instant(max_transactions, pending_transactions_listener)
} else {
info!(target: "reth::cli", "No mining mode specified, defaulting to
ReadyTransaction");
MiningMode::instant(1, pending_transactions_listener)
};
let mining_mode =
ctx.dev_mining_mode(node_adapter.components.pool().pending_transactions_listener());
info!(target: "reth::cli", mode=%mining_mode, "configuring dev mining mode");
let (_, client, mut task) = reth_auto_seal_consensus::AutoSealBuilder::new(
ctx.chain_spec(),
@@ -354,7 +321,7 @@ ReadyTransaction");
ctx.provider_factory().clone(),
ctx.task_executor(),
sync_metrics_tx,
prune_config.clone(),
ctx.prune_config(),
max_block,
static_file_producer,
evm_config,
@@ -377,7 +344,7 @@ ReadyTransaction");
ctx.provider_factory().clone(),
ctx.task_executor(),
sync_metrics_tx,
prune_config.clone(),
ctx.prune_config(),
max_block,
static_file_producer,
evm_config,
@@ -392,11 +359,8 @@ ReadyTransaction");
let initial_target = ctx.initial_pipeline_target();
let prune_config = prune_config.unwrap_or_default();
let mut pruner_builder = PrunerBuilder::new(prune_config.clone())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(ctx.chain_spec().prune_delete_limit)
.timeout(PrunerBuilder::DEFAULT_TIMEOUT);
let mut pruner_builder =
ctx.pruner_builder().max_reorg_depth(tree_config.max_reorg_depth() as usize);
if let Some(exex_manager_handle) = &exex_manager_handle {
pruner_builder =
pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
@@ -405,8 +369,8 @@ ReadyTransaction");
let mut pruner = pruner_builder.build(ctx.provider_factory().clone());
let pruner_events = pruner.events();
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor().clone())));
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
@@ -463,9 +427,6 @@ ReadyTransaction");
// extract the jwt secret from the args if possible
let jwt_secret = ctx.auth_jwt_secret()?;
// adjust rpc port numbers based on instance number
ctx.node_config_mut().adjust_instance_ports();
// Start RPC servers
let (rpc_server_handles, mut rpc_registry) = crate::rpc::launch_rpc_servers(
node_adapter.clone(),

View File

@@ -5,7 +5,6 @@ use reth_config::config::PruneConfig;
use reth_primitives::{
ChainSpec, PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE,
};
use std::sync::Arc;
/// Parameters for pruning and full node
#[derive(Debug, Clone, Args, PartialEq, Eq, Default)]
@@ -19,31 +18,30 @@ pub struct PruningArgs {
impl PruningArgs {
/// Returns pruning configuration.
pub fn prune_config(&self, chain_spec: Arc<ChainSpec>) -> eyre::Result<Option<PruneConfig>> {
Ok(if self.full {
Some(PruneConfig {
block_interval: 5,
segments: PruneModes {
sender_recovery: Some(PruneMode::Full),
transaction_lookup: None,
receipts: chain_spec
pub fn prune_config(&self, chain_spec: &ChainSpec) -> Option<PruneConfig> {
if !self.full {
return None;
}
Some(PruneConfig {
block_interval: 5,
segments: PruneModes {
sender_recovery: Some(PruneMode::Full),
transaction_lookup: None,
receipts: chain_spec
.deposit_contract
.as_ref()
.map(|contract| PruneMode::Before(contract.block)),
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
receipts_log_filter: ReceiptsLogPruneConfig(
chain_spec
.deposit_contract
.as_ref()
.map(|contract| PruneMode::Before(contract.block)),
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
receipts_log_filter: ReceiptsLogPruneConfig(
chain_spec
.deposit_contract
.as_ref()
.map(|contract| (contract.address, PruneMode::Before(contract.block)))
.into_iter()
.collect(),
),
},
})
} else {
None
.map(|contract| (contract.address, PruneMode::Before(contract.block)))
.into_iter()
.collect(),
),
},
})
}
}

View File

@@ -262,8 +262,8 @@ impl NodeConfig {
}
/// Returns pruning configuration.
pub fn prune_config(&self) -> eyre::Result<Option<PruneConfig>> {
self.pruning.prune_config(Arc::clone(&self.chain))
/// Returns the pruning configuration derived from the pruning CLI args and the chain spec.
///
/// Returns `None` when the args do not enable pruning.
pub fn prune_config(&self) -> Option<PruneConfig> {
    // Delegates to PruningArgs::prune_config, borrowing the chain spec.
    self.pruning.prune_config(&self.chain)
}
/// Returns the max block that the node should run to, looking it up from the network if