From 0ba685386d328cfbeeb02776368cdddddbf5a7db Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Tue, 17 Feb 2026 18:35:31 +0100 Subject: [PATCH] refactor: dedup runtime initializations (#22263) Co-authored-by: Alexey Shekhirin --- Cargo.lock | 2 + Cargo.toml | 2 +- crates/cli/commands/src/common.rs | 10 ++-- crates/cli/commands/src/db/mod.rs | 3 +- crates/cli/commands/src/export_era.rs | 4 +- crates/cli/commands/src/import.rs | 5 +- crates/cli/commands/src/import_core.rs | 17 +++--- crates/cli/commands/src/import_era.rs | 5 +- crates/cli/commands/src/init_cmd.rs | 7 ++- crates/cli/commands/src/init_state/mod.rs | 5 +- crates/cli/commands/src/p2p/mod.rs | 24 ++++---- crates/cli/commands/src/prune.rs | 2 +- crates/cli/commands/src/re_execute.rs | 8 ++- crates/cli/commands/src/stage/drop.rs | 4 +- .../cli/commands/src/stage/dump/execution.rs | 3 +- .../src/stage/dump/hashing_account.rs | 2 +- .../src/stage/dump/hashing_storage.rs | 2 +- crates/cli/commands/src/stage/dump/merkle.rs | 3 +- crates/cli/commands/src/stage/dump/mod.rs | 37 ++++++++---- crates/cli/commands/src/stage/mod.rs | 7 ++- crates/cli/commands/src/stage/run.rs | 5 +- crates/cli/commands/src/stage/unwind.rs | 4 +- crates/cli/runner/src/lib.rs | 5 ++ crates/e2e-test-utils/src/setup_builder.rs | 2 +- crates/e2e-test-utils/src/setup_import.rs | 8 ++- crates/ethereum/cli/src/app.rs | 16 +++-- crates/ethereum/node/tests/e2e/blobs.rs | 6 +- crates/ethereum/node/tests/e2e/dev.rs | 4 +- crates/ethereum/node/tests/e2e/eth.rs | 6 +- crates/ethereum/node/tests/e2e/pool.rs | 6 +- crates/ethereum/node/tests/e2e/prestate.rs | 2 +- crates/ethereum/node/tests/e2e/rpc.rs | 2 +- crates/ethereum/node/tests/it/builder.rs | 4 +- crates/ethereum/node/tests/it/testing.rs | 2 +- crates/exex/test-utils/src/lib.rs | 4 +- crates/net/network/src/config.rs | 44 +++++--------- crates/net/network/src/lib.rs | 6 +- crates/net/network/src/manager.rs | 8 ++- crates/net/network/src/test_utils/testnet.rs | 2 +- .../network/src/test_utils/transactions.rs | 3 +- crates/net/network/src/transactions/mod.rs | 13 +++-- crates/net/network/tests/it/connect.rs | 26 +++++---- crates/net/network/tests/it/multiplex.rs | 3 +- crates/net/network/tests/it/startup.rs | 19 +++--- crates/node/builder/src/builder/mod.rs | 2 +- crates/node/builder/src/builder/states.rs | 5 +- crates/node/core/Cargo.toml | 1 + crates/node/core/src/args/network.rs | 5 +- crates/node/metrics/src/server.rs | 2 +- crates/rpc/rpc-builder/src/eth.rs | 2 +- crates/rpc/rpc-eth-types/src/cache/mod.rs | 16 ----- crates/rpc/rpc/src/eth/builder.rs | 16 +++-- crates/rpc/rpc/src/eth/core.rs | 58 +------------------ crates/rpc/rpc/src/eth/pubsub.rs | 13 +---- crates/tasks/src/lib.rs | 34 +++++------ crates/tasks/src/runtime.rs | 33 ++--------- crates/tracing/Cargo.toml | 2 +- crates/transaction-pool/src/lib.rs | 6 +- crates/transaction-pool/src/maintain.rs | 2 +- examples/bsc-p2p/Cargo.toml | 1 + examples/bsc-p2p/src/main.rs | 3 +- examples/bsc-p2p/tests/it/p2p.rs | 3 +- examples/custom-dev-node/src/main.rs | 2 +- examples/custom-engine-types/src/main.rs | 2 +- examples/custom-evm/src/main.rs | 2 +- examples/custom-rlpx-subprotocol/src/main.rs | 3 +- examples/network-proxy/src/main.rs | 6 +- examples/network-txpool/src/main.rs | 3 +- examples/network/src/main.rs | 4 +- examples/polygon-p2p/src/main.rs | 11 ++-- examples/precompile-cache/src/main.rs | 2 +- examples/rpc-db/src/main.rs | 2 +- 72 files changed, 275 insertions(+), 318 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be99b14e9a..d1daec56a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3599,6 +3599,7 @@ dependencies = [ "reth-primitives", "reth-primitives-traits", "reth-provider", + "reth-tasks", "reth-tracing", "secp256k1 0.30.0", "serde", @@ -9321,6 +9322,7 @@ dependencies = [ "reth-stages-types", "reth-storage-api", "reth-storage-errors", + "reth-tasks", "reth-tracing", "reth-tracing-otlp", "reth-transaction-pool", diff --git a/Cargo.toml b/Cargo.toml index 1635818baf..3eea7485f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -651,7 +651,7 @@ ethereum_ssz_derive = "0.10.1" jemalloc_pprof = { version = "0.8", default-features = false } tikv-jemalloc-ctl = "0.6" tikv-jemallocator = "0.6" -tracy-client = "0.18.0" +tracy-client = { version = "0.18.0", features = ["demangle"] } snmalloc-rs = { version = "0.3.7", features = ["build_cc"] } aes = "0.8.1" diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs index 3b364f8acd..f23fe2d5a0 100644 --- a/crates/cli/commands/src/common.rs +++ b/crates/cli/commands/src/common.rs @@ -89,13 +89,15 @@ impl EnvironmentArgs { /// Initializes environment according to [`AccessRights`] and returns an instance of /// [`Environment`]. /// - /// Internally builds a [`reth_tasks::Runtime`] attached to the current tokio handle for - /// parallel storage I/O. - pub fn init(&self, access: AccessRights) -> eyre::Result> + /// The provided `runtime` is used for parallel storage I/O. + pub fn init( + &self, + access: AccessRights, + runtime: reth_tasks::Runtime, + ) -> eyre::Result> where C: ChainSpecParser, { - let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?; let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain()); let db_path = data_dir.db(); let sf_path = data_dir.static_files(); diff --git a/crates/cli/commands/src/db/mod.rs b/crates/cli/commands/src/db/mod.rs index 1676f2cb05..74608b99e1 100644 --- a/crates/cli/commands/src/db/mod.rs +++ b/crates/cli/commands/src/db/mod.rs @@ -83,7 +83,8 @@ impl> Command /// provided command. macro_rules! db_exec { ($env:expr, $tool:ident, $N:ident, $access_rights:expr, $command:block) => { - let Environment { provider_factory, .. } = $env.init::<$N>($access_rights)?; + let Environment { provider_factory, .. } = + $env.init::<$N>($access_rights, ctx.task_executor.clone())?; let $tool = DbTool::new(provider_factory)?; $command; diff --git a/crates/cli/commands/src/export_era.rs b/crates/cli/commands/src/export_era.rs index 5f4f0306bb..4efd38bcda 100644 --- a/crates/cli/commands/src/export_era.rs +++ b/crates/cli/commands/src/export_era.rs @@ -44,11 +44,11 @@ pub struct ExportArgs { impl> ExportEraCommand { /// Execute `export-era` command - pub async fn execute(self) -> eyre::Result<()> + pub async fn execute(self, runtime: reth_tasks::Runtime) -> eyre::Result<()> where N: CliNodeTypes, { - let Environment { provider_factory, .. } = self.env.init::(AccessRights::RO)?; + let Environment { provider_factory, .. } = self.env.init::(AccessRights::RO, runtime)?; // Either specified path or default to `//era1-export/` let data_dir = match &self.export.path { diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index bbf48209f8..fdddc614ed 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -47,6 +47,7 @@ impl> ImportComm pub async fn execute( self, components: impl FnOnce(Arc) -> Comp, + runtime: reth_tasks::Runtime, ) -> eyre::Result<()> where N: CliNodeTypes, @@ -54,7 +55,8 @@ impl> ImportComm { info!(target: "reth::cli", "reth {} starting", version_metadata().short_version); - let Environment { provider_factory, config, .. } = self.env.init::(AccessRights::RW)?; + let Environment { provider_factory, config, .. } = + self.env.init::(AccessRights::RW, runtime.clone())?; let components = components(provider_factory.chain_spec()); @@ -85,6 +87,7 @@ impl> ImportComm &config, executor.clone(), consensus.clone(), + runtime.clone(), ) .await?; diff --git a/crates/cli/commands/src/import_core.rs b/crates/cli/commands/src/import_core.rs index 2e9ce23774..2bcf930a75 100644 --- a/crates/cli/commands/src/import_core.rs +++ b/crates/cli/commands/src/import_core.rs @@ -87,6 +87,7 @@ pub async fn import_blocks_from_file( config: &Config, executor: impl ConfigureEvm + 'static, consensus: Arc + 'static>, + runtime: reth_tasks::Runtime, ) -> eyre::Result where N: ProviderNodeTypes, @@ -139,7 +140,7 @@ where total_decoded_blocks += file_client.headers_len(); total_decoded_txns += file_client.total_transactions(); - let (mut pipeline, events, _runtime) = build_import_pipeline_impl( + let (mut pipeline, events) = build_import_pipeline_impl( config, provider_factory.clone(), &consensus, @@ -147,6 +148,7 @@ where static_file_producer.clone(), import_config.no_state, executor.clone(), + runtime.clone(), )?; // override the tip @@ -257,6 +259,7 @@ where /// /// If configured to execute, all stages will run. Otherwise, only stages that don't require state /// will run. +#[expect(clippy::too_many_arguments)] pub fn build_import_pipeline_impl( config: &Config, provider_factory: ProviderFactory, @@ -265,11 +268,8 @@ pub fn build_import_pipeline_impl( static_file_producer: StaticFileProducer>, disable_exec: bool, evm_config: E, -) -> eyre::Result<( - Pipeline, - impl futures::Stream> + use, - reth_tasks::Runtime, -)> + runtime: reth_tasks::Runtime, +) -> eyre::Result<(Pipeline, impl futures::Stream> + use)> where N: ProviderNodeTypes, C: FullConsensus + 'static, @@ -285,9 +285,6 @@ where .sealed_header(last_block_number)? .ok_or_else(|| ProviderError::HeaderNotFound(last_block_number.into()))?; - let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current()) - .expect("failed to create runtime"); - let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) .build(file_client.clone(), consensus.clone()) .into_task_with(&runtime); @@ -333,5 +330,5 @@ where let events = pipeline.events().map(Into::into); - Ok((pipeline, events, runtime)) + Ok((pipeline, events)) } diff --git a/crates/cli/commands/src/import_era.rs b/crates/cli/commands/src/import_era.rs index 2bf1fe1c9f..d0eeebe229 100644 --- a/crates/cli/commands/src/import_era.rs +++ b/crates/cli/commands/src/import_era.rs @@ -64,13 +64,14 @@ impl TryFromChain for ChainKind { impl> ImportEraCommand { /// Execute `import-era` command - pub async fn execute(self) -> eyre::Result<()> + pub async fn execute(self, runtime: reth_tasks::Runtime) -> eyre::Result<()> where N: CliNodeTypes, { info!(target: "reth::cli", "reth {} starting", version_metadata().short_version); - let Environment { provider_factory, config, .. } = self.env.init::(AccessRights::RW)?; + let Environment { provider_factory, config, .. } = + self.env.init::(AccessRights::RW, runtime)?; let mut hash_collector = Collector::new(config.stages.etl.file_size, config.stages.etl.dir); diff --git a/crates/cli/commands/src/init_cmd.rs b/crates/cli/commands/src/init_cmd.rs index 40ed8eb4a3..b25d4e0f75 100644 --- a/crates/cli/commands/src/init_cmd.rs +++ b/crates/cli/commands/src/init_cmd.rs @@ -18,10 +18,13 @@ pub struct InitCommand { impl> InitCommand { /// Execute the `init` command - pub async fn execute>(self) -> eyre::Result<()> { + pub async fn execute>( + self, + runtime: reth_tasks::Runtime, + ) -> eyre::Result<()> { info!(target: "reth::cli", "reth init starting"); - let Environment { provider_factory, .. } = self.env.init::(AccessRights::RW)?; + let Environment { provider_factory, .. } = self.env.init::(AccessRights::RW, runtime)?; let genesis_block_number = provider_factory.chain_spec().genesis_header().number(); let hash = provider_factory diff --git a/crates/cli/commands/src/init_state/mod.rs b/crates/cli/commands/src/init_state/mod.rs index 712404430e..9baf8576a0 100644 --- a/crates/cli/commands/src/init_state/mod.rs +++ b/crates/cli/commands/src/init_state/mod.rs @@ -65,7 +65,7 @@ pub struct InitStateCommand { impl> InitStateCommand { /// Execute the `init` command - pub async fn execute(self) -> eyre::Result<()> + pub async fn execute(self, runtime: reth_tasks::Runtime) -> eyre::Result<()> where N: CliNodeTypes< ChainSpec = C::ChainSpec, @@ -74,7 +74,8 @@ impl> InitStateC { info!(target: "reth::cli", "Reth init-state starting"); - let Environment { config, provider_factory, .. } = self.env.init::(AccessRights::RW)?; + let Environment { config, provider_factory, .. } = + self.env.init::(AccessRights::RW, runtime)?; let static_file_provider = provider_factory.static_file_provider(); let provider_rw = provider_factory.database_provider_rw()?; diff --git a/crates/cli/commands/src/p2p/mod.rs b/crates/cli/commands/src/p2p/mod.rs index 9634d95ba1..c9d616e4dc 100644 --- a/crates/cli/commands/src/p2p/mod.rs +++ b/crates/cli/commands/src/p2p/mod.rs @@ -16,6 +16,7 @@ use reth_node_core::{ args::{DatadirArgs, NetworkArgs}, utils::get_single_header, }; +use reth_tasks::Runtime; pub mod bootnode; pub mod enode; @@ -194,17 +195,18 @@ impl DownloadArgs { let rlpx_socket = (self.network.addr, self.network.port).into(); let boot_nodes = self.chain.bootnodes().unwrap_or_default(); - let net = NetworkConfigBuilder::::new(p2p_secret_key) - .peer_config(config.peers_config_with_basic_nodes_from_file(None)) - .external_ip_resolver(self.network.nat.clone()) - .network_id(self.network.network_id) - .boot_nodes(boot_nodes.clone()) - .apply(|builder| { - self.network.discovery.apply_to_builder(builder, rlpx_socket, boot_nodes) - }) - .build_with_noop_provider(self.chain.clone()) - .manager() - .await?; + let net = + NetworkConfigBuilder::::new(p2p_secret_key, Runtime::test()) + .peer_config(config.peers_config_with_basic_nodes_from_file(None)) + .external_ip_resolver(self.network.nat.clone()) + .network_id(self.network.network_id) + .boot_nodes(boot_nodes.clone()) + .apply(|builder| { + self.network.discovery.apply_to_builder(builder, rlpx_socket, boot_nodes) + }) + .build_with_noop_provider(self.chain.clone()) + .manager() + .await?; let handle = net.handle().clone(); tokio::task::spawn(net); diff --git a/crates/cli/commands/src/prune.rs b/crates/cli/commands/src/prune.rs index b40f1b71be..45837f72dd 100644 --- a/crates/cli/commands/src/prune.rs +++ b/crates/cli/commands/src/prune.rs @@ -36,7 +36,7 @@ impl> PruneComma self, ctx: CliContext, ) -> eyre::Result<()> { - let env = self.env.init::(AccessRights::RW)?; + let env = self.env.init::(AccessRights::RW, ctx.task_executor.clone())?; let provider_factory = env.provider_factory; let config = env.config.prune; let data_dir = env.data_dir; diff --git a/crates/cli/commands/src/re_execute.rs b/crates/cli/commands/src/re_execute.rs index 742b6ce76d..8086759227 100644 --- a/crates/cli/commands/src/re_execute.rs +++ b/crates/cli/commands/src/re_execute.rs @@ -60,11 +60,15 @@ impl Command { impl> Command { /// Execute `re-execute` command - pub async fn execute(self, components: impl CliComponentsBuilder) -> eyre::Result<()> + pub async fn execute( + self, + components: impl CliComponentsBuilder, + runtime: reth_tasks::Runtime, + ) -> eyre::Result<()> where N: CliNodeTypes, { - let Environment { provider_factory, .. } = self.env.init::(AccessRights::RO)?; + let Environment { provider_factory, .. } = self.env.init::(AccessRights::RO, runtime)?; let components = components(provider_factory.chain_spec()); diff --git a/crates/cli/commands/src/stage/drop.rs b/crates/cli/commands/src/stage/drop.rs index 6cf4d07b73..43d5d0c77d 100644 --- a/crates/cli/commands/src/stage/drop.rs +++ b/crates/cli/commands/src/stage/drop.rs @@ -37,11 +37,11 @@ pub struct Command { impl Command { /// Execute `db` command - pub async fn execute(self) -> eyre::Result<()> + pub async fn execute(self, runtime: reth_tasks::Runtime) -> eyre::Result<()> where C: ChainSpecParser, { - let Environment { provider_factory, .. } = self.env.init::(AccessRights::RW)?; + let Environment { provider_factory, .. } = self.env.init::(AccessRights::RW, runtime)?; let tool = DbTool::new(provider_factory)?; diff --git a/crates/cli/commands/src/stage/dump/execution.rs b/crates/cli/commands/src/stage/dump/execution.rs index 912b99b8be..8640a2c109 100644 --- a/crates/cli/commands/src/stage/dump/execution.rs +++ b/crates/cli/commands/src/stage/dump/execution.rs @@ -16,6 +16,7 @@ use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput}; use std::sync::Arc; use tracing::info; +#[expect(clippy::too_many_arguments)] pub(crate) async fn dump_execution_stage( db_tool: &DbTool, from: u64, @@ -24,6 +25,7 @@ pub(crate) async fn dump_execution_stage( should_run: bool, evm_config: E, consensus: C, + runtime: reth_tasks::Runtime, ) -> eyre::Result<()> where N: ProviderNodeTypes, @@ -37,7 +39,6 @@ where unwind_and_copy(db_tool, from, tip_block_number, &output_db, evm_config.clone())?; if should_run { - let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?; dry_run( ProviderFactory::::new( output_db, diff --git a/crates/cli/commands/src/stage/dump/hashing_account.rs b/crates/cli/commands/src/stage/dump/hashing_account.rs index 42c23646cb..4e2aebc5b2 100644 --- a/crates/cli/commands/src/stage/dump/hashing_account.rs +++ b/crates/cli/commands/src/stage/dump/hashing_account.rs @@ -18,6 +18,7 @@ pub(crate) async fn dump_hashing_account_stage, should_run: bool, + runtime: reth_tasks::Runtime, ) -> Result<()> { let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?; @@ -33,7 +34,6 @@ pub(crate) async fn dump_hashing_account_stage::new( output_db, diff --git a/crates/cli/commands/src/stage/dump/hashing_storage.rs b/crates/cli/commands/src/stage/dump/hashing_storage.rs index 538aa21dcc..2fd37d4a4c 100644 --- a/crates/cli/commands/src/stage/dump/hashing_storage.rs +++ b/crates/cli/commands/src/stage/dump/hashing_storage.rs @@ -17,13 +17,13 @@ pub(crate) async fn dump_hashing_storage_stage, should_run: bool, + runtime: reth_tasks::Runtime, ) -> Result<()> { let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?; unwind_and_copy(db_tool, from, tip_block_number, &output_db)?; if should_run { - let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?; dry_run( ProviderFactory::::new( output_db, diff --git a/crates/cli/commands/src/stage/dump/merkle.rs b/crates/cli/commands/src/stage/dump/merkle.rs index 0932eeaf8b..0928ea2e84 100644 --- a/crates/cli/commands/src/stage/dump/merkle.rs +++ b/crates/cli/commands/src/stage/dump/merkle.rs @@ -24,6 +24,7 @@ use reth_stages::{ }; use tracing::info; +#[expect(clippy::too_many_arguments)] pub(crate) async fn dump_merkle_stage( db_tool: &DbTool, from: BlockNumber, @@ -32,6 +33,7 @@ pub(crate) async fn dump_merkle_stage( should_run: bool, evm_config: impl ConfigureEvm, consensus: impl FullConsensus + 'static, + runtime: reth_tasks::Runtime, ) -> Result<()> where N: ProviderNodeTypes, @@ -57,7 +59,6 @@ where unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db, evm_config, consensus)?; if should_run { - let runtime = reth_tasks::Runtime::with_existing_handle(tokio::runtime::Handle::current())?; dry_run( ProviderFactory::::new( output_db, diff --git a/crates/cli/commands/src/stage/dump/mod.rs b/crates/cli/commands/src/stage/dump/mod.rs index a857888685..f90add3393 100644 --- a/crates/cli/commands/src/stage/dump/mod.rs +++ b/crates/cli/commands/src/stage/dump/mod.rs @@ -72,30 +72,36 @@ pub struct StageCommand { } macro_rules! handle_stage { - ($stage_fn:ident, $tool:expr, $command:expr) => {{ + ($stage_fn:ident, $tool:expr, $command:expr, $runtime:expr) => {{ let StageCommand { output_datadir, from, to, dry_run, .. } = $command; let output_datadir = output_datadir.with_chain($tool.chain().chain(), DatadirArgs::default()); - $stage_fn($tool, *from, *to, output_datadir, *dry_run).await? + $stage_fn($tool, *from, *to, output_datadir, *dry_run, $runtime).await? }}; - ($stage_fn:ident, $tool:expr, $command:expr, $executor:expr, $consensus:expr) => {{ + ($stage_fn:ident, $tool:expr, $command:expr, $executor:expr, $consensus:expr, $runtime:expr) => {{ let StageCommand { output_datadir, from, to, dry_run, .. } = $command; let output_datadir = output_datadir.with_chain($tool.chain().chain(), DatadirArgs::default()); - $stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor, $consensus).await? + $stage_fn($tool, *from, *to, output_datadir, *dry_run, $executor, $consensus, $runtime) + .await? }}; } impl> Command { /// Execute `dump-stage` command - pub async fn execute(self, components: F) -> eyre::Result<()> + pub async fn execute( + self, + components: F, + runtime: reth_tasks::Runtime, + ) -> eyre::Result<()> where N: CliNodeTypes, Comp: CliNodeComponents, F: FnOnce(Arc) -> Comp, { - let Environment { provider_factory, .. } = self.env.init::(AccessRights::RO)?; + let Environment { provider_factory, .. } = + self.env.init::(AccessRights::RO, runtime.clone())?; let tool = DbTool::new(provider_factory)?; let components = components(tool.chain()); let evm_config = components.evm_config().clone(); @@ -103,12 +109,23 @@ impl> Command match &self.command { Stages::Execution(cmd) => { - handle_stage!(dump_execution_stage, &tool, cmd, evm_config, consensus) + handle_stage!( + dump_execution_stage, + &tool, + cmd, + evm_config, + consensus, + runtime.clone() + ) + } + Stages::StorageHashing(cmd) => { + handle_stage!(dump_hashing_storage_stage, &tool, cmd, runtime.clone()) + } + Stages::AccountHashing(cmd) => { + handle_stage!(dump_hashing_account_stage, &tool, cmd, runtime.clone()) } - Stages::StorageHashing(cmd) => handle_stage!(dump_hashing_storage_stage, &tool, cmd), - Stages::AccountHashing(cmd) => handle_stage!(dump_hashing_account_stage, &tool, cmd), Stages::Merkle(cmd) => { - handle_stage!(dump_merkle_stage, &tool, cmd, evm_config, consensus) + handle_stage!(dump_merkle_stage, &tool, cmd, evm_config, consensus, runtime.clone()) } } diff --git a/crates/cli/commands/src/stage/mod.rs b/crates/cli/commands/src/stage/mod.rs index 129a84733f..561ad8f8d6 100644 --- a/crates/cli/commands/src/stage/mod.rs +++ b/crates/cli/commands/src/stage/mod.rs @@ -49,11 +49,12 @@ impl N: CliNodeTypes, Comp: CliNodeComponents, { + let executor = ctx.task_executor.clone(); match self.command { Subcommands::Run(command) => command.execute::(ctx, components).await, - Subcommands::Drop(command) => command.execute::().await, - Subcommands::Dump(command) => command.execute::(components).await, - Subcommands::Unwind(command) => command.execute::(components).await, + Subcommands::Drop(command) => command.execute::(executor).await, + Subcommands::Dump(command) => command.execute::(components, executor).await, + Subcommands::Unwind(command) => command.execute::(components, executor).await, } } } diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs index 0aa2e0fd16..f23ca3adef 100644 --- a/crates/cli/commands/src/stage/run.rs +++ b/crates/cli/commands/src/stage/run.rs @@ -119,8 +119,9 @@ impl // Does not do anything on windows. let _ = fdlimit::raise_fd_limit(); + let runtime = ctx.task_executor.clone(); let Environment { provider_factory, config, data_dir } = - self.env.init::(AccessRights::RW)?; + self.env.init::(AccessRights::RW, ctx.task_executor.clone())?; let mut provider_rw = provider_factory.database_provider_rw()?; let components = components(provider_factory.chain_spec()); @@ -171,6 +172,7 @@ impl provider_factory.chain_spec(), p2p_secret_key, default_peers_path, + runtime.clone(), ) .build(provider_factory.clone()) .start_network() @@ -226,6 +228,7 @@ impl provider_factory.chain_spec(), p2p_secret_key, default_peers_path, + runtime.clone(), ) .build(provider_factory.clone()) .start_network() diff --git a/crates/cli/commands/src/stage/unwind.rs b/crates/cli/commands/src/stage/unwind.rs index 2ad83a32cf..2e4e6a654b 100644 --- a/crates/cli/commands/src/stage/unwind.rs +++ b/crates/cli/commands/src/stage/unwind.rs @@ -46,12 +46,14 @@ impl> Command pub async fn execute, F, Comp>( self, components: F, + runtime: reth_tasks::Runtime, ) -> eyre::Result<()> where Comp: CliNodeComponents, F: FnOnce(Arc) -> Comp, { - let Environment { provider_factory, config, .. } = self.env.init::(AccessRights::RW)?; + let Environment { provider_factory, config, .. } = + self.env.init::(AccessRights::RW, runtime)?; let target = self.command.unwind_target(provider_factory.clone())?; diff --git a/crates/cli/runner/src/lib.rs b/crates/cli/runner/src/lib.rs index 7c359d421b..6642c7606a 100644 --- a/crates/cli/runner/src/lib.rs +++ b/crates/cli/runner/src/lib.rs @@ -47,6 +47,11 @@ impl CliRunner { self } + /// Returns a clone of the underlying [`Runtime`](reth_tasks::Runtime). + pub fn runtime(&self) -> reth_tasks::Runtime { + self.runtime.clone() + } + /// Executes an async block on the runtime and blocks until completion. pub fn block_on(&self, fut: F) -> T where diff --git a/crates/e2e-test-utils/src/setup_builder.rs b/crates/e2e-test-utils/src/setup_builder.rs index efc01c8856..82826d3953 100644 --- a/crates/e2e-test-utils/src/setup_builder.rs +++ b/crates/e2e-test-utils/src/setup_builder.rs @@ -112,7 +112,7 @@ where Vec>>>, Wallet, )> { - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?; + let runtime = Runtime::test(); let network_config = NetworkArgs { discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() }, diff --git a/crates/e2e-test-utils/src/setup_import.rs b/crates/e2e-test-utils/src/setup_import.rs index b853b2ff1e..fddc51bde9 100644 --- a/crates/e2e-test-utils/src/setup_import.rs +++ b/crates/e2e-test-utils/src/setup_import.rs @@ -15,7 +15,6 @@ use reth_provider::{ }; use reth_rpc_server_types::RpcModuleSelection; use reth_stages_types::StageId; -use reth_tasks::Runtime; use std::{path::Path, sync::Arc}; use tempfile::TempDir; use tracing::{debug, info, span, Level}; @@ -66,7 +65,7 @@ pub async fn setup_engine_with_chain_import( + Copy + 'static, ) -> eyre::Result { - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?; + let runtime = reth_tasks::Runtime::test(); let network_config = NetworkArgs { discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() }, @@ -149,6 +148,7 @@ pub async fn setup_engine_with_chain_import( &config, evm_config, consensus, + runtime.clone(), ) .await?; @@ -343,6 +343,7 @@ mod tests { let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone()); // Use NoopConsensus to skip gas limit validation for test imports let consensus = reth_consensus::noop::NoopConsensus::arc(); + let runtime = reth_tasks::Runtime::test(); let result = import_blocks_from_file( &rlp_path, @@ -351,6 +352,7 @@ mod tests { &config, evm_config, consensus, + runtime, ) .await .unwrap(); @@ -509,6 +511,7 @@ mod tests { let evm_config = reth_node_ethereum::EthEvmConfig::new(chain_spec.clone()); // Use NoopConsensus to skip gas limit validation for test imports let consensus = reth_consensus::noop::NoopConsensus::arc(); + let runtime = reth_tasks::Runtime::test(); let result = import_blocks_from_file( &rlp_path, @@ -517,6 +520,7 @@ mod tests { &config, evm_config, consensus, + runtime, ) .await .unwrap(); diff --git a/crates/ethereum/cli/src/app.rs b/crates/ethereum/cli/src/app.rs index 71cd2869e9..0d5bc6c9bd 100644 --- a/crates/ethereum/cli/src/app.rs +++ b/crates/ethereum/cli/src/app.rs @@ -144,6 +144,8 @@ where N: CliNodeTypes, ChainSpec: Hardforks>, SubCmd: ExtendedCommand + Subcommand + fmt::Debug, { + let rt = runner.runtime(); + match cli.command { Commands::Node(command) => { // Validate RPC modules using the configured validator @@ -169,13 +171,13 @@ where command.execute(ctx, FnLauncher::new::(launcher)) }) } - Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute::()), - Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute::()), + Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute::(rt)), + Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute::(rt)), Commands::Import(command) => { - runner.run_blocking_until_ctrl_c(command.execute::(components)) + runner.run_blocking_until_ctrl_c(command.execute::(components, rt)) } - Commands::ImportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::()), - Commands::ExportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::()), + Commands::ImportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::(rt)), + Commands::ExportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::(rt)), Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::Db(command) => { runner.run_blocking_command_until_exit(|ctx| command.execute::(ctx)) @@ -189,7 +191,9 @@ where Commands::Prune(command) => runner.run_command_until_exit(|ctx| command.execute::(ctx)), #[cfg(feature = "dev")] Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()), - Commands::ReExecute(command) => runner.run_until_ctrl_c(command.execute::(components)), + Commands::ReExecute(command) => { + runner.run_until_ctrl_c(command.execute::(components, rt)) + } Commands::Ext(command) => command.execute(runner), } } diff --git a/crates/ethereum/node/tests/e2e/blobs.rs b/crates/ethereum/node/tests/e2e/blobs.rs index f9469f358e..830cc8befc 100644 --- a/crates/ethereum/node/tests/e2e/blobs.rs +++ b/crates/ethereum/node/tests/e2e/blobs.rs @@ -20,7 +20,7 @@ use std::{ #[tokio::test] async fn can_handle_blobs() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap(); let chain_spec = Arc::new( @@ -91,7 +91,7 @@ async fn can_handle_blobs() -> eyre::Result<()> { #[tokio::test] async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap(); let chain_spec = Arc::new( @@ -144,7 +144,7 @@ async fn can_send_legacy_sidecar_post_activation() -> eyre::Result<()> { #[tokio::test] async fn blob_conversion_at_osaka() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); // Osaka activates in 2 slots diff --git a/crates/ethereum/node/tests/e2e/dev.rs b/crates/ethereum/node/tests/e2e/dev.rs index 279f7d0cb7..72590adc22 100644 --- a/crates/ethereum/node/tests/e2e/dev.rs +++ b/crates/ethereum/node/tests/e2e/dev.rs @@ -15,7 +15,7 @@ use std::sync::Arc; #[tokio::test] async fn can_run_dev_node() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let node_config = NodeConfig::test() .with_chain(custom_chain()) @@ -36,7 +36,7 @@ async fn can_run_dev_node() -> eyre::Result<()> { #[tokio::test] async fn can_run_dev_node_custom_attributes() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let node_config = NodeConfig::test() .with_chain(custom_chain()) diff --git a/crates/ethereum/node/tests/e2e/eth.rs b/crates/ethereum/node/tests/e2e/eth.rs index 1be9b1e593..31eb81ce80 100644 --- a/crates/ethereum/node/tests/e2e/eth.rs +++ b/crates/ethereum/node/tests/e2e/eth.rs @@ -57,7 +57,7 @@ async fn can_run_eth_node() -> eyre::Result<()> { #[cfg(unix)] async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); // Chain spec with test allocs let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap(); @@ -104,7 +104,7 @@ async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> { #[cfg(unix)] async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); // Chain spec with test allocs let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap(); @@ -188,7 +188,7 @@ async fn test_engine_graceful_shutdown() -> eyre::Result<()> { #[tokio::test] async fn test_testing_build_block_v1_osaka() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap(); let chain_spec = Arc::new( diff --git a/crates/ethereum/node/tests/e2e/pool.rs b/crates/ethereum/node/tests/e2e/pool.rs index e30aa41420..6ad25bfeaa 100644 --- a/crates/ethereum/node/tests/e2e/pool.rs +++ b/crates/ethereum/node/tests/e2e/pool.rs @@ -24,7 +24,7 @@ use std::{sync::Arc, time::Duration}; #[tokio::test] async fn maintain_txpool_stale_eviction() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let txpool = Pool::new( OkValidator::default(), @@ -97,7 +97,7 @@ async fn maintain_txpool_stale_eviction() -> eyre::Result<()> { #[tokio::test] async fn maintain_txpool_reorg() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let txpool = Pool::new( OkValidator::default(), @@ -229,7 +229,7 @@ async fn maintain_txpool_reorg() -> eyre::Result<()> { #[tokio::test] async fn maintain_txpool_commit() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let txpool = Pool::new( OkValidator::default(), diff --git a/crates/ethereum/node/tests/e2e/prestate.rs b/crates/ethereum/node/tests/e2e/prestate.rs index 42a3e62a00..d1c35063ec 100644 --- a/crates/ethereum/node/tests/e2e/prestate.rs +++ b/crates/ethereum/node/tests/e2e/prestate.rs @@ -29,7 +29,7 @@ async fn debug_trace_call_matches_geth_prestate_snapshot() -> Result<()> { let mut genesis: Genesis = MAINNET.genesis().clone(); genesis.coinbase = address!("0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5"); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let expected_frame = expected_snapshot_frame()?; let prestate_mode = match &expected_frame { diff --git a/crates/ethereum/node/tests/e2e/rpc.rs b/crates/ethereum/node/tests/e2e/rpc.rs index 794254fe6c..b1206884dd 100644 --- a/crates/ethereum/node/tests/e2e/rpc.rs +++ b/crates/ethereum/node/tests/e2e/rpc.rs @@ -344,7 +344,7 @@ async fn test_eth_config() -> eyre::Result<()> { async fn test_admin_external_ip() -> eyre::Result<()> { reth_tracing::init_test_tracing(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); // Chain spec with test allocs let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap(); diff --git a/crates/ethereum/node/tests/it/builder.rs b/crates/ethereum/node/tests/it/builder.rs index 103d766cc6..5961f422e5 100644 --- a/crates/ethereum/node/tests/it/builder.rs +++ b/crates/ethereum/node/tests/it/builder.rs @@ -46,7 +46,7 @@ fn test_basic_setup() { #[tokio::test] async fn test_eth_launcher() { - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let config = NodeConfig::test(); let db = create_test_rw_db(); let _builder = @@ -81,7 +81,7 @@ fn test_eth_launcher_with_tokio_runtime() { let custom_rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"); main_rt.block_on(async { - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let config = NodeConfig::test(); let db = create_test_rw_db(); let _builder = diff --git a/crates/ethereum/node/tests/it/testing.rs b/crates/ethereum/node/tests/it/testing.rs index 97b13b1bf2..6f749f014c 100644 --- a/crates/ethereum/node/tests/it/testing.rs +++ b/crates/ethereum/node/tests/it/testing.rs @@ -20,7 +20,7 @@ use tokio::sync::oneshot; #[tokio::test(flavor = "multi_thread")] async fn testing_rpc_build_block_works() -> eyre::Result<()> { - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let mut rpc_args = reth_node_core::args::RpcServerArgs::default().with_http(); rpc_args.http_api = Some(RpcModuleSelection::from_iter([RethRpcModule::Testing])); let tempdir = tempdir().expect("temp datadir"); diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index a1a9f20ef4..900a985eb3 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -258,15 +258,15 @@ pub async fn test_exex_context_with_chain_spec( let genesis_hash = init_genesis(&provider_factory)?; let provider = BlockchainProvider::new(provider_factory.clone())?; + let runtime = Runtime::test(); let network_manager = NetworkManager::new( - NetworkConfigBuilder::new(rng_secret_key()) + NetworkConfigBuilder::new(rng_secret_key(), runtime.clone()) .with_unused_discovery_port() .with_unused_listener_port() .build(provider_factory.clone()), ) .await?; let network = network_manager.handle().clone(); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); let task_executor = runtime.clone(); runtime.spawn_task(network_manager); diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index c86861bf42..3fc4fc42cb 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -102,26 +102,18 @@ pub struct NetworkConfig { // === impl NetworkConfig === impl NetworkConfig<(), N> { - /// Convenience method for creating the corresponding builder type - pub fn builder(secret_key: SecretKey) -> NetworkConfigBuilder { - NetworkConfigBuilder::new(secret_key) + /// Convenience method for creating the corresponding builder type. + pub fn builder(secret_key: SecretKey, executor: Runtime) -> NetworkConfigBuilder { + NetworkConfigBuilder::new(secret_key, executor) } /// Convenience method for creating the corresponding builder type with a random secret key. - pub fn builder_with_rng_secret_key() -> NetworkConfigBuilder { - NetworkConfigBuilder::with_rng_secret_key() + pub fn builder_with_rng_secret_key(executor: Runtime) -> NetworkConfigBuilder { + NetworkConfigBuilder::with_rng_secret_key(executor) } } impl NetworkConfig { - /// Create a new instance with all mandatory fields set, rest is field with defaults. - pub fn new(client: C, secret_key: SecretKey) -> Self - where - C: ChainSpecProvider, - { - NetworkConfig::builder(secret_key).build(client) - } - /// Apply a function to the config. pub fn apply(self, f: F) -> Self where @@ -206,7 +198,7 @@ pub struct NetworkConfigBuilder { /// The default mode of the network. network_mode: NetworkMode, /// The executor to use for spawning tasks. - executor: Option, + executor: Runtime, /// Sets the hello message for the p2p handshake in `RLPx` hello_message: Option, /// The executor to use for spawning tasks. @@ -232,8 +224,8 @@ pub struct NetworkConfigBuilder { impl NetworkConfigBuilder { /// Creates the `NetworkConfigBuilder` with [`EthNetworkPrimitives`] types. - pub fn eth(secret_key: SecretKey) -> Self { - Self::new(secret_key) + pub fn eth(secret_key: SecretKey, executor: Runtime) -> Self { + Self::new(secret_key, executor) } } @@ -242,12 +234,12 @@ impl NetworkConfigBuilder { #[expect(missing_docs)] impl NetworkConfigBuilder { /// Create a new builder instance with a random secret key. - pub fn with_rng_secret_key() -> Self { - Self::new(rng_secret_key()) + pub fn with_rng_secret_key(executor: Runtime) -> Self { + Self::new(rng_secret_key(), executor) } /// Create a new builder instance with the given secret key. - pub fn new(secret_key: SecretKey) -> Self { + pub fn new(secret_key: SecretKey, executor: Runtime) -> Self { Self { secret_key, dns_discovery_config: Some(Default::default()), @@ -259,7 +251,7 @@ impl NetworkConfigBuilder { peers_config: None, sessions_config: None, network_mode: Default::default(), - executor: None, + executor, hello_message: None, extra_protocols: Default::default(), head: None, @@ -340,10 +332,8 @@ impl NetworkConfigBuilder { } /// Sets the executor to use for spawning tasks. - /// - /// If `None`, then [`tokio::spawn`] is used for spawning tasks. pub fn with_task_executor(mut self, executor: Runtime) -> Self { - self.executor = Some(executor); + self.executor = executor; self } @@ -691,11 +681,7 @@ impl NetworkConfigBuilder { chain_id, block_import: block_import.unwrap_or_else(|| Box::::default()), network_mode, - executor: executor.unwrap_or_else(|| match tokio::runtime::Handle::try_current() { - Ok(handle) => Runtime::with_existing_handle(handle) - .expect("failed to create runtime with existing handle"), - Err(_) => Runtime::test(), - }), + executor, status, hello_message, extra_protocols, @@ -749,7 +735,7 @@ mod tests { fn builder() -> NetworkConfigBuilder { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - NetworkConfigBuilder::new(secret_key) + NetworkConfigBuilder::new(secret_key, Runtime::test()) } #[test] diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index ad63067a51..6a18934983 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -51,6 +51,7 @@ //! }; //! use reth_network_peers::mainnet_nodes; //! use reth_storage_api::noop::NoopProvider; +//! use reth_tasks::Runtime; //! //! // This block provider implementation is used for testing purposes. //! let client = NoopProvider::default(); @@ -58,7 +59,7 @@ //! // The key that's used for encrypting sessions and to identify our node. //! let local_key = rng_secret_key(); //! -//! let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key) +//! let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key, Runtime::test()) //! .boot_nodes(mainnet_nodes()) //! .build(client); //! @@ -80,6 +81,7 @@ //! }; //! use reth_network_peers::mainnet_nodes; //! use reth_storage_api::noop::NoopProvider; +//! use reth_tasks::Runtime; //! use reth_transaction_pool::TransactionPool; //! async fn launch(pool: Pool) { //! // This block provider implementation is used for testing purposes. @@ -88,7 +90,7 @@ //! // The key that's used for encrypting sessions and to identify our node. //! let local_key = rng_secret_key(); //! -//! let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key) +//! let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key, Runtime::test()) //! .boot_nodes(mainnet_nodes()) //! .build(client.clone()); //! let transactions_manager_config = config.transactions_manager_config.clone(); diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 0710fdf2b5..3742deed4f 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -157,8 +157,9 @@ impl NetworkManager { /// # async fn f() { /// use reth_chainspec::MAINNET; /// use reth_network::{NetworkConfig, NetworkManager}; - /// let config = - /// NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone()); + /// use reth_tasks::Runtime; + /// let config = NetworkConfig::builder_with_rng_secret_key(Runtime::test()) + /// .build_with_noop_provider(MAINNET.clone()); /// let manager = NetworkManager::eth(config).await; /// # } /// ``` @@ -378,6 +379,7 @@ impl NetworkManager { /// }; /// use reth_network_peers::mainnet_nodes; /// use reth_storage_api::noop::NoopProvider; + /// use reth_tasks::Runtime; /// use reth_transaction_pool::TransactionPool; /// async fn launch(pool: Pool) { /// // This block provider implementation is used for testing purposes. @@ -386,7 +388,7 @@ impl NetworkManager { /// // The key that's used for encrypting sessions and to identify our node. /// let local_key = rng_secret_key(); /// - /// let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key) + /// let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key, Runtime::test()) /// .boot_nodes(mainnet_nodes()) /// .build(client.clone()); /// let transactions_manager_config = config.transactions_manager_config.clone(); diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 570e03ddfa..8bc2550cda 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -713,7 +713,7 @@ where } fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder { - NetworkConfigBuilder::new(secret_key) + NetworkConfigBuilder::new(secret_key, Runtime::test()) .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) .disable_dns_discovery() diff --git a/crates/net/network/src/test_utils/transactions.rs b/crates/net/network/src/test_utils/transactions.rs index 467f146b05..49b4030d1d 100644 --- a/crates/net/network/src/test_utils/transactions.rs +++ b/crates/net/network/src/test_utils/transactions.rs @@ -20,6 +20,7 @@ use reth_eth_wire_types::EthNetworkPrimitives; use reth_network_api::{PeerKind, PeerRequest, PeerRequestSender}; use reth_network_peers::PeerId; use reth_storage_api::noop::NoopProvider; +use reth_tasks::Runtime; use reth_transaction_pool::test_utils::{testing_pool, TestPool}; use secp256k1::SecretKey; use std::sync::Arc; @@ -32,7 +33,7 @@ pub async fn new_tx_manager( let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let client = NoopProvider::default(); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::new(secret_key, Runtime::test()) // let OS choose port .listener_port(0) .disable_discovery() diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 91ee7c17d5..44b6faa0e3 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -2167,6 +2167,7 @@ mod tests { sync::{NetworkSyncUpdater, SyncState}, }; use reth_storage_api::noop::NoopProvider; + use reth_tasks::Runtime; use reth_transaction_pool::test_utils::{ testing_pool, MockTransaction, MockTransactionFactory, TestPool, }; @@ -2196,7 +2197,7 @@ mod tests { let client = NoopProvider::default(); let pool = testing_pool(); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .disable_discovery() .listener_port(0) .build(client); @@ -2266,7 +2267,7 @@ mod tests { let client = NoopProvider::default(); let pool = testing_pool(); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::new(secret_key, Runtime::test()) .disable_discovery() .listener_port(0) .build(client); @@ -2332,7 +2333,7 @@ mod tests { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let client = NoopProvider::default(); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::new(secret_key, Runtime::test()) // let OS choose port .listener_port(0) .disable_discovery() @@ -2440,7 +2441,7 @@ mod tests { let client = NoopProvider::default(); let pool = testing_pool(); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::new(secret_key, Runtime::test()) .disable_discovery() .listener_port(0) .build(client); @@ -2518,7 +2519,7 @@ mod tests { let client = NoopProvider::default(); let pool = testing_pool(); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::new(secret_key, Runtime::test()) .disable_discovery() .listener_port(0) .build(client); @@ -2936,7 +2937,7 @@ mod tests { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let client = NoopProvider::default(); - let network_config = NetworkConfigBuilder::new(secret_key) + let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .build(client.clone()); diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 10c8d05556..0567bf5680 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -21,6 +21,7 @@ use reth_network_p2p::{ use reth_network_peers::{mainnet_nodes, NodeRecord, TrustedPeer}; use reth_provider::test_utils::MockEthProvider; use reth_storage_api::noop::NoopProvider; +use reth_tasks::Runtime; use reth_tracing::init_test_tracing; use reth_transaction_pool::test_utils::testing_pool; use secp256k1::SecretKey; @@ -207,8 +208,9 @@ async fn test_connect_with_boot_nodes() { let mut discv4 = Discv4Config::builder(); discv4.add_boot_nodes(mainnet_nodes()); - let config = - NetworkConfigBuilder::eth(secret_key).discovery(discv4).build(NoopProvider::default()); + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) + .discovery(discv4) + .build(NoopProvider::default()); let network = NetworkManager::new(config).await.unwrap(); let handle = network.handle().clone(); @@ -229,7 +231,9 @@ async fn test_connect_with_builder() { discv4.add_boot_nodes(mainnet_nodes()); let client = NoopProvider::default(); - let config = NetworkConfigBuilder::eth(secret_key).discovery(discv4).build(client.clone()); + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) + .discovery(discv4) + .build(client.clone()); let (handle, network, _, requests) = NetworkManager::new(config) .await .unwrap() @@ -265,7 +269,9 @@ async fn test_connect_to_trusted_peer() { let discv4 = Discv4Config::builder(); let client = NoopProvider::default(); - let config = NetworkConfigBuilder::eth(secret_key).discovery(discv4).build(client.clone()); + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) + .discovery(discv4) + .build(client.clone()); let transactions_manager_config = config.transactions_manager_config.clone(); let (handle, network, transactions, requests) = NetworkManager::new(config) .await @@ -381,7 +387,7 @@ async fn test_trusted_peer_only() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let peers_config = PeersConfig::test().with_trusted_nodes_only(true); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .peer_config(peers_config) @@ -444,7 +450,7 @@ async fn test_network_state_change() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let peers_config = PeersConfig::test(); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .peer_config(peers_config) @@ -485,7 +491,7 @@ async fn test_exceed_outgoing_connections() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let peers_config = PeersConfig::test().with_max_outbound(1); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .peer_config(peers_config) @@ -526,7 +532,7 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let peers_config = PeersConfig::test().with_max_inbound(0); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .peer_config(peers_config) @@ -643,7 +649,7 @@ async fn new_random_peer( let peers_config = PeersConfig::test().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes); - let config = NetworkConfigBuilder::new(secret_key) + let config = NetworkConfigBuilder::new(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .peer_config(peers_config) @@ -715,7 +721,7 @@ async fn test_connect_peer_in_different_network_should_fail() { // If the remote disconnect first, then we would not get a fatal protocol error. So set // max_backoff_count to 0 to speed up the removal of the peer. let peers_config = PeersConfig::default().with_max_backoff_count(0); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .peer_config(peers_config) diff --git a/crates/net/network/tests/it/multiplex.rs b/crates/net/network/tests/it/multiplex.rs index d081100681..9646282002 100644 --- a/crates/net/network/tests/it/multiplex.rs +++ b/crates/net/network/tests/it/multiplex.rs @@ -19,6 +19,7 @@ use reth_network::{ }; use reth_network_api::{Direction, NetworkInfo, PeerId, Peers}; use reth_provider::{noop::NoopProvider, test_utils::MockEthProvider}; +use reth_tasks::Runtime; use secp256k1::SecretKey; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -285,7 +286,7 @@ async fn test_connect_to_non_multiplex_peer() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .build(NoopProvider::default()); diff --git a/crates/net/network/tests/it/startup.rs b/crates/net/network/tests/it/startup.rs index 3e05409961..b3471f67e7 100644 --- a/crates/net/network/tests/it/startup.rs +++ b/crates/net/network/tests/it/startup.rs @@ -11,6 +11,7 @@ use reth_network::{ }; use reth_network_api::{NetworkInfo, PeersInfo}; use reth_storage_api::noop::NoopProvider; +use reth_tasks::Runtime; use secp256k1::SecretKey; use tokio::net::TcpListener; @@ -29,7 +30,7 @@ fn is_addr_in_use_kind(err: &NetworkError, kind: ServiceKind) -> bool { #[tokio::test(flavor = "multi_thread")] async fn test_is_default_syncing() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .disable_discovery() .listener_port(0) .build(NoopProvider::default()); @@ -40,13 +41,13 @@ async fn test_is_default_syncing() { #[tokio::test(flavor = "multi_thread")] async fn test_listener_addr_in_use() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .disable_discovery() .listener_port(0) .build(NoopProvider::default()); let network = NetworkManager::new(config).await.unwrap(); let listener_port = network.local_addr().port(); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(listener_port) .disable_discovery() .build(NoopProvider::default()); @@ -74,7 +75,7 @@ async fn test_discovery_addr_in_use() { #[tokio::test(flavor = "multi_thread")] async fn test_discv5_and_discv4_same_socket_fails() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(DEFAULT_DISCOVERY_PORT) .discovery_v5( reth_discv5::Config::builder((DEFAULT_DISCOVERY_ADDR, DEFAULT_DISCOVERY_PORT).into()) @@ -105,7 +106,7 @@ async fn test_discv5_and_rlpx_same_socket_ok_without_discv4() { .port(); let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(test_port) .disable_discv4_discovery() .discovery_v5( @@ -126,7 +127,7 @@ async fn test_discv5_and_rlpx_same_socket_ok_without_discv4() { #[tokio::test(flavor = "multi_thread")] async fn test_tcp_port_node_record_no_discovery() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(0) .disable_discovery() .build_with_noop_provider(MAINNET.clone()); @@ -144,7 +145,7 @@ async fn test_tcp_port_node_record_no_discovery() { #[tokio::test(flavor = "multi_thread")] async fn test_tcp_port_node_record_discovery() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .listener_port(0) .discovery_port(0) .disable_dns_discovery() @@ -163,7 +164,7 @@ async fn test_tcp_port_node_record_discovery() { #[tokio::test(flavor = "multi_thread")] async fn test_node_record_address_with_nat() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .add_nat(Some(NatResolver::ExternalIp("10.1.1.1".parse().unwrap()))) .disable_discv4_discovery() .disable_dns_discovery() @@ -179,7 +180,7 @@ async fn test_node_record_address_with_nat() { #[tokio::test(flavor = "multi_thread")] async fn test_node_record_address_with_nat_disable_discovery() { let secret_key = SecretKey::new(&mut rand_08::thread_rng()); - let config = NetworkConfigBuilder::eth(secret_key) + let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) .add_nat(Some(NatResolver::ExternalIp("10.1.1.1".parse().unwrap()))) .disable_discovery() .disable_nat() diff --git a/crates/node/builder/src/builder/mod.rs b/crates/node/builder/src/builder/mod.rs index c98fb37f0d..728e6bfe3a 100644 --- a/crates/node/builder/src/builder/mod.rs +++ b/crates/node/builder/src/builder/mod.rs @@ -985,8 +985,8 @@ impl>> BuilderContext self.config().chain.clone(), secret_key, default_peers_path, + self.executor.clone(), ) - .with_task_executor(self.executor.clone()) .set_head(self.head); Ok(builder) diff --git a/crates/node/builder/src/builder/states.rs b/crates/node/builder/src/builder/states.rs index c1693c503b..bc9b146722 100644 --- a/crates/node/builder/src/builder/states.rs +++ b/crates/node/builder/src/builder/states.rs @@ -343,10 +343,7 @@ mod test { payload_builder_handle: PayloadBuilderHandle::::noop(), }; - let task_executor = { - let runtime = tokio::runtime::Runtime::new().unwrap(); - Runtime::with_existing_handle(runtime.handle().clone()).unwrap() - }; + let task_executor = Runtime::test(); let node = NodeAdapter { components, task_executor, provider: NoopProvider::default() }; diff --git a/crates/node/core/Cargo.toml b/crates/node/core/Cargo.toml index 75061b1c59..767e44608f 100644 --- a/crates/node/core/Cargo.toml +++ b/crates/node/core/Cargo.toml @@ -25,6 +25,7 @@ reth-rpc-eth-types.workspace = true reth-rpc-server-types.workspace = true reth-rpc-convert.workspace = true reth-transaction-pool.workspace = true +reth-tasks.workspace = true reth-tracing.workspace = true reth-config = { workspace = true, features = ["serde"] } reth-discv4.workspace = true diff --git a/crates/node/core/src/args/network.rs b/crates/node/core/src/args/network.rs index a99cb16b7c..b5a1a45185 100644 --- a/crates/node/core/src/args/network.rs +++ b/crates/node/core/src/args/network.rs @@ -39,6 +39,7 @@ use reth_network::{ HelloMessageWithProtocols, NetworkConfigBuilder, NetworkPrimitives, }; use reth_network_peers::{mainnet_nodes, TrustedPeer}; +use reth_tasks::Runtime; use secp256k1::SecretKey; use std::str::FromStr; use tracing::error; @@ -326,6 +327,7 @@ impl NetworkArgs { chain_spec: impl EthChainSpec, secret_key: SecretKey, default_peers_file: PathBuf, + executor: Runtime, ) -> NetworkConfigBuilder { let addr = self.resolved_addr(); let chain_bootnodes = self @@ -345,7 +347,7 @@ impl NetworkArgs { .with_enforce_enr_fork_id(self.enforce_enr_fork_id); // Configure basic network stack - NetworkConfigBuilder::::new(secret_key) + NetworkConfigBuilder::::new(secret_key, executor) .external_ip_resolver(self.nat.clone()) .sessions_config( config.sessions.clone().with_upscaled_event_buffer(peers_config.max_peers()), @@ -1097,6 +1099,7 @@ mod tests { MAINNET.clone(), secret_key, peers_file.clone(), + Runtime::test(), ); let net_cfg = builder.build_with_noop_provider(MAINNET.clone()); diff --git a/crates/node/metrics/src/server.rs b/crates/node/metrics/src/server.rs index df64d4cd7c..2c7222ce68 100644 --- a/crates/node/metrics/src/server.rs +++ b/crates/node/metrics/src/server.rs @@ -445,7 +445,7 @@ mod tests { build_profile: "test", }; - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let runtime = Runtime::test(); let hooks = Hooks::builder().build(); diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index d702b0bea5..6fd55f0b36 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -24,7 +24,7 @@ where pub fn bootstrap(config: EthConfig, executor: Runtime, eth_api: EthApi) -> Self { let filter = EthFilter::new(eth_api.clone(), config.filter_config(), executor.clone()); - let pubsub = EthPubSub::with_spawner(eth_api.clone(), executor); + let pubsub = EthPubSub::new(eth_api.clone(), executor); Self { api: eth_api, filter, pubsub } } diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 20c9430f8e..e0bb6bfb05 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -105,22 +105,6 @@ impl EthStateCache { (cache, service) } - /// Creates a new async LRU backed cache service task and spawns it to a new task via - /// [`tokio::spawn`]. - /// - /// See also [`Self::spawn_with`] - pub fn spawn(provider: Provider, config: EthStateCacheConfig) -> Self - where - Provider: BlockReader + Clone + Unpin + 'static, - { - Self::spawn_with( - provider, - config, - Runtime::with_existing_handle(tokio::runtime::Handle::current()) - .expect("failed to create Runtime"), - ) - } - /// Creates a new async LRU backed cache service task and spawns it to a new task via the given /// spawner. /// diff --git a/crates/rpc/rpc/src/eth/builder.rs b/crates/rpc/rpc/src/eth/builder.rs index 5864cbfdce..aaa9805274 100644 --- a/crates/rpc/rpc/src/eth/builder.rs +++ b/crates/rpc/rpc/src/eth/builder.rs @@ -147,8 +147,7 @@ where blocking_task_pool: None, fee_history_cache_config: FeeHistoryCacheConfig::default(), proof_permits: DEFAULT_PROOF_PERMITS, - task_spawner: Runtime::with_existing_handle(tokio::runtime::Handle::current()) - .expect("called outside tokio runtime"), + task_spawner: Runtime::test(), gas_oracle_config: Default::default(), eth_state_cache_config: Default::default(), next_env: Default::default(), @@ -479,7 +478,7 @@ where /// Builds the [`EthApiInner`] instance. /// - /// If not configured, this will spawn the cache backend: [`EthStateCache::spawn`]. + /// If not configured, this will spawn the cache backend: [`EthStateCache::spawn_with`]. /// /// # Panics /// @@ -516,8 +515,13 @@ where let provider = components.provider().clone(); - let eth_cache = eth_cache - .unwrap_or_else(|| EthStateCache::spawn(provider.clone(), eth_state_cache_config)); + let eth_cache = eth_cache.unwrap_or_else(|| { + EthStateCache::spawn_with( + provider.clone(), + eth_state_cache_config, + task_spawner.clone(), + ) + }); let gas_oracle = gas_oracle.unwrap_or_else(|| { GasPriceOracle::new(provider.clone(), gas_oracle_config, eth_cache.clone()) }); @@ -564,7 +568,7 @@ where /// Builds the [`EthApi`] instance. /// - /// If not configured, this will spawn the cache backend: [`EthStateCache::spawn`]. + /// If not configured, this will spawn the cache backend: [`EthStateCache::spawn_with`]. /// /// # Panics /// diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index ac7cfa1ddf..c242a0e1d7 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -21,8 +21,8 @@ use reth_rpc_eth_api::{ EthApiTypes, RpcNodeCore, }; use reth_rpc_eth_types::{ - builder::config::PendingBlockKind, receipt::EthReceiptConverter, tx_forward::ForwardConfig, - EthApiError, EthStateCache, FeeHistoryCache, GasCap, GasPriceOracle, PendingBlock, + builder::config::PendingBlockKind, receipt::EthReceiptConverter, EthApiError, EthStateCache, + FeeHistoryCache, GasCap, GasPriceOracle, PendingBlock, }; use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, ProviderHeader}; use reth_tasks::{ @@ -132,60 +132,6 @@ impl } } -impl EthApi -where - N: RpcNodeCore, - Rpc: RpcConvert, - (): PendingEnvBuilder, -{ - /// Creates a new, shareable instance using the default tokio task spawner. - #[expect(clippy::too_many_arguments)] - pub fn new( - components: N, - eth_cache: EthStateCache, - gas_oracle: GasPriceOracle, - gas_cap: impl Into, - max_simulate_blocks: u64, - eth_proof_window: u64, - blocking_task_pool: BlockingTaskPool, - fee_history_cache: FeeHistoryCache>, - proof_permits: usize, - rpc_converter: Rpc, - max_batch_size: usize, - max_blocking_io_requests: usize, - pending_block_kind: PendingBlockKind, - raw_tx_forwarder: ForwardConfig, - send_raw_transaction_sync_timeout: Duration, - evm_memory_limit: u64, - force_blob_sidecar_upcasting: bool, - ) -> Self { - let inner = EthApiInner::new( - components, - eth_cache, - gas_oracle, - gas_cap, - max_simulate_blocks, - eth_proof_window, - blocking_task_pool, - fee_history_cache, - Runtime::with_existing_handle(tokio::runtime::Handle::current()) - .expect("called outside tokio runtime"), - proof_permits, - rpc_converter, - (), - max_batch_size, - max_blocking_io_requests, - pending_block_kind, - raw_tx_forwarder.forwarder_client(), - send_raw_transaction_sync_timeout, - evm_memory_limit, - force_blob_sidecar_upcasting, - ); - - Self { inner: Arc::new(inner) } - } -} - impl EthApiTypes for EthApi where N: RpcNodeCore, diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 234aace0b1..77b806407f 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -42,18 +42,7 @@ pub struct EthPubSub { impl EthPubSub { /// Creates a new, shareable instance. - /// - /// Subscription tasks are spawned via [`tokio::task::spawn`] - pub fn new(eth_api: Eth) -> Self { - Self::with_spawner( - eth_api, - Runtime::with_existing_handle(tokio::runtime::Handle::current()) - .expect("called outside tokio runtime"), - ) - } - - /// Creates a new, shareable instance. - pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Runtime) -> Self { + pub fn new(eth_api: Eth, subscription_task_spawner: Runtime) -> Self { let inner = EthPubSubInner { eth_api, subscription_task_spawner }; Self { inner: Arc::new(inner) } } diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 29859611a0..0b244be9e2 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -236,13 +236,12 @@ mod tests { #[test] fn test_critical() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let rt = Runtime::with_existing_handle(runtime.handle().clone()).unwrap(); + let rt = Runtime::test(); let handle = rt.take_task_manager_handle().unwrap(); rt.spawn_critical_task("this is a critical task", async { panic!("intentionally panic") }); - runtime.block_on(async move { + rt.handle().block_on(async move { let err_result = handle.await.unwrap(); assert!(err_result.is_err(), "Expected TaskManager to return an error due to panic"); let panicked_err = err_result.unwrap_err(); @@ -254,8 +253,7 @@ mod tests { #[test] fn test_manager_shutdown_critical() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let rt = Runtime::with_existing_handle(runtime.handle().clone()).unwrap(); + let rt = Runtime::test(); let (signal, shutdown) = signal(); @@ -266,13 +264,12 @@ mod tests { rt.graceful_shutdown(); - runtime.block_on(shutdown); + rt.handle().block_on(shutdown); } #[test] fn test_manager_shutdown() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let rt = Runtime::with_existing_handle(runtime.handle().clone()).unwrap(); + let rt = Runtime::test(); let (signal, shutdown) = signal(); @@ -283,13 +280,12 @@ mod tests { rt.graceful_shutdown(); - runtime.block_on(shutdown); + rt.handle().block_on(shutdown); } #[test] fn test_manager_graceful_shutdown() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let rt = Runtime::with_existing_handle(runtime.handle().clone()).unwrap(); + let rt = Runtime::test(); let val = Arc::new(AtomicBool::new(false)); let c = val.clone(); @@ -305,8 +301,7 @@ mod tests { #[test] fn test_manager_graceful_shutdown_many() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let rt = Runtime::with_existing_handle(runtime.handle().clone()).unwrap(); + let rt = Runtime::test(); let counter = Arc::new(AtomicUsize::new(0)); let num = 10; @@ -325,8 +320,7 @@ mod tests { #[test] fn test_manager_graceful_shutdown_timeout() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let rt = Runtime::with_existing_handle(runtime.handle().clone()).unwrap(); + let rt = Runtime::test(); let timeout = Duration::from_millis(500); let val = Arc::new(AtomicBool::new(false)); @@ -344,15 +338,13 @@ mod tests { #[test] fn can_build_runtime() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let rt = Runtime::with_existing_handle(runtime.handle().clone()).unwrap(); + let rt = Runtime::test(); let _handle = rt.handle(); } #[test] fn test_graceful_shutdown_triggered_by_executor() { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let rt = Runtime::with_existing_handle(runtime.handle().clone()).unwrap(); + let rt = Runtime::test(); let task_manager_handle = rt.take_task_manager_handle().unwrap(); let task_did_shutdown_flag = Arc::new(AtomicBool::new(false)); @@ -366,11 +358,11 @@ mod tests { let send_result = rt.initiate_graceful_shutdown(); assert!(send_result.is_ok()); - let manager_final_result = runtime.block_on(task_manager_handle); + let manager_final_result = rt.handle().block_on(task_manager_handle); assert!(manager_final_result.is_ok(), "TaskManager task should not panic"); assert_eq!(manager_final_result.unwrap(), Ok(())); - let task_join_result = runtime.block_on(spawned_task_handle); + let task_join_result = rt.handle().block_on(spawned_task_handle); assert!(task_join_result.is_ok()); assert!(task_did_shutdown_flag.load(Ordering::Relaxed)); diff --git a/crates/tasks/src/runtime.rs b/crates/tasks/src/runtime.rs index a644e43b83..9cd9b11f97 100644 --- a/crates/tasks/src/runtime.rs +++ b/crates/tasks/src/runtime.rs @@ -198,16 +198,6 @@ pub struct RuntimeConfig { } impl RuntimeConfig { - /// Create a config that attaches to an existing tokio runtime handle. - #[cfg_attr(not(feature = "rayon"), allow(clippy::missing_const_for_fn))] - pub fn with_existing_handle(handle: Handle) -> Self { - Self { - tokio: TokioConfig::ExistingHandle(handle), - #[cfg(feature = "rayon")] - rayon: RayonConfig::default(), - } - } - /// Set the tokio configuration. pub fn with_tokio(mut self, tokio: TokioConfig) -> Self { self.tokio = tokio; @@ -297,15 +287,6 @@ impl std::fmt::Debug for Runtime { } } -// ── Constructors ────────────────────────────────────────────────────── - -impl Runtime { - /// Creates a [`Runtime`] that attaches to an existing tokio runtime handle. - pub fn with_existing_handle(handle: Handle) -> Result { - RuntimeBuilder::new(RuntimeConfig::with_existing_handle(handle)).build() - } -} - // ── Pool accessors ──────────────────────────────────────────────────── impl Runtime { @@ -381,12 +362,6 @@ impl Runtime { RuntimeBuilder::new(config).build().expect("failed to build test Runtime") } - /// Creates a lightweight [`Runtime`] for tests, attaching to the given tokio handle. - pub fn test_with_handle(handle: Handle) -> Self { - let config = Self::test_config().with_tokio(TokioConfig::existing_handle(handle)); - RuntimeBuilder::new(config).build().expect("failed to build test Runtime") - } - const fn test_config() -> RuntimeConfig { RuntimeConfig { tokio: TokioConfig::Owned { @@ -747,7 +722,7 @@ impl RuntimeBuilder { /// The [`TaskManager`] is automatically spawned as a background task that monitors /// critical tasks for panics. Use [`Runtime::take_task_manager_handle`] to extract /// the join handle if you need to poll for panic errors. - #[tracing::instrument(level = "debug", skip_all)] + #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)] pub fn build(self) -> Result { debug!(?self.config, "Building runtime"); let config = self.config; @@ -904,7 +879,8 @@ mod tests { #[test] fn test_runtime_config_existing_handle() { let rt = TokioRuntime::new().unwrap(); - let config = RuntimeConfig::with_existing_handle(rt.handle().clone()); + let config = + Runtime::test_config().with_tokio(TokioConfig::existing_handle(rt.handle().clone())); assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_))); } @@ -919,7 +895,8 @@ mod tests { #[test] fn test_runtime_builder() { let rt = TokioRuntime::new().unwrap(); - let config = RuntimeConfig::with_existing_handle(rt.handle().clone()); + let config = + Runtime::test_config().with_tokio(TokioConfig::existing_handle(rt.handle().clone())); let runtime = RuntimeBuilder::new(config).build().unwrap(); let _ = runtime.handle(); } diff --git a/crates/tracing/Cargo.toml b/crates/tracing/Cargo.toml index 5943ab3b7c..58c18ff7ed 100644 --- a/crates/tracing/Cargo.toml +++ b/crates/tracing/Cargo.toml @@ -23,7 +23,7 @@ tracing-journald.workspace = true tracing-logfmt.workspace = true tracing-samply.workspace = true tracing-tracy = { workspace = true, optional = true } -tracy-client = { workspace = true, optional = true, features = ["demangle"] } +tracy-client = { workspace = true, optional = true } # misc clap = { workspace = true, features = ["derive"] } diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 16a76aca43..d98eb23b6e 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -211,8 +211,7 @@ //! Evm: ConfigureEvm> + 'static, //! { //! let blob_store = InMemoryBlobStore::default(); -//! let rt = tokio::runtime::Runtime::new().unwrap(); -//! let runtime = Runtime::with_existing_handle(rt.handle().clone()).unwrap(); +//! let runtime = Runtime::test(); //! let pool = Pool::eth_pool( //! TransactionValidationTaskExecutor::eth(client, evm_config, blob_store.clone(), runtime), //! blob_store, @@ -251,8 +250,7 @@ //! Evm: ConfigureEvm + 'static, //! { //! let blob_store = InMemoryBlobStore::default(); -//! let rt = tokio::runtime::Runtime::new().unwrap(); -//! let runtime = Runtime::with_existing_handle(rt.handle().clone()).unwrap(); +//! let runtime = Runtime::test(); //! let pool = Pool::eth_pool( //! TransactionValidationTaskExecutor::eth(client.clone(), evm_config, blob_store.clone(), runtime.clone()), //! blob_store, diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 70502d1694..1382c2dba3 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -904,7 +904,7 @@ mod tests { txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap(); - let rt = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap(); + let rt = Runtime::test(); let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone()); rt.spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| { backup_local_transactions_task(shutdown, txpool.clone(), config) diff --git a/examples/bsc-p2p/Cargo.toml b/examples/bsc-p2p/Cargo.toml index a3e2ba1d6a..484a32c946 100644 --- a/examples/bsc-p2p/Cargo.toml +++ b/examples/bsc-p2p/Cargo.toml @@ -20,6 +20,7 @@ reth-payload-primitives.workspace = true reth-primitives.workspace = true reth-primitives-traits.workspace = true reth-provider = { workspace = true, features = ["test-utils"] } +reth-tasks.workspace = true reth-tracing.workspace = true # alloy diff --git a/examples/bsc-p2p/src/main.rs b/examples/bsc-p2p/src/main.rs index e8c36f0d2a..2267b112ff 100644 --- a/examples/bsc-p2p/src/main.rs +++ b/examples/bsc-p2p/src/main.rs @@ -19,6 +19,7 @@ use reth_network::{ }; use reth_network_api::events::{PeerEvent, SessionInfo}; use reth_provider::noop::NoopProvider; +use reth_tasks::Runtime; use reth_tracing::{ tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, Tracer, }; @@ -53,7 +54,7 @@ async fn main() { let bsc_boot_nodes = boot_nodes(); - let net_cfg = NetworkConfig::builder(secret_key) + let net_cfg = NetworkConfig::builder(secret_key, Runtime::test()) .boot_nodes(bsc_boot_nodes.clone()) .set_head(head()) .with_pow() diff --git a/examples/bsc-p2p/tests/it/p2p.rs b/examples/bsc-p2p/tests/it/p2p.rs index 5f8ef4cafb..98faa7321a 100644 --- a/examples/bsc-p2p/tests/it/p2p.rs +++ b/examples/bsc-p2p/tests/it/p2p.rs @@ -8,6 +8,7 @@ use reth_network::{ EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, }; use reth_provider::noop::NoopProvider; +use reth_tasks::Runtime; use secp256k1::{rand, SecretKey}; use std::{ net::{Ipv4Addr, SocketAddr}, @@ -25,7 +26,7 @@ async fn can_connect() { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let net_cfg = NetworkConfig::<_, EthNetworkPrimitives>::builder(secret_key) + let net_cfg = NetworkConfig::<_, EthNetworkPrimitives>::builder(secret_key, Runtime::test()) .boot_nodes(boot_nodes()) .set_head(head()) .with_pow() diff --git a/examples/custom-dev-node/src/main.rs b/examples/custom-dev-node/src/main.rs index c55d6cc844..7a17a3a838 100644 --- a/examples/custom-dev-node/src/main.rs +++ b/examples/custom-dev-node/src/main.rs @@ -22,7 +22,7 @@ use reth_ethereum::{ #[tokio::main] async fn main() -> eyre::Result<()> { - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?; + let runtime = Runtime::test(); // create node config let node_config = NodeConfig::test() diff --git a/examples/custom-engine-types/src/main.rs b/examples/custom-engine-types/src/main.rs index ffbaa19002..e572e208e4 100644 --- a/examples/custom-engine-types/src/main.rs +++ b/examples/custom-engine-types/src/main.rs @@ -391,7 +391,7 @@ where async fn main() -> eyre::Result<()> { let _guard = RethTracer::new().init()?; - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?; + let runtime = Runtime::test(); // create genesis with canyon at block 2 let spec = ChainSpec::builder() diff --git a/examples/custom-evm/src/main.rs b/examples/custom-evm/src/main.rs index 07a3b009d3..65a6497730 100644 --- a/examples/custom-evm/src/main.rs +++ b/examples/custom-evm/src/main.rs @@ -121,7 +121,7 @@ pub fn prague_custom() -> &'static Precompiles { async fn main() -> eyre::Result<()> { let _guard = RethTracer::new().init()?; - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?; + let runtime = Runtime::test(); // create a custom chain spec let spec = ChainSpec::builder() diff --git a/examples/custom-rlpx-subprotocol/src/main.rs b/examples/custom-rlpx-subprotocol/src/main.rs index d64022df93..93579f1e48 100644 --- a/examples/custom-rlpx-subprotocol/src/main.rs +++ b/examples/custom-rlpx-subprotocol/src/main.rs @@ -22,6 +22,7 @@ use reth_ethereum::{ NetworkConfig, NetworkManager, NetworkProtocols, }, node::{builder::NodeHandle, EthereumNode}, + tasks::Runtime, }; use subprotocol::{ connection::CustomCommand, @@ -50,7 +51,7 @@ fn main() -> eyre::Result<()> { let secret_key = rng_secret_key(); let (tx, mut from_peer1) = mpsc::unbounded_channel(); let custom_rlpx_handler_2 = CustomRlpxProtoHandler { state: ProtocolState { events: tx } }; - let net_cfg = NetworkConfig::builder(secret_key) + let net_cfg = NetworkConfig::builder(secret_key, Runtime::test()) .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))) .disable_discovery() .add_rlpx_sub_protocol(custom_rlpx_handler_2.into_rlpx_sub_protocol()) diff --git a/examples/network-proxy/src/main.rs b/examples/network-proxy/src/main.rs index c7ba2236d3..22ace3ecbb 100644 --- a/examples/network-proxy/src/main.rs +++ b/examples/network-proxy/src/main.rs @@ -24,6 +24,7 @@ use reth_ethereum::{ BlockDownloaderProvider, FetchClient, NetworkConfig, NetworkEventListenerProvider, NetworkHandle, NetworkInfo, NetworkManager, Peers, }, + tasks::Runtime, }; #[tokio::main] @@ -34,7 +35,8 @@ async fn main() -> eyre::Result<()> { let local_key = rng_secret_key(); // Configure the network - let config = NetworkConfig::builder(local_key).build_with_noop_provider(DEV.clone()); + let config = + NetworkConfig::builder(local_key, Runtime::test()).build_with_noop_provider(DEV.clone()); let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1000); let (transactions_tx, mut transactions_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -107,7 +109,7 @@ async fn main() -> eyre::Result<()> { /// first peer. async fn run_peer(handle: NetworkHandle) -> eyre::Result<()> { // create another peer - let config = NetworkConfig::builder(rng_secret_key()) + let config = NetworkConfig::builder(rng_secret_key(), Runtime::test()) // use random ports .with_unused_ports() .build_with_noop_provider(DEV.clone()); diff --git a/examples/network-txpool/src/main.rs b/examples/network-txpool/src/main.rs index 1a914a2fc2..a4e21623a6 100644 --- a/examples/network-txpool/src/main.rs +++ b/examples/network-txpool/src/main.rs @@ -16,6 +16,7 @@ use reth_ethereum::{ EthPooledTransaction, Pool, TransactionListenerKind, TransactionPool, }, provider::test_utils::NoopProvider, + tasks::Runtime, }; #[tokio::main] @@ -41,7 +42,7 @@ async fn main() -> eyre::Result<()> { let local_key = rng_secret_key(); // Configure the network - let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key) + let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key, Runtime::test()) .mainnet_boot_nodes() .build(client); let transactions_manager_config = config.transactions_manager_config.clone(); diff --git a/examples/network/src/main.rs b/examples/network/src/main.rs index c04cc29f51..a47605df87 100644 --- a/examples/network/src/main.rs +++ b/examples/network/src/main.rs @@ -14,6 +14,7 @@ use reth_ethereum::{ config::rng_secret_key, NetworkConfig, NetworkEventListenerProvider, NetworkManager, }, provider::test_utils::NoopProvider, + tasks::Runtime, }; #[tokio::main] @@ -25,7 +26,8 @@ async fn main() -> eyre::Result<()> { let local_key = rng_secret_key(); // Configure the network - let config = NetworkConfig::builder(local_key).mainnet_boot_nodes().build(client); + let config = + NetworkConfig::builder(local_key, Runtime::test()).mainnet_boot_nodes().build(client); // create the network instance let network = NetworkManager::eth(config).await?; diff --git a/examples/polygon-p2p/src/main.rs b/examples/polygon-p2p/src/main.rs index d4301ec012..e345340413 100644 --- a/examples/polygon-p2p/src/main.rs +++ b/examples/polygon-p2p/src/main.rs @@ -14,9 +14,12 @@ use chain_cfg::{boot_nodes, head, polygon_chain_spec}; use reth_discv4::Discv4ConfigBuilder; -use reth_ethereum::network::{ - api::events::SessionInfo, config::NetworkMode, NetworkConfig, NetworkEvent, - NetworkEventListenerProvider, NetworkManager, +use reth_ethereum::{ + network::{ + api::events::SessionInfo, config::NetworkMode, NetworkConfig, NetworkEvent, + NetworkEventListenerProvider, NetworkManager, + }, + tasks::Runtime, }; use reth_tracing::{ tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, @@ -49,7 +52,7 @@ async fn main() { let local_addr = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 30303); // The network configuration - let net_cfg = NetworkConfig::builder(secret_key) + let net_cfg = NetworkConfig::builder(secret_key, Runtime::test()) .set_head(head()) .network_mode(NetworkMode::Work) .listener_addr(local_addr) diff --git a/examples/precompile-cache/src/main.rs b/examples/precompile-cache/src/main.rs index ee387c583a..2590eb6233 100644 --- a/examples/precompile-cache/src/main.rs +++ b/examples/precompile-cache/src/main.rs @@ -187,7 +187,7 @@ where async fn main() -> eyre::Result<()> { let _guard = RethTracer::new().init()?; - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?; + let runtime = Runtime::test(); // create a custom chain spec let spec = ChainSpec::builder() diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 844dc51491..1ff8e0f3e1 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -49,7 +49,7 @@ async fn main() -> eyre::Result<()> { DatabaseArguments::new(ClientVersion::default()), )?; let spec = Arc::new(ChainSpecBuilder::mainnet().build()); - let runtime = Runtime::with_existing_handle(tokio::runtime::Handle::current())?; + let runtime = Runtime::test(); let factory = ProviderFactory::>::new( db.clone(), spec.clone(),